You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/07/16 09:52:28 UTC

[10/50] [abbrv] hive git commit: HIVE-4239 : Remove lock on compilation stage (Sergey Shelukhin, reviewed by Thejas M Nair)

HIVE-4239 : Remove lock on compilation stage (Sergey Shelukhin, reviewed by Thejas M Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/be89eac6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/be89eac6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/be89eac6

Branch: refs/heads/parquet
Commit: be89eac6e119f8aac09782da96b00f4b9a4b062c
Parents: 08595ff
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Jul 9 11:14:43 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Jul 9 11:14:43 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  31 +++--
 .../optimizer/RemoveDynamicPruningBySize.java   |   2 +-
 .../hadoop/hive/ql/parse/GenTezProcContext.java |   8 ++
 .../hadoop/hive/ql/parse/GenTezUtils.java       |  59 +++-----
 .../apache/hadoop/hive/ql/parse/GenTezWork.java |  10 +-
 .../hadoop/hive/ql/parse/TezCompiler.java       |  14 +-
 .../hadoop/hive/ql/session/SessionState.java    |   8 ++
 .../service/cli/session/HiveSessionImpl.java    |  61 ++++++---
 .../cli/session/HiveSessionImplwithUGI.java     |   3 +-
 .../apache/hive/service/cli/CLIServiceTest.java | 135 +++++++++++++++++--
 11 files changed, 245 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4549105..39477d6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1789,6 +1789,8 @@ public class HiveConf extends Configuration {
         "Transport mode of HiveServer2."),
     HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", "",
         "Bind host on which to run the HiveServer2 Thrift service."),
+    HIVE_SERVER2_PARALLEL_COMPILATION("hive.driver.parallel.compilation", false, "Whether to\n" +
+        "enable parallel compilation between sessions on HiveServer2. The default is false."),
 
     // http (over thrift) transport settings
     HIVE_SERVER2_THRIFT_HTTP_PORT("hive.server2.thrift.http.port", 10001,

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index e04165b..934cb42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -125,12 +126,11 @@ public class Driver implements CommandProcessor {
   static final private Log LOG = LogFactory.getLog(CLASS_NAME);
   static final private LogHelper console = new LogHelper(LOG);
 
-  private static final Object compileMonitor = new Object();
-
   private int maxRows = 100;
   ByteStream.Output bos = new ByteStream.Output();
 
-  private HiveConf conf;
+  private final HiveConf conf;
+  private final boolean isParallelEnabled;
   private DataInput resStream;
   private Context ctx;
   private DriverContext driverCxt;
@@ -193,7 +193,7 @@ public class Driver implements CommandProcessor {
   /**
    * Get a Schema with fields represented with native Hive types
    */
-  public static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
+  private static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
     Schema schema = null;
 
     // If we have a plan, prefer its logical result schema if it's
@@ -284,6 +284,8 @@ public class Driver implements CommandProcessor {
    */
   public Driver(HiveConf conf) {
     this.conf = conf;
+    isParallelEnabled = (conf != null)
+        && HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION);
   }
 
   public Driver(HiveConf conf, String userName) {
@@ -292,9 +294,9 @@ public class Driver implements CommandProcessor {
   }
 
   public Driver() {
-    if (SessionState.get() != null) {
-      conf = SessionState.get().getConf();
-    }
+    conf = (SessionState.get() != null) ? SessionState.get().getConf() : null;
+    isParallelEnabled = (conf != null)
+        && HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION);
   }
 
   /**
@@ -1118,10 +1120,23 @@ public class Driver implements CommandProcessor {
     return createProcessorResponse(compileInternal(command));
   }
 
+  private static final ReentrantLock globalCompileLock = new ReentrantLock();
   private int compileInternal(String command) {
+    boolean isParallelEnabled = SessionState.get().isHiveServerQuery() && this.isParallelEnabled;
     int ret;
-    synchronized (compileMonitor) {
+    final ReentrantLock compileLock = isParallelEnabled
+        ? SessionState.get().getCompileLock() : globalCompileLock;
+    compileLock.lock();
+    try {
+      if (isParallelEnabled && LOG.isDebugEnabled()) {
+        LOG.debug("Entering compile: " + command);
+      }
       ret = compile(command);
+      if (isParallelEnabled && LOG.isDebugEnabled()) {
+        LOG.debug("Done with compile: " + command);
+      }
+    } finally {
+      compileLock.unlock();
     }
     if (ret != 0) {
       try {

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
index 5d01311..1567326 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
@@ -56,7 +56,7 @@ public class RemoveDynamicPruningBySize implements NodeProcessor {
         (context.pruningOpsRemovedByPriorOpt.isEmpty() ||
          !context.pruningOpsRemovedByPriorOpt.contains(event))) {
       context.pruningOpsRemovedByPriorOpt.add(event);
-      GenTezUtils.getUtils().removeBranch(event);
+      GenTezUtils.removeBranch(event);
       // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
       LOG.info("Disabling dynamic pruning for: "
           + ((DynamicPruningEventDesc) desc).getTableScan().getName()

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index adc31ae..f474eae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -82,6 +82,9 @@ public class GenTezProcContext implements NodeProcessorCtx{
   // walk.
   public Operator<? extends OperatorDesc> parentOfRoot;
 
+  // sequence number is used to name vertices (e.g.: Map 1, Reduce 14, ...)
+  private int sequenceNumber = 0;
+
   // tez task we're currently processing
   public TezTask currentTask;
 
@@ -188,4 +191,9 @@ public class GenTezProcContext implements NodeProcessorCtx{
 
     rootTasks.add(currentTask);
   }
+
+  /** Not thread-safe. */
+  public int nextSequenceNumber() {
+     return ++sequenceNumber;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 11c1df6..93ad145 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -61,42 +61,27 @@ import com.google.common.collect.HashBiMap;
 import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
 
 /**
- * GenTezUtils is a collection of shared helper methods to produce
- * TezWork
+ * GenTezUtils is a collection of shared helper methods to produce TezWork.
+ * All the methods in this class should be static, but some aren't; this is to facilitate testing.
+ * Methods are made non-static on as needed basis.
  */
 public class GenTezUtils {
+  static final private Log LOG = LogFactory.getLog(GenTezUtils.class);
 
-  static final private Log LOG = LogFactory.getLog(GenTezUtils.class.getName());
-
-  // sequence number is used to name vertices (e.g.: Map 1, Reduce 14, ...)
-  private int sequenceNumber = 0;
-
-  // singleton
-  private static GenTezUtils utils;
-
-  public static GenTezUtils getUtils() {
-    if (utils == null) {
-      utils = new GenTezUtils();
-    }
-    return utils;
+  public GenTezUtils() {
   }
 
-  protected GenTezUtils() {
-  }
-
-  public void resetSequenceNumber() {
-    sequenceNumber = 0;
-  }
-
-  public UnionWork createUnionWork(GenTezProcContext context, Operator<?> root, Operator<?> leaf, TezWork tezWork) {
-    UnionWork unionWork = new UnionWork("Union "+ (++sequenceNumber));
+  public static UnionWork createUnionWork(
+      GenTezProcContext context, Operator<?> root, Operator<?> leaf, TezWork tezWork) {
+    UnionWork unionWork = new UnionWork("Union "+ context.nextSequenceNumber());
     context.rootUnionWorkMap.put(root, unionWork);
     context.unionWorkMap.put(leaf, unionWork);
     tezWork.add(unionWork);
     return unionWork;
   }
 
-  public ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root, TezWork tezWork) {
+  public static ReduceWork createReduceWork(
+      GenTezProcContext context, Operator<?> root, TezWork tezWork) {
     assert !root.getParentOperators().isEmpty();
 
     boolean isAutoReduceParallelism =
@@ -107,7 +92,7 @@ public class GenTezUtils {
     float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR);
     long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
 
-    ReduceWork reduceWork = new ReduceWork(Utilities.REDUCENAME + (++sequenceNumber));
+    ReduceWork reduceWork = new ReduceWork(Utilities.REDUCENAME + context.nextSequenceNumber());
     LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
     reduceWork.setReducer(root);
     reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
@@ -161,8 +146,8 @@ public class GenTezUtils {
     return reduceWork;
   }
 
-  protected void setupReduceSink(GenTezProcContext context, ReduceWork reduceWork,
-      ReduceSinkOperator reduceSink) {
+  private static void setupReduceSink(
+      GenTezProcContext context, ReduceWork reduceWork, ReduceSinkOperator reduceSink) {
 
     LOG.debug("Setting up reduce sink: " + reduceSink
         + " with following reduce work: " + reduceWork.getName());
@@ -182,7 +167,7 @@ public class GenTezUtils {
   public MapWork createMapWork(GenTezProcContext context, Operator<?> root,
       TezWork tezWork, PrunedPartitionList partitions) throws SemanticException {
     assert root.getParentOperators().isEmpty();
-    MapWork mapWork = new MapWork(Utilities.MAPNAME + (++sequenceNumber));
+    MapWork mapWork = new MapWork(Utilities.MAPNAME + context.nextSequenceNumber());
     LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
 
     // map work starts with table scan operators
@@ -213,7 +198,7 @@ public class GenTezUtils {
   }
 
   // removes any union operator and clones the plan
-  public void removeUnionOperators(Configuration conf, GenTezProcContext context,
+  public static void removeUnionOperators(Configuration conf, GenTezProcContext context,
       BaseWork work)
     throws SemanticException {
 
@@ -354,7 +339,7 @@ public class GenTezUtils {
     work.replaceRoots(replacementMap);
   }
 
-  public void processFileSink(GenTezProcContext context, FileSinkOperator fileSink)
+  public static void processFileSink(GenTezProcContext context, FileSinkOperator fileSink)
       throws SemanticException {
 
     ParseContext parseContext = context.parseContext;
@@ -393,8 +378,8 @@ public class GenTezUtils {
    * @param procCtx
    * @param event
    */
-  public void processAppMasterEvent(GenTezProcContext procCtx, AppMasterEventOperator event) {
-
+  public static void processAppMasterEvent(
+      GenTezProcContext procCtx, AppMasterEventOperator event) {
     if (procCtx.abandonedEventOperatorSet.contains(event)) {
       // don't need this anymore
       return;
@@ -444,7 +429,7 @@ public class GenTezUtils {
   /**
    * getEncosingWork finds the BaseWork any given operator belongs to.
    */
-  public BaseWork getEnclosingWork(Operator<?> op, GenTezProcContext procCtx) {
+  public static BaseWork getEnclosingWork(Operator<?> op, GenTezProcContext procCtx) {
     List<Operator<?>> ops = new ArrayList<Operator<?>>();
     findRoots(op, ops);
     for (Operator<?> r : ops) {
@@ -459,7 +444,7 @@ public class GenTezUtils {
   /*
    * findRoots returns all root operators (in ops) that result in operator op
    */
-  private void findRoots(Operator<?> op, List<Operator<?>> ops) {
+  private static void findRoots(Operator<?> op, List<Operator<?>> ops) {
     List<Operator<?>> parents = op.getParentOperators();
     if (parents == null || parents.isEmpty()) {
       ops.add(op);
@@ -474,7 +459,7 @@ public class GenTezUtils {
    * Remove an operator branch. When we see a fork, we know it's time to do the removal.
    * @param event the leaf node of which branch to be removed
    */
-  public void removeBranch(AppMasterEventOperator event) {
+  public static void removeBranch(AppMasterEventOperator event) {
     Operator<?> child = event;
     Operator<?> curr = event;
 
@@ -485,4 +470,4 @@ public class GenTezUtils {
 
     curr.removeChild(child);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 6db8220..6b3e19d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -61,12 +61,8 @@ public class GenTezWork implements NodeProcessor {
 
   static final private Log LOG = LogFactory.getLog(GenTezWork.class.getName());
 
-  // instance of shared utils
-  private GenTezUtils utils = null;
+  private final GenTezUtils utils;
 
-  /**
-   * Constructor takes utils as parameter to facilitate testing
-   */
   public GenTezWork(GenTezUtils utils) {
     this.utils = utils;
   }
@@ -130,7 +126,7 @@ public class GenTezWork implements NodeProcessor {
       if (context.preceedingWork == null) {
         work = utils.createMapWork(context, root, tezWork, null);
       } else {
-        work = utils.createReduceWork(context, root, tezWork);
+        work = GenTezUtils.createReduceWork(context, root, tezWork);
       }
       context.rootToWorkMap.put(root, work);
     }
@@ -295,7 +291,7 @@ public class GenTezWork implements NodeProcessor {
           // if unionWork is null, it means it is the first time. we need to
           // create a union work object and add this work to it. Subsequent 
           // work should reference the union and not the actual work.
-          unionWork = utils.createUnionWork(context, root, operator, tezWork);
+          unionWork = GenTezUtils.createUnionWork(context, root, operator, tezWork);
           // finally connect the union work with work
           connectUnionWorkWithWork(unionWork, work, tezWork, context);
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index f20393a..9503fa8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -178,7 +178,7 @@ public class TezCompiler extends TaskCompiler {
       return;
     }
 
-    GenTezUtils.getUtils().removeBranch(victim);
+    GenTezUtils.removeBranch(victim);
     // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
     LOG.info("Disabling dynamic pruning for: "
         + ((DynamicPruningEventDesc) victim.getConf()).getTableScan().toString()
@@ -319,10 +319,10 @@ public class TezCompiler extends TaskCompiler {
       List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
       throws SemanticException {
 
-    GenTezUtils.getUtils().resetSequenceNumber();
 
     ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
-    GenTezWork genTezWork = new GenTezWork(GenTezUtils.getUtils());
+    GenTezUtils utils = new GenTezUtils();
+    GenTezWork genTezWork = new GenTezWork(utils);
 
     GenTezProcContext procCtx = new GenTezProcContext(
         conf, tempParseContext, mvTask, rootTasks, inputs, outputs);
@@ -351,7 +351,7 @@ public class TezCompiler extends TaskCompiler {
 
     opRules.put(new RuleRegExp("Handle Potential Analyze Command",
         TableScanOperator.getOperatorName() + "%"),
-        new ProcessAnalyzeTable(GenTezUtils.getUtils()));
+        new ProcessAnalyzeTable(utils));
 
     opRules.put(new RuleRegExp("Remember union",
         UnionOperator.getOperatorName() + "%"),
@@ -371,19 +371,19 @@ public class TezCompiler extends TaskCompiler {
 
     // we need to clone some operator plans and remove union operators still
     for (BaseWork w: procCtx.workWithUnionOperators) {
-      GenTezUtils.getUtils().removeUnionOperators(conf, procCtx, w);
+      GenTezUtils.removeUnionOperators(conf, procCtx, w);
     }
 
     // then we make sure the file sink operators are set up right
     for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
-      GenTezUtils.getUtils().processFileSink(procCtx, fileSink);
+      GenTezUtils.processFileSink(procCtx, fileSink);
     }
 
     // and finally we hook up any events that need to be sent to the tez AM
     LOG.debug("There are " + procCtx.eventOperatorSet.size() + " app master events.");
     for (AppMasterEventOperator event : procCtx.eventOperatorSet) {
       LOG.debug("Handling AppMasterEventOperator: " + event);
-      GenTezUtils.getUtils().processAppMasterEvent(procCtx, event);
+      GenTezUtils.processAppMasterEvent(procCtx, event);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 37d856c..0bc9a46 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
@@ -108,6 +109,9 @@ public class SessionState {
 
   protected ClassLoader parentLoader;
 
+  // Session-scope compile lock.
+  private final ReentrantLock compileLock = new ReentrantLock();
+
   /**
    * current configuration.
    */
@@ -319,6 +323,10 @@ public class SessionState {
     this.isSilent = isSilent;
   }
 
+  public ReentrantLock getCompileLock() {
+    return compileLock;
+  }
+
   public boolean getIsVerbose() {
     return isVerbose;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 9a20799..a600309 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -76,17 +77,26 @@ import org.apache.hive.service.server.ThreadWithGarbageCleanup;
  *
  */
 public class HiveSessionImpl implements HiveSession {
+  private static final String FETCH_WORK_SERDE_CLASS =
+      "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+  private static final Log LOG = LogFactory.getLog(HiveSessionImpl.class);
+
+  // Shared between threads (including SessionState!)
   private final SessionHandle sessionHandle;
   private String username;
   private final String password;
-  private HiveConf hiveConf;
+  private final HiveConf hiveConf;
+  // TODO: some SessionState internals are not thread safe. The compile-time internals are synced
+  //       via session-scope or global compile lock. The run-time internals work by magic!
+  //       They probably work because races are relatively unlikely and few tools run parallel
+  //       queries from the same session.
+  //       1) OperationState should be refactored out of SessionState, and made thread-local.
+  //       2) Some parts of session state, like mrStats and vars, need proper synchronization.
   private SessionState sessionState;
   private String ipAddress;
-  private static final String FETCH_WORK_SERDE_CLASS =
-      "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
-  private static final Log LOG = LogFactory.getLog(HiveSessionImpl.class);
   private SessionManager sessionManager;
   private OperationManager operationManager;
+  // Synchronized by locking on itself.
   private final Set<OperationHandle> opHandleSet = new HashSet<OperationHandle>();
   private boolean isOperationLogEnabled;
   private File sessionLogDir;
@@ -393,7 +403,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       // Refering to SQLOperation.java,there is no chance that a HiveSQLException throws and the asyn
@@ -416,7 +426,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       operationManager.closeOperation(opHandle);
@@ -436,7 +446,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       operationManager.closeOperation(opHandle);
@@ -457,7 +467,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       operationManager.closeOperation(opHandle);
@@ -479,7 +489,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       operationManager.closeOperation(opHandle);
@@ -499,7 +509,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       operationManager.closeOperation(opHandle);
@@ -524,7 +534,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       operationManager.closeOperation(opHandle);
@@ -534,6 +544,12 @@ public class HiveSessionImpl implements HiveSession {
     }
   }
 
+  private void addOpHandle(OperationHandle opHandle) {
+    synchronized (opHandleSet) {
+      opHandleSet.add(opHandle);
+    }
+  }
+
   @Override
   public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
       throws HiveSQLException {
@@ -545,7 +561,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       operationManager.closeOperation(opHandle);
@@ -560,10 +576,14 @@ public class HiveSessionImpl implements HiveSession {
     try {
       acquire(true);
       // Iterate through the opHandles and close their operations
-      for (OperationHandle opHandle : opHandleSet) {
+      List<OperationHandle> ops = null;
+      synchronized (opHandleSet) {
+        ops = new ArrayList<>(opHandleSet);
+        opHandleSet.clear();
+      }
+      for (OperationHandle opHandle : ops) {
         operationManager.closeOperation(opHandle);
       }
-      opHandleSet.clear();
       // Cleanup session log directory.
       cleanupSessionLogDir();
       HiveHistory hiveHist = sessionState.getHiveHistory();
@@ -630,7 +650,10 @@ public class HiveSessionImpl implements HiveSession {
 
   @Override
   public void closeExpiredOperations() {
-    OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
+    OperationHandle[] handles;
+    synchronized (opHandleSet) {
+      handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
+    }
     if (handles.length > 0) {
       List<Operation> operations = operationManager.removeExpiredOperations(handles);
       if (!operations.isEmpty()) {
@@ -648,7 +671,9 @@ public class HiveSessionImpl implements HiveSession {
     acquire(false);
     try {
       for (Operation operation : operations) {
-        opHandleSet.remove(operation.getHandle());
+        synchronized (opHandleSet) {
+          opHandleSet.remove(operation.getHandle());
+        }
         try {
           operation.close();
         } catch (Exception e) {
@@ -675,7 +700,9 @@ public class HiveSessionImpl implements HiveSession {
     acquire(true);
     try {
       operationManager.closeOperation(opHandle);
-      opHandleSet.remove(opHandle);
+      synchronized (opHandleSet) {
+        opHandleSet.remove(opHandle);
+      }
     } finally {
       release(true);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
index cd3c3f9..bf808f1 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
@@ -37,7 +37,8 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion;
 /**
  *
  * HiveSessionImplwithUGI.
- * HiveSession with connecting user's UGI and delegation token if required
+ * HiveSession with connecting user's UGI and delegation token if required.
+ * Note: this object may be shared between threads in HS2.
  */
 public class HiveSessionImplwithUGI extends HiveSessionImpl {
   public static final String HS2TOKEN = "HiveServer2ImpersonationToken";

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index b4d517f..c73d152 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -26,9 +26,18 @@ import static org.junit.Assert.fail;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.service.server.HiveServer2;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,6 +47,7 @@ import org.junit.Test;
  *
  */
 public abstract class CLIServiceTest {
+  private static final Log LOG = LogFactory.getLog(CLIServiceTest.class);
 
   protected static CLIServiceClient client;
 
@@ -206,7 +216,7 @@ public abstract class CLIServiceTest {
         HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
     queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName;
     try {
-      runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
+      runAsyncAndWait(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
     }
     catch (HiveSQLException e) {
       // expected error
@@ -218,7 +228,7 @@ public abstract class CLIServiceTest {
      * Also check that the sqlState and errorCode should be set
      */
     queryString = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'invalid://localhost:10000/a/b/c'";
-    opStatus = runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
+    opStatus = runAsyncAndWait(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
     // sqlState, errorCode should be set
     assertEquals(opStatus.getOperationException().getSQLState(), "08S01");
     assertEquals(opStatus.getOperationException().getErrorCode(), 1);
@@ -226,21 +236,21 @@ public abstract class CLIServiceTest {
      * Execute an async query with default config
      */
     queryString = "SELECT ID+1 FROM " + tableName;
-    runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+    runAsyncAndWait(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
 
     /**
      * Execute an async query with long polling timeout set to 0
      */
     longPollingTimeout = 0;
     queryString = "SELECT ID+1 FROM " + tableName;
-    runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+    runAsyncAndWait(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
 
     /**
      * Execute an async query with long polling timeout set to 500 millis
      */
     longPollingTimeout = 500;
     queryString = "SELECT ID+1 FROM " + tableName;
-    runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+    runAsyncAndWait(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
 
     /**
      * Cancellation test
@@ -259,6 +269,92 @@ public abstract class CLIServiceTest {
     client.closeSession(sessionHandle);
   }
 
+
+  private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+    cdlIn.countDown();
+    try {
+      cdlOut.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testExecuteStatementParallel() throws Exception {
+    Map<String, String> confOverlay = new HashMap<String, String>();
+    String tableName = "TEST_EXEC_PARALLEL";
+    String columnDefinitions = "(ID STRING)";
+
+    // Open a session and set up the test data
+    SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
+    assertNotNull(sessionHandle);
+
+    long longPollingTimeout = HiveConf.getTimeVar(new HiveConf(),
+        HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
+    confOverlay.put(
+        HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms");
+
+    int THREAD_COUNT = 10, QUERY_COUNT = 10;
+    // TODO: refactor this into an utility, LLAP tests use this pattern a lot
+    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+    CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+    @SuppressWarnings("unchecked")
+    Callable<Void>[] cs = (Callable<Void>[])new Callable[3];
+    // Create callables with different queries.
+    String query = "SELECT ID + %1$d FROM " + tableName;
+    cs[0] = createQueryCallable(
+        query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut);
+    query = "SELECT t1.ID, SUM(t2.ID) + %1$d FROM  " + tableName + " t1 CROSS JOIN "
+        + tableName + " t2 GROUP BY t1.ID HAVING t1.ID > 1";
+    cs[1] = createQueryCallable(
+        query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut);
+    query = "SELECT b.a FROM (SELECT (t1.ID + %1$d) as a , t2.* FROM  " + tableName
+        + " t1 INNER JOIN " + tableName + " t2 ON t1.ID = t2.ID WHERE t2.ID > 2) b";
+    cs[2] = createQueryCallable(
+        query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut);
+
+    @SuppressWarnings("unchecked")
+    FutureTask<Void>[] tasks = (FutureTask<Void>[])new FutureTask[THREAD_COUNT];
+    for (int i = 0; i < THREAD_COUNT; ++i) {
+      tasks[i] = new FutureTask<Void>(cs[i % cs.length]);
+      executor.execute(tasks[i]);
+    }
+    try {
+      cdlIn.await(); // Wait for all threads to be ready.
+      cdlOut.countDown(); // Release them at the same time.
+      for (int i = 0; i < THREAD_COUNT; ++i) {
+        tasks[i].get();
+      }
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
+
+    // Cleanup
+    client.executeStatement(sessionHandle, "DROP TABLE " + tableName, confOverlay);
+    client.closeSession(sessionHandle);
+  }
+
+  private Callable<Void> createQueryCallable(final String queryStringFormat,
+      final Map<String, String> confOverlay, final long longPollingTimeout,
+      final int queryCount, final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+    return new Callable<Void>() {
+      public Void call() throws Exception {
+        syncThreadStart(cdlIn, cdlOut);
+        SessionHandle sessionHandle = openSession(confOverlay);
+        OperationHandle[] hs  = new OperationHandle[queryCount];
+        for (int i = 0; i < hs.length; ++i) {
+          String queryString = String.format(queryStringFormat, i);
+          LOG.info("Submitting " + i);
+          hs[i] = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+        }
+        for (int i = hs.length - 1; i >= 0; --i) {
+          waitForAsyncQuery(hs[i], OperationState.FINISHED, longPollingTimeout);
+        }
+        return null;
+      }
+    };
+  }
+
   /**
    * Sets up a test specific table with the given column definitions and config
    * @param tableName
@@ -268,13 +364,27 @@ public abstract class CLIServiceTest {
    */
   private SessionHandle setupTestData(String tableName, String columnDefinitions,
       Map<String, String> confOverlay) throws Exception {
+    SessionHandle sessionHandle = openSession(confOverlay);
+    createTestTable(tableName, columnDefinitions, confOverlay, sessionHandle);
+    return sessionHandle;
+  }
+
+  private SessionHandle openSession(Map<String, String> confOverlay)
+      throws HiveSQLException {
     SessionHandle sessionHandle = client.openSession("tom", "password", confOverlay);
     assertNotNull(sessionHandle);
+    SessionState.get().setIsHiveServerQuery(true); // Pretend we are in HS2.
 
     String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
         + " = false";
     client.executeStatement(sessionHandle, queryString, confOverlay);
+    return sessionHandle;
+  }
 
+  private void createTestTable(String tableName, String columnDefinitions,
+      Map<String, String> confOverlay, SessionHandle sessionHandle)
+      throws HiveSQLException {
+    String queryString;
     // Drop the table if it exists
     queryString = "DROP TABLE IF EXISTS " + tableName;
     client.executeStatement(sessionHandle, queryString, confOverlay);
@@ -282,22 +392,27 @@ public abstract class CLIServiceTest {
     // Create a test table
     queryString = "CREATE TABLE " + tableName + columnDefinitions;
     client.executeStatement(sessionHandle, queryString, confOverlay);
-
-    return sessionHandle;
   }
 
-  private OperationStatus runQueryAsync(SessionHandle sessionHandle, String queryString,
+  private OperationStatus runAsyncAndWait(SessionHandle sessionHandle, String queryString,
       Map<String, String> confOverlay, OperationState expectedState,
       long longPollingTimeout) throws HiveSQLException {
     // Timeout for the iteration in case of asynchronous execute
+    confOverlay.put(
+        HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms");
+    OperationHandle h = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+    return waitForAsyncQuery(h, expectedState, longPollingTimeout);
+  }
+
+
+  private OperationStatus waitForAsyncQuery(OperationHandle opHandle,
+      OperationState expectedState, long longPollingTimeout) throws HiveSQLException {
     long testIterationTimeout = System.currentTimeMillis() + 100000;
     long longPollingStart;
     long longPollingEnd;
     long longPollingTimeDelta;
     OperationStatus opStatus = null;
     OperationState state = null;
-    confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms");
-    OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
     int count = 0;
     while (true) {
       // Break if iteration times out