You are viewing a plain-text version of this content. (The link to the canonical version, originally labeled "here", was lost in the plain-text conversion.)
Posted to commits@hive.apache.org by gu...@apache.org on 2014/06/16 20:28:21 UTC

svn commit: r1602951 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/tez/ java/org/apache/hadoop/hive/ql/session/ test/org/apache/hadoop/hive/ql/exec/tez/

Author: gunther
Date: Mon Jun 16 18:28:20 2014
New Revision: 1602951

URL: http://svn.apache.org/r1602951
Log:
HIVE-7212: Use resource re-localization instead of restarting sessions in Tez (Gunther Hagleitner, reviewed by Vikram Dixit, Gopal V, Sid Seth)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1602951&r1=1602950&r2=1602951&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Jun 16 18:28:20 2014
@@ -845,7 +845,7 @@ public class DagUtils {
     }
 
     return createLocalResource(destFS, dest, LocalResourceType.FILE,
-        LocalResourceVisibility.APPLICATION);
+        LocalResourceVisibility.PRIVATE);
   }
 
   /**

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1602951&r1=1602950&r2=1602951&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Mon Jun 16 18:28:20 2014
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import javax.security.auth.login.LoginException;
@@ -76,8 +77,8 @@ public class TezSessionState {
   private boolean defaultQueue = false;
   private String user;
 
-  private HashSet<String> additionalFilesNotFromConf = null;
-  private List<LocalResource> localizedResources;
+  private final Set<String> additionalFilesNotFromConf = new HashSet<String>();
+  private final Set<LocalResource> localizedResources = new HashSet<LocalResource>();
 
   private static List<TezSessionState> openSessions
     = Collections.synchronizedList(new LinkedList<TezSessionState>());
@@ -143,22 +144,15 @@ public class TezSessionState {
     // create the tez tmp dir
     tezScratchDir = createTezDir(sessionId);
 
-    String dir = tezScratchDir.toString();
-    // Localize resources to session scratch dir
-    localizedResources = utils.localizeTempFilesFromConf(dir, conf);
-    List<LocalResource> handlerLr = utils.localizeTempFiles(dir, conf, additionalFiles);
-    if (handlerLr != null) {
-      if (localizedResources == null) {
-        localizedResources = handlerLr;
-      } else {
-        localizedResources.addAll(handlerLr);
-      }
-      additionalFilesNotFromConf = new HashSet<String>();
+    additionalFilesNotFromConf.clear();
+    if (additionalFiles != null) {
       for (String originalFile : additionalFiles) {
         additionalFilesNotFromConf.add(originalFile);
       }
     }
 
+    refreshLocalResourcesFromConf(conf);
+
     // generate basic tez config
     TezConfiguration tezConfig = new TezConfiguration(conf);
 
@@ -171,10 +165,8 @@ public class TezSessionState {
     // configuration for the application master
     Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
     commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr);
-    if (localizedResources != null) {
-      for (LocalResource lr : localizedResources) {
-        commonLocalResources.put(utils.getBaseName(lr), lr);
-      }
+    for (LocalResource lr : localizedResources) {
+      commonLocalResources.put(utils.getBaseName(lr), lr);
     }
 
     // Create environment for AM.
@@ -216,9 +208,31 @@ public class TezSessionState {
     openSessions.add(this);
   }
 
+  public void refreshLocalResourcesFromConf(HiveConf conf)
+    throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException {
+
+    String dir = tezScratchDir.toString();
+
+    localizedResources.clear();
+
+    // these are local resources set through add file, jar, etc
+    List<LocalResource> lrs = utils.localizeTempFilesFromConf(dir, conf);
+    if (lrs != null) {
+      localizedResources.addAll(lrs);
+    }
+
+    // these are local resources that are set through the mr "tmpjars" property
+    List<LocalResource> handlerLr = utils.localizeTempFiles(dir, conf,
+      additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()]));
+
+    if (handlerLr != null) {
+      localizedResources.addAll(handlerLr);
+    }
+  }
+
   public boolean hasResources(String[] localAmResources) {
     if (localAmResources == null || localAmResources.length == 0) return true;
-    if (additionalFilesNotFromConf == null || additionalFilesNotFromConf.isEmpty()) return false;
+    if (additionalFilesNotFromConf.isEmpty()) return false;
     for (String s : localAmResources) {
       if (!additionalFilesNotFromConf.contains(s)) return false;
     }
@@ -252,8 +266,8 @@ public class TezSessionState {
     tezScratchDir = null;
     conf = null;
     appJarLr = null;
-    additionalFilesNotFromConf = null;
-    localizedResources = null;
+    additionalFilesNotFromConf.clear();
+    localizedResources.clear();
   }
 
   public void cleanupScratchDir () throws IOException {
@@ -369,7 +383,7 @@ public class TezSessionState {
   }
 
   public List<LocalResource> getLocalizedResources() {
-    return localizedResources;
+    return new ArrayList<LocalResource>(localizedResources);
   }
 
   public String getUser() {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1602951&r1=1602950&r2=1602951&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Jun 16 18:28:20 2014
@@ -120,33 +120,28 @@ public class TezTask extends Task<TezWor
 
       // create the tez tmp dir
       scratchDir = utils.createTezDir(scratchDir, conf);
-      boolean hasResources = session.hasResources(inputOutputJars);
-
-      if (ss.hasAddedResource()) {
-        // need to re-launch session because of new added jars.
-        hasResources = false;
-        // reset the added resource flag for this session since we would
-        // relocalize (either by restarting or relocalizing) due to the above
-        // hasResources flag.
-        ss.setAddedResource(false);
-      }
-
-      // If we have any jars from input format, we need to restart the session because
-      // AM will need them; so, AM has to be restarted. What a mess...
-      if (!hasResources && session.isOpen()) {
-        LOG.info("Tez session being reopened to pass custom jars to AM");
-        TezSessionPoolManager.getInstance().close(session);
-        session = TezSessionPoolManager.getInstance().getSession(null, conf, false);
-        ss.setTezSession(session);
-      }
 
       if (!session.isOpen()) {
         // can happen if the user sets the tez flag after the session was
         // established
         LOG.info("Tez session hasn't been created yet. Opening session");
         session.open(conf, inputOutputJars);
+      } else {
+        session.refreshLocalResourcesFromConf(conf);
       }
+
       List<LocalResource> additionalLr = session.getLocalizedResources();
+      
+      // log which resources we're adding (apart from the hive exec)
+      if (LOG.isDebugEnabled()) {
+        if (additionalLr == null || additionalLr.size() == 0) {
+          LOG.debug("No local resources to process (other than hive-exec)");
+        } else {
+          for (LocalResource lr: additionalLr) {
+            LOG.debug("Adding local resource: " + lr.getResource());
+          }
+        }
+      }
 
       // unless already installed on all the cluster nodes, we'll have to
       // localize hive-exec.jar as well.
@@ -156,7 +151,7 @@ public class TezTask extends Task<TezWor
       DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx);
 
       // submit will send the job to the cluster and start executing
-      client = submit(jobConf, dag, scratchDir, appJarLr, session);
+      client = submit(jobConf, dag, scratchDir, appJarLr, session, additionalLr);
 
       // finally monitor will print progress until the job is done
       TezJobMonitor monitor = new TezJobMonitor();
@@ -290,15 +285,23 @@ public class TezTask extends Task<TezWor
   }
 
   DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
-      LocalResource appJarLr, TezSessionState sessionState)
+      LocalResource appJarLr, TezSessionState sessionState,
+      List<LocalResource> additionalLr)
       throws Exception {
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
     DAGClient dagClient = null;
 
+    Map<String, LocalResource> resourceMap = new HashMap<String, LocalResource>();
+    if (additionalLr != null) {
+      for (LocalResource lr: additionalLr) {
+        resourceMap.put(utils.getBaseName(lr), lr);
+      }
+    }
+
     try {
       // ready to start execution on the cluster
-      dagClient = sessionState.getSession().submitDAG(dag);
+      dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
     } catch (SessionNotRunning nr) {
       console.printInfo("Tez session was closed. Reopening...");
 
@@ -306,7 +309,7 @@ public class TezTask extends Task<TezWor
       TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf);
       console.printInfo("Session re-established.");
 
-      dagClient = sessionState.getSession().submitDAG(dag);
+      dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
     }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1602951&r1=1602950&r2=1602951&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Mon Jun 16 18:28:20 2014
@@ -164,8 +164,6 @@ public class SessionState {
   private final String CONFIG_AUTHZ_SETTINGS_APPLIED_MARKER =
       "hive.internal.ss.authz.settings.applied.marker";
 
-  private boolean addedResource;
-
   /**
    * Lineage state.
    */
@@ -1004,12 +1002,4 @@ public class SessionState {
     conf.set(CONFIG_AUTHZ_SETTINGS_APPLIED_MARKER, Boolean.TRUE.toString());
 
   }
-
-  public boolean hasAddedResource() {
-    return addedResource;
-  }
-
-  public void setAddedResource(boolean addedResouce) {
-    this.addedResource = addedResouce;
-  }
 }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1602951&r1=1602950&r2=1602951&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Mon Jun 16 18:28:20 2014
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -200,7 +201,7 @@ public class TestTezTask {
   @Test
   public void testSubmit() throws Exception {
     DAG dag = new DAG("test");
-    task.submit(conf, dag, path, appLr, sessionState);
+    task.submit(conf, dag, path, appLr, sessionState, new LinkedList());
     // validate close/reopen
     verify(sessionState, times(1)).open(any(HiveConf.class));
     verify(sessionState, times(1)).close(eq(false));  // now uses pool after HIVE-7043