You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/05/20 01:16:37 UTC

[2/3] hive git commit: HIVE-16703 : Hive may add the same file to the session and vertex in Tez (Sergey Shelukhin, reviewed by Siddharth Seth)

HIVE-16703 : Hive may add the same file to the session and vertex in Tez (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8d951fa4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8d951fa4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8d951fa4

Branch: refs/heads/master
Commit: 8d951fa4e0665942c4c1cb44a7914f70b0604f2d
Parents: 5d62dc8
Author: sergey <se...@apache.org>
Authored: Fri May 19 17:24:27 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Fri May 19 17:24:27 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/tez/DagUtils.java       | 29 ++++++++++++++++----
 .../hive/ql/exec/tez/TezSessionState.java       |  5 ++--
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 14 +++++++---
 .../hadoop/hive/ql/exec/tez/TestTezTask.java    | 10 +++----
 4 files changed, 41 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index b0457be..b6e55c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -31,6 +31,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -847,12 +848,15 @@ public class DagUtils {
       String hdfsDirPathStr, Configuration conf) throws IOException, LoginException {
     List<LocalResource> tmpResources = new ArrayList<LocalResource>();
 
-    addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, getTempFilesFromConf(conf));
-    addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.ARCHIVE, getTempArchivesFromConf(conf));
+    addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE,
+        getTempFilesFromConf(conf), null);
+    addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.ARCHIVE,
+        getTempArchivesFromConf(conf), null);
     return tmpResources;
   }
 
-  private static String[] getTempFilesFromConf(Configuration conf) {
+  public static String[] getTempFilesFromConf(Configuration conf) {
+    if (conf == null) return new String[0]; // In tests.
     String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
     if (StringUtils.isNotBlank(addedFiles)) {
       HiveConf.setVar(conf, ConfVars.HIVEADDEDFILES, addedFiles);
@@ -888,21 +892,34 @@ public class DagUtils {
    * @throws LoginException when getDefaultDestDir fails with the same exception
    */
   public List<LocalResource> localizeTempFiles(String hdfsDirPathStr, Configuration conf,
-      String[] inputOutputJars) throws IOException, LoginException {
+      String[] inputOutputJars, String[] skipJars) throws IOException, LoginException {
     if (inputOutputJars == null) return null;
     List<LocalResource> tmpResources = new ArrayList<LocalResource>();
-    addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, inputOutputJars);
+    addTempResources(conf, tmpResources, hdfsDirPathStr,
+        LocalResourceType.FILE, inputOutputJars, skipJars);
     return tmpResources;
   }
 
   private void addTempResources(Configuration conf,
       List<LocalResource> tmpResources, String hdfsDirPathStr,
       LocalResourceType type,
-      String[] files) throws IOException {
+      String[] files, String[] skipFiles) throws IOException {
+    HashSet<Path> skipFileSet = null;
+    if (skipFiles != null) {
+      skipFileSet = new HashSet<>();
+      for (String skipFile : skipFiles) {
+        if (StringUtils.isBlank(skipFile)) continue;
+        skipFileSet.add(new Path(skipFile));
+      }
+    }
     for (String file : files) {
       if (!StringUtils.isNotBlank(file)) {
         continue;
       }
+      if (skipFileSet != null && skipFileSet.contains(new Path(file))) {
+        LOG.info("Skipping vertex resource " + file + " that already exists in the session");
+        continue;
+      }
       Path hdfsFilePath = new Path(hdfsDirPathStr, getResourceBaseName(new Path(file)));
       LocalResource localResource = localizeResource(new Path(file),
           hdfsFilePath, type, conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 036e918..fe5c6a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -476,9 +476,10 @@ public class TezSessionState {
       localizedResources.addAll(lrs);
     }
 
-    // these are local resources that are set through the mr "tmpjars" property
+    // these are local resources that are set through the mr "tmpjars" property; skip session files.
     List<LocalResource> handlerLr = utils.localizeTempFiles(dir, conf,
-      additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()]));
+      additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()]),
+      DagUtils.getTempFilesFromConf(conf));
 
     if (handlerLr != null) {
       localizedResources.addAll(handlerLr);

http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 1c84c6a..3356dc9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -123,6 +123,7 @@ public class TezTask extends Task<TezWork> {
     return counters;
   }
 
+
   @Override
   public int execute(DriverContext driverContext) {
     int rc = 1;
@@ -161,8 +162,12 @@ public class TezTask extends Task<TezWork> {
         // create the tez tmp dir
         scratchDir = utils.createTezDir(scratchDir, conf);
 
+        // This is used to compare global and vertex resources. Global resources are originally
+        // derived from session conf via localizeTempFilesFromConf. So, use that here.
+        Configuration sessionConf =
+            (session != null && session.getConf() != null) ? session.getConf() : conf;
         Map<String,LocalResource> inputOutputLocalResources =
-            getExtraLocalResources(jobConf, scratchDir, inputOutputJars);
+            getExtraLocalResources(jobConf, scratchDir, inputOutputJars, sessionConf);
 
         // Ensure the session is open and has the necessary local resources
         updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources);
@@ -273,10 +278,11 @@ public class TezTask extends Task<TezWork> {
    * Converted the list of jars into local resources
    */
   Map<String,LocalResource> getExtraLocalResources(JobConf jobConf, Path scratchDir,
-      String[] inputOutputJars) throws Exception {
+      String[] inputOutputJars, Configuration sessionConf) throws Exception {
     final Map<String,LocalResource> resources = new HashMap<String,LocalResource>();
-    final List<LocalResource> localResources = utils.localizeTempFiles(
-        scratchDir.toString(), jobConf, inputOutputJars);
+    // Skip the files already in session local resources...
+    final List<LocalResource> localResources = utils.localizeTempFiles(scratchDir.toString(),
+        jobConf, inputOutputJars, DagUtils.getTempFilesFromConf(sessionConf));
     if (null != localResources) {
       for (LocalResource lr : localResources) {
         resources.put(utils.getBaseName(lr), lr);

http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 2b52056..70fedb7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -245,7 +245,7 @@ public class TestTezTask {
     final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
     resMap.put("foo.jar", res);
 
-    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null))
         .thenReturn(resources);
     when(utils.getBaseName(res)).thenReturn("foo.jar");
     when(sessionState.isOpen()).thenReturn(true);
@@ -264,7 +264,7 @@ public class TestTezTask {
     resMap.put("foo.jar", res);
     DAG dag = mock(DAG.class);
 
-    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null))
         .thenReturn(resources);
     when(utils.getBaseName(res)).thenReturn("foo.jar");
     when(sessionState.isOpen()).thenReturn(true);
@@ -282,11 +282,11 @@ public class TestTezTask {
     final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
     resMap.put("foo.jar", res);
 
-    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
-        .thenReturn(resources);
+    when(utils.localizeTempFiles(eq(path.toString()), eq(conf), eq(inputOutputJars),
+        Mockito.<String[]>any())).thenReturn(resources);
     when(utils.getBaseName(res)).thenReturn("foo.jar");
 
-    assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars));
+    assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars, null));
   }
 
   @Test