You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/10/07 22:54:35 UTC

hive git commit: HIVE-11969 : start Tez session in background when starting CLI (Sergey Shelukhin, reviewed by Gopal V)

Repository: hive
Updated Branches:
  refs/heads/master 556877c24 -> 7f9023ea0


HIVE-11969 : start Tez session in background when starting CLI (Sergey Shelukhin, reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7f9023ea
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7f9023ea
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7f9023ea

Branch: refs/heads/master
Commit: 7f9023ea0323821626f17e30d04b5acffb1d3048
Parents: 556877c
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Oct 7 13:55:36 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Oct 7 13:55:36 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/cli/CliDriver.java   |   9 +-
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../hive/ql/exec/tez/TezSessionState.java       | 204 +++++++++++++++----
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java |   6 +-
 .../hadoop/hive/ql/session/SessionState.java    |  52 +++--
 .../hadoop/hive/ql/exec/tez/TestTezTask.java    |   2 +
 6 files changed, 222 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
----------------------------------------------------------------------
diff --git a/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java b/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
index 4b52578..3a80f99 100644
--- a/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
+++ b/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveVariableSource;
 import org.apache.hadoop.hive.conf.Validator;
 import org.apache.hadoop.hive.conf.VariableSubstitution;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
@@ -690,7 +691,13 @@ public class CliDriver {
     }).substitute(conf, prompt);
     prompt2 = spacesForString(prompt);
 
-    SessionState.start(ss);
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CLI_TEZ_SESSION_ASYNC)) {
+      // Start the session in a fire-and-forget manner. When the asynchronously initialized parts of
+      // the session are needed, the corresponding getters and other methods will wait as needed.
+      SessionState.beginStart(ss, console);
+    } else {
+      SessionState.start(ss);
+    }
 
     // execute cli driver work
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 54a529e..bf48f69 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1712,6 +1712,9 @@ public class HiveConf extends Configuration {
 
     HIVE_CLI_PRINT_HEADER("hive.cli.print.header", false, "Whether to print the names of the columns in query output."),
 
+    HIVE_CLI_TEZ_SESSION_ASYNC("hive.cli.tez.session.async", true, "Whether to start Tez\n" +
+        "session in background when running CLI with Tez, allowing CLI to be available earlier."),
+
     HIVE_ERROR_ON_EMPTY_PARTITION("hive.error.on.empty.partition", false,
         "Whether to throw an exception if dynamic partition insert generates empty results."),
 

http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 568ebbe..6ed6421 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -23,14 +23,19 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.security.auth.login.LoginException;
 
@@ -46,7 +51,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -70,6 +75,9 @@ public class TezSessionState {
   private Path tezScratchDir;
   private LocalResource appJarLr;
   private TezClient session;
+  private Future<TezClient> sessionFuture;
+  /** Console used for user feedback during async session opening. */
+  private LogHelper console;
   private String sessionId;
   private final DagUtils utils;
   private String queueName;
@@ -97,13 +105,40 @@ public class TezSessionState {
     this.sessionId = sessionId;
   }
 
-  /**
-   * Returns whether a session has been established
-   */
+  public boolean isOpening() {
+    if (session != null || sessionFuture == null) return false;
+    try {
+      session = sessionFuture.get(0, TimeUnit.NANOSECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return false;
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    } catch (CancellationException e) {
+      return false;
+    } catch (TimeoutException e) {
+      return true;
+    }
+    return false;
+  }
+
   public boolean isOpen() {
-    return session != null;
+    if (session != null) return true;
+    if (sessionFuture == null) return false;
+    try {
+      session = sessionFuture.get(0, TimeUnit.NANOSECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return false;
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    } catch (TimeoutException | CancellationException e) {
+      return false;
+    }
+    return true;
   }
 
+
   /**
    * Get all open sessions. Only used to clean up at shutdown.
    * @return List<TezSessionState>
@@ -124,9 +159,21 @@ public class TezSessionState {
    * @throws URISyntaxException
    * @throws LoginException
    * @throws TezException
+   * @throws InterruptedException
    */
   public void open(HiveConf conf, String[] additionalFiles)
     throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException {
+    openInternal(conf, additionalFiles, false, null);
+  }
+
+  public void beginOpen(HiveConf conf, String[] additionalFiles, LogHelper console)
+    throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException {
+    openInternal(conf, additionalFiles, true, console);
+  }
+
+  private void openInternal(
+      final HiveConf conf, String[] additionalFiles, boolean isAsync, LogHelper console)
+          throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException {
     this.conf = conf;
     this.queueName = conf.get("tez.queue.name");
     this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
@@ -152,7 +199,7 @@ public class TezSessionState {
     appJarLr = createJarLocalResource(utils.getExecJarPathLocal());
 
     // configuration for the application master
-    Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
+    final Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
     commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr);
     for (LocalResource lr : localizedResources) {
       commonLocalResources.put(utils.getBaseName(lr), lr);
@@ -164,7 +211,7 @@ public class TezSessionState {
 
     // and finally we're ready to create and start the session
     // generate basic tez config
-    TezConfiguration tezConfig = new TezConfiguration(conf);
+    final TezConfiguration tezConfig = new TezConfiguration(conf);
     tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
     Utilities.stripHivePasswordDetails(tezConfig);
 
@@ -176,37 +223,85 @@ public class TezSessionState {
       tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n);
     }
 
-    session = TezClient.create("HIVE-" + sessionId, tezConfig, true,
+    final TezClient session = TezClient.create("HIVE-" + sessionId, tezConfig, true,
         commonLocalResources, null);
 
     LOG.info("Opening new Tez Session (id: " + sessionId
         + ", scratch dir: " + tezScratchDir + ")");
 
     TezJobMonitor.initShutdownHook();
-    session.start();
+    if (!isAsync) {
+      startSessionAndContainers(session, conf, commonLocalResources, tezConfig, false);
+      this.session = session;
+    } else {
+      FutureTask<TezClient> sessionFuture = new FutureTask<>(new Callable<TezClient>() {
+        @Override
+        public TezClient call() throws Exception {
+          return startSessionAndContainers(session, conf, commonLocalResources, tezConfig, true);
+        }
+      });
+      new Thread(sessionFuture, "Tez session start thread").start();
+      // We assume here nobody will try to get session before open() returns.
+      this.console = console;
+      this.sessionFuture = sessionFuture;
+    }
+  }
 
-    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {
-      int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
-      LOG.info("Prewarming " + n + " containers  (id: " + sessionId
-          + ", scratch dir: " + tezScratchDir + ")");
-      PreWarmVertex prewarmVertex = utils.createPreWarmVertex(tezConfig, n,
-          commonLocalResources);
-      try {
-        session.preWarm(prewarmVertex);
-      } catch (IOException ie) {
-        if (ie.getMessage().contains("Interrupted while waiting")) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Hive Prewarm threw an exception ", ie);
+  private TezClient startSessionAndContainers(TezClient session, HiveConf conf,
+      Map<String, LocalResource> commonLocalResources, TezConfiguration tezConfig,
+      boolean isOnThread) throws TezException, IOException {
+    session.start();
+    boolean isSuccessful = false;
+    try {
+      if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {
+        int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
+        LOG.info("Prewarming " + n + " containers  (id: " + sessionId
+            + ", scratch dir: " + tezScratchDir + ")");
+        PreWarmVertex prewarmVertex = utils.createPreWarmVertex(
+            tezConfig, n, commonLocalResources);
+        try {
+          session.preWarm(prewarmVertex);
+        } catch (IOException ie) {
+          if (!isOnThread && ie.getMessage().contains("Interrupted while waiting")) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Hive Prewarm threw an exception ", ie);
+            }
+          } else {
+            throw ie;
           }
-        } else {
-          throw ie;
         }
       }
+      try {
+        session.waitTillReady();
+      } catch (InterruptedException ie) {
+        if (isOnThread) throw new IOException(ie);
+        //ignore
+      }
+      isSuccessful = true;
+      return session;
+    } finally {
+      if (isOnThread && !isSuccessful) {
+        closeAndIgnoreExceptions(session);
+      }
     }
+  }
+
+  private static void closeAndIgnoreExceptions(TezClient session) {
+    try {
+      session.stop();
+    } catch (SessionNotRunning nr) {
+      // Ignore.
+    } catch (IOException | TezException ex) {
+      LOG.info("Failed to close Tez session after failure to initialize: " + ex.getMessage());
+    }
+  }
+
+  public void endOpen() throws InterruptedException, CancellationException {
+    if (this.session != null || this.sessionFuture == null) return;
     try {
-      session.waitTillReady();
-    } catch(InterruptedException ie) {
-      //ignore
+      this.session = this.sessionFuture.get();
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e);
     }
   }
 
@@ -250,21 +345,32 @@ public class TezSessionState {
    * @throws Exception
    */
   public void close(boolean keepTmpDir) throws Exception {
-    if (!isOpen()) {
-      return;
-    }
-
-    LOG.info("Closing Tez Session");
-    try {
-      session.stop();
-    } catch (SessionNotRunning nr) {
-      // ignore
+    if (session != null) {
+      LOG.info("Closing Tez Session");
+      closeClient(session);
+    } else if (sessionFuture != null) {
+      sessionFuture.cancel(true);
+      TezClient asyncSession = null;
+      try {
+        asyncSession = sessionFuture.get(); // In case it was done and no one looked at it.
+      } catch (ExecutionException | CancellationException e) {
+        // ignore
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        // ignore
+      }
+      if (asyncSession != null) {
+        LOG.info("Closing Tez Session");
+        closeClient(asyncSession);
+      }
     }
 
     if (!keepTmpDir) {
       cleanupScratchDir();
     }
     session = null;
+    sessionFuture = null;
+    console = null;
     tezScratchDir = null;
     conf = null;
     appJarLr = null;
@@ -272,6 +378,15 @@ public class TezSessionState {
     localizedResources.clear();
   }
 
+  private void closeClient(TezClient client) throws TezException,
+      IOException {
+    try {
+      client.stop();
+    } catch (SessionNotRunning nr) {
+      // ignore
+    }
+  }
+
   public void cleanupScratchDir () throws IOException {
     FileSystem fs = tezScratchDir.getFileSystem(conf);
     fs.delete(tezScratchDir, true);
@@ -283,6 +398,21 @@ public class TezSessionState {
   }
 
   public TezClient getSession() {
+    if (session == null && sessionFuture != null) {
+      if (!sessionFuture.isDone()) {
+        console.printInfo("Waiting for Tez session and AM to be ready...");
+      }
+      try {
+        session = sessionFuture.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return null;
+      } catch (ExecutionException e) {
+        throw new RuntimeException(e);
+      } catch (CancellationException e) {
+        return null;
+      }
+    }
     return session;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 2d740ed..c62e929 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.tez.client.TezClient;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
@@ -251,7 +252,8 @@ public class TezTask extends Task<TezWork> {
     final boolean missingLocalResources = !session
         .hasResources(inputOutputJars);
 
-    if (!session.isOpen()) {
+    TezClient client = session.getSession();
+    if (client == null) {
       // can happen if the user sets the tez flag after the session was
       // established
       LOG.info("Tez session hasn't been created yet. Opening session");
@@ -263,7 +265,7 @@ public class TezTask extends Task<TezWork> {
       if (missingLocalResources) {
         LOG.info("Tez session missing resources," +
             " adding additional necessary resources");
-        session.getSession().addAppMasterLocalFiles(extraResources);
+        client.addAppMasterLocalFiles(extraResources);
       }
 
       session.refreshLocalResourcesFromConf(conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index dc8c336..56b0fae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.io.FileUtils;
@@ -85,7 +86,6 @@ import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell;
 
 import com.google.common.base.Preconditions;
@@ -474,6 +474,21 @@ public class SessionState {
    * when switching from one session to another.
    */
   public static SessionState start(SessionState startSs) {
+    start(startSs, false, null);
+    return startSs;
+  }
+
+  public static void beginStart(SessionState startSs, LogHelper console) {
+    start(startSs, true, console);
+  }
+
+  public static void endStart(SessionState startSs)
+      throws CancellationException, InterruptedException {
+    if (startSs.tezSessionState == null) return;
+    startSs.tezSessionState.endOpen();
+  }
+
+  private static void start(SessionState startSs, boolean isAsync, LogHelper console) {
     setCurrentSessionState(startSs);
 
     if (startSs.hiveHist == null){
@@ -521,20 +536,31 @@ public class SessionState {
       throw new RuntimeException(e);
     }
 
-    if (HiveConf.getVar(startSs.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)
-        .equals("tez") && (startSs.isHiveServerQuery == false)) {
-      try {
-        if (startSs.tezSessionState == null) {
-          startSs.tezSessionState = new TezSessionState(startSs.getSessionId());
-        }
-        if (!startSs.tezSessionState.isOpen()) {
-          startSs.tezSessionState.open(startSs.conf); // should use conf on session start-up
+    String engine = HiveConf.getVar(startSs.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+    if (!engine.equals("tez") || startSs.isHiveServerQuery) return;
+
+    try {
+      if (startSs.tezSessionState == null) {
+        startSs.tezSessionState = new TezSessionState(startSs.getSessionId());
+      }
+      if (startSs.tezSessionState.isOpen()) {
+        return;
+      }
+      if (startSs.tezSessionState.isOpening()) {
+        if (!isAsync) {
+          startSs.tezSessionState.endOpen();
         }
-      } catch (Exception e) {
-        throw new RuntimeException(e);
+        return;
+      }
+      // Neither open nor opening.
+      if (!isAsync) {
+        startSs.tezSessionState.open(startSs.conf); // should use conf on session start-up
+      } else {
+        startSs.tezSessionState.beginOpen(startSs.conf, null, console);
       }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
-    return startSs;
   }
 
   /**
@@ -1572,8 +1598,6 @@ public class SessionState {
     }
   }
 
-
-
   public TezSessionState getTezSession() {
     return tezSessionState;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index d004a27..858cca0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -236,6 +236,7 @@ public class TestTezTask {
         .thenReturn(resources);
     when(utils.getBaseName(res)).thenReturn("foo.jar");
     when(sessionState.isOpen()).thenReturn(true);
+    when(sessionState.isOpening()).thenReturn(false);
     when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
     task.updateSession(sessionState, conf, path, inputOutputJars, resMap);
     verify(session).addAppMasterLocalFiles(resMap);
@@ -254,6 +255,7 @@ public class TestTezTask {
         .thenReturn(resources);
     when(utils.getBaseName(res)).thenReturn("foo.jar");
     when(sessionState.isOpen()).thenReturn(true);
+    when(sessionState.isOpening()).thenReturn(false);
     when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
     task.addExtraResourcesToDag(sessionState, dag, inputOutputJars, resMap);
     verify(dag).addTaskLocalFiles(resMap);