You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/04/20 02:32:33 UTC
[1/4] hive git commit: HIVE-16461 : DagUtils checks local resource
size on the remote fs (Sergey Shelukhin, reviewed by Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/branch-2 1542e39b8 -> cce0734de
refs/heads/branch-2.2 48002e7d4 -> 03941e353
refs/heads/branch-2.3 1d5374a9a -> 228629df7
refs/heads/master 3a5edc97e -> 4c9986b36
HIVE-16461 : DagUtils checks local resource size on the remote fs (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4c9986b3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4c9986b3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4c9986b3
Branch: refs/heads/master
Commit: 4c9986b3612f5ec09cc1cd5fe3e59c1e8cc8a998
Parents: 3a5edc9
Author: sergey <se...@apache.org>
Authored: Wed Apr 19 19:31:53 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Wed Apr 19 19:31:53 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/tez/DagUtils.java | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/4c9986b3/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 9589d36..6497495 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -970,10 +970,9 @@ public class DagUtils {
* @return true if the file names match else returns false.
* @throws IOException when any file system related call fails
*/
- private boolean checkPreExisting(Path src, Path dest, Configuration conf)
+ private boolean checkPreExisting(FileSystem sourceFS, Path src, Path dest, Configuration conf)
throws IOException {
FileSystem destFS = dest.getFileSystem(conf);
- FileSystem sourceFS = src.getFileSystem(conf);
FileStatus destStatus = FileUtils.getFileStatusOrNull(destFS, dest);
if (destStatus != null) {
return (sourceFS.getFileStatus(src).getLen() == destStatus.getLen());
@@ -993,7 +992,9 @@ public class DagUtils {
public LocalResource localizeResource(
Path src, Path dest, LocalResourceType type, Configuration conf) throws IOException {
FileSystem destFS = dest.getFileSystem(conf);
- if (src != null && !checkPreExisting(src, dest, conf)) {
+ // We call copyFromLocal below, so we basically assume src is a local file.
+ FileSystem srcFs = FileSystem.getLocal(conf);
+ if (src != null && !checkPreExisting(srcFs, src, dest, conf)) {
// copy the src to the destination and create local resource.
// do not overwrite.
String srcStr = src.toString();
@@ -1005,7 +1006,7 @@ public class DagUtils {
// authoritative one), don't wait infinitely for the notifier, just wait a little bit
// and check HDFS before and after.
if (notifierOld != null
- && checkOrWaitForTheFile(src, dest, conf, notifierOld, 1, 150, false)) {
+ && checkOrWaitForTheFile(srcFs, src, dest, conf, notifierOld, 1, 150, false)) {
return createLocalResource(destFS, dest, type, LocalResourceVisibility.PRIVATE);
}
try {
@@ -1027,7 +1028,7 @@ public class DagUtils {
conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS);
// Only log on the first wait, and check after wait on the last iteration.
if (!checkOrWaitForTheFile(
- src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) {
+ srcFs, src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) {
LOG.error("Could not find the jar that was being uploaded");
throw new IOException("Previous writer likely failed to write " + dest +
". Failing because I am unlikely to write too.");
@@ -1042,10 +1043,10 @@ public class DagUtils {
LocalResourceVisibility.PRIVATE);
}
- public boolean checkOrWaitForTheFile(Path src, Path dest, Configuration conf, Object notifier,
- int waitAttempts, long sleepInterval, boolean doLog) throws IOException {
+ public boolean checkOrWaitForTheFile(FileSystem srcFs, Path src, Path dest, Configuration conf,
+ Object notifier, int waitAttempts, long sleepInterval, boolean doLog) throws IOException {
for (int i = 0; i < waitAttempts; i++) {
- if (checkPreExisting(src, dest, conf)) return true;
+ if (checkPreExisting(srcFs, src, dest, conf)) return true;
if (doLog && i == 0) {
LOG.info("Waiting for the file " + dest + " (" + waitAttempts + " attempts, with "
+ sleepInterval + "ms interval)");
@@ -1064,7 +1065,7 @@ public class DagUtils {
throw new IOException(interruptedException);
}
}
- return checkPreExisting(src, dest, conf); // One last check.
+ return checkPreExisting(srcFs, src, dest, conf); // One last check.
}
/**
[2/4] hive git commit: HIVE-16461 : DagUtils checks local resource
size on the remote fs (Sergey Shelukhin, reviewed by Siddharth Seth)
Posted by se...@apache.org.
HIVE-16461 : DagUtils checks local resource size on the remote fs (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cce0734d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cce0734d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cce0734d
Branch: refs/heads/branch-2
Commit: cce0734de7a13c64d885f367312d67a0061c62fd
Parents: 1542e39
Author: sergey <se...@apache.org>
Authored: Wed Apr 19 19:31:53 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Wed Apr 19 19:32:04 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/tez/DagUtils.java | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cce0734d/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index aa2dfc7..568df14 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -965,10 +965,9 @@ public class DagUtils {
* @return true if the file names match else returns false.
* @throws IOException when any file system related call fails
*/
- private boolean checkPreExisting(Path src, Path dest, Configuration conf)
+ private boolean checkPreExisting(FileSystem sourceFS, Path src, Path dest, Configuration conf)
throws IOException {
FileSystem destFS = dest.getFileSystem(conf);
- FileSystem sourceFS = src.getFileSystem(conf);
FileStatus destStatus = FileUtils.getFileStatusOrNull(destFS, dest);
if (destStatus != null) {
return (sourceFS.getFileStatus(src).getLen() == destStatus.getLen());
@@ -988,7 +987,9 @@ public class DagUtils {
public LocalResource localizeResource(
Path src, Path dest, LocalResourceType type, Configuration conf) throws IOException {
FileSystem destFS = dest.getFileSystem(conf);
- if (src != null && !checkPreExisting(src, dest, conf)) {
+ // We call copyFromLocal below, so we basically assume src is a local file.
+ FileSystem srcFs = FileSystem.getLocal(conf);
+ if (src != null && !checkPreExisting(srcFs, src, dest, conf)) {
// copy the src to the destination and create local resource.
// do not overwrite.
String srcStr = src.toString();
@@ -1000,7 +1001,7 @@ public class DagUtils {
// authoritative one), don't wait infinitely for the notifier, just wait a little bit
// and check HDFS before and after.
if (notifierOld != null
- && checkOrWaitForTheFile(src, dest, conf, notifierOld, 1, 150, false)) {
+ && checkOrWaitForTheFile(srcFs, src, dest, conf, notifierOld, 1, 150, false)) {
return createLocalResource(destFS, dest, type, LocalResourceVisibility.PRIVATE);
}
try {
@@ -1022,7 +1023,7 @@ public class DagUtils {
conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS);
// Only log on the first wait, and check after wait on the last iteration.
if (!checkOrWaitForTheFile(
- src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) {
+ srcFs, src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) {
LOG.error("Could not find the jar that was being uploaded");
throw new IOException("Previous writer likely failed to write " + dest +
". Failing because I am unlikely to write too.");
@@ -1037,10 +1038,10 @@ public class DagUtils {
LocalResourceVisibility.PRIVATE);
}
- public boolean checkOrWaitForTheFile(Path src, Path dest, Configuration conf, Object notifier,
- int waitAttempts, long sleepInterval, boolean doLog) throws IOException {
+ public boolean checkOrWaitForTheFile(FileSystem srcFs, Path src, Path dest, Configuration conf,
+ Object notifier, int waitAttempts, long sleepInterval, boolean doLog) throws IOException {
for (int i = 0; i < waitAttempts; i++) {
- if (checkPreExisting(src, dest, conf)) return true;
+ if (checkPreExisting(srcFs, src, dest, conf)) return true;
if (doLog && i == 0) {
LOG.info("Waiting for the file " + dest + " (" + waitAttempts + " attempts, with "
+ sleepInterval + "ms interval)");
@@ -1059,7 +1060,7 @@ public class DagUtils {
throw new IOException(interruptedException);
}
}
- return checkPreExisting(src, dest, conf); // One last check.
+ return checkPreExisting(srcFs, src, dest, conf); // One last check.
}
/**
[3/4] hive git commit: HIVE-16461 : DagUtils checks local resource
size on the remote fs (Sergey Shelukhin, reviewed by Siddharth Seth)
Posted by se...@apache.org.
HIVE-16461 : DagUtils checks local resource size on the remote fs (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/03941e35
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/03941e35
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/03941e35
Branch: refs/heads/branch-2.2
Commit: 03941e3536144adff0aed24bfbb11d7a6b257110
Parents: 48002e7
Author: sergey <se...@apache.org>
Authored: Wed Apr 19 19:31:53 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Wed Apr 19 19:32:11 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/tez/DagUtils.java | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/03941e35/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index c6c53cd..6599d31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -965,10 +965,9 @@ public class DagUtils {
* @return true if the file names match else returns false.
* @throws IOException when any file system related call fails
*/
- private boolean checkPreExisting(Path src, Path dest, Configuration conf)
+ private boolean checkPreExisting(FileSystem sourceFS, Path src, Path dest, Configuration conf)
throws IOException {
FileSystem destFS = dest.getFileSystem(conf);
- FileSystem sourceFS = src.getFileSystem(conf);
FileStatus destStatus = FileUtils.getFileStatusOrNull(destFS, dest);
if (destStatus != null) {
return (sourceFS.getFileStatus(src).getLen() == destStatus.getLen());
@@ -988,7 +987,9 @@ public class DagUtils {
public LocalResource localizeResource(
Path src, Path dest, LocalResourceType type, Configuration conf) throws IOException {
FileSystem destFS = dest.getFileSystem(conf);
- if (src != null && !checkPreExisting(src, dest, conf)) {
+ // We call copyFromLocal below, so we basically assume src is a local file.
+ FileSystem srcFs = FileSystem.getLocal(conf);
+ if (src != null && !checkPreExisting(srcFs, src, dest, conf)) {
// copy the src to the destination and create local resource.
// do not overwrite.
String srcStr = src.toString();
@@ -1000,7 +1001,7 @@ public class DagUtils {
// authoritative one), don't wait infinitely for the notifier, just wait a little bit
// and check HDFS before and after.
if (notifierOld != null
- && checkOrWaitForTheFile(src, dest, conf, notifierOld, 1, 150, false)) {
+ && checkOrWaitForTheFile(srcFs, src, dest, conf, notifierOld, 1, 150, false)) {
return createLocalResource(destFS, dest, type, LocalResourceVisibility.PRIVATE);
}
try {
@@ -1017,7 +1018,7 @@ public class DagUtils {
conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS);
// Only log on the first wait, and check after wait on the last iteration.
if (!checkOrWaitForTheFile(
- src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) {
+ srcFs, src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) {
LOG.error("Could not find the jar that was being uploaded");
throw new IOException("Previous writer likely failed to write " + dest +
". Failing because I am unlikely to write too.");
@@ -1032,10 +1033,10 @@ public class DagUtils {
LocalResourceVisibility.PRIVATE);
}
- public boolean checkOrWaitForTheFile(Path src, Path dest, Configuration conf, Object notifier,
- int waitAttempts, long sleepInterval, boolean doLog) throws IOException {
+ public boolean checkOrWaitForTheFile(FileSystem srcFs, Path src, Path dest, Configuration conf,
+ Object notifier, int waitAttempts, long sleepInterval, boolean doLog) throws IOException {
for (int i = 0; i < waitAttempts; i++) {
- if (checkPreExisting(src, dest, conf)) return true;
+ if (checkPreExisting(srcFs, src, dest, conf)) return true;
if (doLog && i == 0) {
LOG.info("Waiting for the file " + dest + " (" + waitAttempts + " attempts, with "
+ sleepInterval + "ms interval)");
@@ -1054,7 +1055,7 @@ public class DagUtils {
throw new IOException(interruptedException);
}
}
- return checkPreExisting(src, dest, conf); // One last check.
+ return checkPreExisting(srcFs, src, dest, conf); // One last check.
}
/**
[4/4] hive git commit: HIVE-16461 : DagUtils checks local resource
size on the remote fs (Sergey Shelukhin, reviewed by Siddharth Seth)
Posted by se...@apache.org.
HIVE-16461 : DagUtils checks local resource size on the remote fs (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/228629df
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/228629df
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/228629df
Branch: refs/heads/branch-2.3
Commit: 228629df74afcf77485ffa2ca1e2594fc2e86894
Parents: 1d5374a
Author: sergey <se...@apache.org>
Authored: Wed Apr 19 19:31:53 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Wed Apr 19 19:32:17 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/tez/DagUtils.java | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/228629df/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index aa2dfc7..568df14 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -965,10 +965,9 @@ public class DagUtils {
* @return true if the file names match else returns false.
* @throws IOException when any file system related call fails
*/
- private boolean checkPreExisting(Path src, Path dest, Configuration conf)
+ private boolean checkPreExisting(FileSystem sourceFS, Path src, Path dest, Configuration conf)
throws IOException {
FileSystem destFS = dest.getFileSystem(conf);
- FileSystem sourceFS = src.getFileSystem(conf);
FileStatus destStatus = FileUtils.getFileStatusOrNull(destFS, dest);
if (destStatus != null) {
return (sourceFS.getFileStatus(src).getLen() == destStatus.getLen());
@@ -988,7 +987,9 @@ public class DagUtils {
public LocalResource localizeResource(
Path src, Path dest, LocalResourceType type, Configuration conf) throws IOException {
FileSystem destFS = dest.getFileSystem(conf);
- if (src != null && !checkPreExisting(src, dest, conf)) {
+ // We call copyFromLocal below, so we basically assume src is a local file.
+ FileSystem srcFs = FileSystem.getLocal(conf);
+ if (src != null && !checkPreExisting(srcFs, src, dest, conf)) {
// copy the src to the destination and create local resource.
// do not overwrite.
String srcStr = src.toString();
@@ -1000,7 +1001,7 @@ public class DagUtils {
// authoritative one), don't wait infinitely for the notifier, just wait a little bit
// and check HDFS before and after.
if (notifierOld != null
- && checkOrWaitForTheFile(src, dest, conf, notifierOld, 1, 150, false)) {
+ && checkOrWaitForTheFile(srcFs, src, dest, conf, notifierOld, 1, 150, false)) {
return createLocalResource(destFS, dest, type, LocalResourceVisibility.PRIVATE);
}
try {
@@ -1022,7 +1023,7 @@ public class DagUtils {
conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS);
// Only log on the first wait, and check after wait on the last iteration.
if (!checkOrWaitForTheFile(
- src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) {
+ srcFs, src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) {
LOG.error("Could not find the jar that was being uploaded");
throw new IOException("Previous writer likely failed to write " + dest +
". Failing because I am unlikely to write too.");
@@ -1037,10 +1038,10 @@ public class DagUtils {
LocalResourceVisibility.PRIVATE);
}
- public boolean checkOrWaitForTheFile(Path src, Path dest, Configuration conf, Object notifier,
- int waitAttempts, long sleepInterval, boolean doLog) throws IOException {
+ public boolean checkOrWaitForTheFile(FileSystem srcFs, Path src, Path dest, Configuration conf,
+ Object notifier, int waitAttempts, long sleepInterval, boolean doLog) throws IOException {
for (int i = 0; i < waitAttempts; i++) {
- if (checkPreExisting(src, dest, conf)) return true;
+ if (checkPreExisting(srcFs, src, dest, conf)) return true;
if (doLog && i == 0) {
LOG.info("Waiting for the file " + dest + " (" + waitAttempts + " attempts, with "
+ sleepInterval + "ms interval)");
@@ -1059,7 +1060,7 @@ public class DagUtils {
throw new IOException(interruptedException);
}
}
- return checkPreExisting(src, dest, conf); // One last check.
+ return checkPreExisting(srcFs, src, dest, conf); // One last check.
}
/**