You are viewing a plain text version of this content. The canonical version is the original message in the commits@hive.apache.org mailing list archive.
Posted to commits@hive.apache.org by se...@apache.org on 2014/04/09 04:12:02 UTC

svn commit: r1585878 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez: DagUtils.java TezSessionState.java TezTask.java

Author: sershe
Date: Wed Apr  9 02:12:01 2014
New Revision: 1585878

URL: http://svn.apache.org/r1585878
Log:
HIVE-6825 : custom jars for Hive query should be uploaded to scratch dir per query; and/or versioned (Sergey Shelukhin, reviewed by Vikram Dixit K)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1585878&r1=1585877&r2=1585878&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Wed Apr  9 02:12:01 2014
@@ -603,16 +603,7 @@ public class DagUtils {
     Map<String, LocalResource> combinedResources = new HashMap<String, LocalResource>();
 
     combinedResources.putAll(sessionConfig.getSessionResources());
-
-    try {
-      for(LocalResource lr : localizeTempFilesFromConf(getHiveJarDirectory(conf), conf)) {
-        combinedResources.put(getBaseName(lr), lr);
-      }
-    } catch(LoginException le) {
-      throw new IOException(le);
-    }
-
-    if(localResources != null) {
+    if (localResources != null) {
       combinedResources.putAll(localResources);
     }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1585878&r1=1585877&r2=1585878&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Wed Apr  9 02:12:01 2014
@@ -21,6 +21,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -75,7 +76,8 @@ public class TezSessionState {
   private boolean defaultQueue = false;
   private String user;
 
-  private HashSet<String> additionalAmFiles = null;
+  private HashSet<String> additionalFilesNotFromConf = null;
+  private List<LocalResource> localizedResources;
 
   private static List<TezSessionState> openSessions
     = Collections.synchronizedList(new LinkedList<TezSessionState>());
@@ -129,7 +131,7 @@ public class TezSessionState {
    * @throws LoginException
    * @throws TezException
    */
-  public void open(HiveConf conf, List<LocalResource> additionalLr)
+  public void open(HiveConf conf, String[] additionalFiles)
     throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException {
     this.conf = conf;
 
@@ -141,6 +143,22 @@ public class TezSessionState {
     // create the tez tmp dir
     tezScratchDir = createTezDir(sessionId);
 
+    String dir = tezScratchDir.toString();
+    // Localize resources to session scratch dir
+    localizedResources = utils.localizeTempFilesFromConf(dir, conf);
+    List<LocalResource> handlerLr = utils.localizeTempFiles(dir, conf, additionalFiles);
+    if (handlerLr != null) {
+      if (localizedResources == null) {
+        localizedResources = handlerLr;
+      } else {
+        localizedResources.addAll(handlerLr);
+      }
+      additionalFilesNotFromConf = new HashSet<String>();
+      for (String originalFile : additionalFiles) {
+        additionalFilesNotFromConf.add(originalFile);
+      }
+    }
+
     // generate basic tez config
     TezConfiguration tezConfig = new TezConfiguration(conf);
 
@@ -153,12 +171,9 @@ public class TezSessionState {
     // configuration for the application master
     Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
     commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr);
-    if (additionalLr != null) {
-      additionalAmFiles = new HashSet<String>();
-      for (LocalResource lr : additionalLr) {
-        String baseName = utils.getBaseName(lr);
-        additionalAmFiles.add(baseName);
-        commonLocalResources.put(baseName, lr);
+    if (localizedResources != null) {
+      for (LocalResource lr : localizedResources) {
+        commonLocalResources.put(utils.getBaseName(lr), lr);
       }
     }
 
@@ -172,9 +187,10 @@ public class TezSessionState {
     TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConfig);
 
     // and finally we're ready to create and start the session
-    session = new TezSession("HIVE-"+sessionId, sessionConfig);
+    session = new TezSession("HIVE-" + sessionId, sessionConfig);
 
-    LOG.info("Opening new Tez Session (id: "+sessionId+", scratch dir: "+tezScratchDir+")");
+    LOG.info("Opening new Tez Session (id: " + sessionId
+        + ", scratch dir: " + tezScratchDir + ")");
 
     session.start();
 
@@ -200,11 +216,11 @@ public class TezSessionState {
     openSessions.add(this);
   }
 
-  public boolean hasResources(List<LocalResource> lrs) {
-    if (lrs == null || lrs.isEmpty()) return true;
-    if (additionalAmFiles == null || additionalAmFiles.isEmpty()) return false;
-    for (LocalResource lr : lrs) {
-      if (!additionalAmFiles.contains(utils.getBaseName(lr))) return false;
+  public boolean hasResources(String[] localAmResources) {
+    if (localAmResources == null || localAmResources.length == 0) return true;
+    if (additionalFilesNotFromConf == null || additionalFilesNotFromConf.isEmpty()) return false;
+    for (String s : localAmResources) {
+      if (!additionalFilesNotFromConf.contains(s)) return false;
     }
     return true;
   }
@@ -236,7 +252,8 @@ public class TezSessionState {
     tezScratchDir = null;
     conf = null;
     appJarLr = null;
-    additionalAmFiles = null;
+    additionalFilesNotFromConf = null;
+    localizedResources = null;
   }
 
   public void cleanupScratchDir () throws IOException {
@@ -392,6 +409,10 @@ public class TezSessionState {
     return conf;
   }
 
+  public List<LocalResource> getLocalizedResources() {
+    return localizedResources;
+  }
+
   public String getUser() {
     return user;
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1585878&r1=1585877&r2=1585878&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Wed Apr  9 02:12:01 2014
@@ -130,31 +130,22 @@ public class TezTask extends Task<TezWor
       // create the tez tmp dir
       scratchDir = utils.createTezDir(scratchDir, conf);
 
-      // we need to get the user specified local resources for this dag
-      String hiveJarDir = utils.getHiveJarDirectory(conf);
-      List<LocalResource> additionalLr = utils.localizeTempFilesFromConf(hiveJarDir, conf);
-      List<LocalResource> handlerLr = utils.localizeTempFiles(hiveJarDir, conf, inputOutputJars);
-      if (handlerLr != null) {
-        additionalLr.addAll(handlerLr);
-      }
-
       // If we have any jars from input format, we need to restart the session because
       // AM will need them; so, AM has to be restarted. What a mess...
-      if (!session.hasResources(handlerLr)) {
-        if (session.isOpen()) {
-          LOG.info("Tez session being reopened to pass custom jars to AM");
-          session.close(false);
-          session = TezSessionPoolManager.getInstance().getSession(null, conf, false);
-          ss.setTezSession(session);
-        }
-        session.open(conf, additionalLr);
+      if (!session.hasResources(inputOutputJars) && session.isOpen()) {
+        LOG.info("Tez session being reopened to pass custom jars to AM");
+        session.close(false);
+        session = TezSessionPoolManager.getInstance().getSession(null, conf, false);
+        ss.setTezSession(session);
       }
+
       if (!session.isOpen()) {
         // can happen if the user sets the tez flag after the session was
         // established
         LOG.info("Tez session hasn't been created yet. Opening session");
-        session.open(conf);
+        session.open(conf, inputOutputJars);
       }
+      List<LocalResource> additionalLr = session.getLocalizedResources();
 
       // unless already installed on all the cluster nodes, we'll have to
       // localize hive-exec.jar as well.