Posted to commits@hive.apache.org by xu...@apache.org on 2015/07/16 09:52:28 UTC
[10/50] [abbrv] hive git commit: HIVE-4239 : Remove lock on compilation stage (Sergey Shelukhin, reviewed by Thejas M Nair)
HIVE-4239 : Remove lock on compilation stage (Sergey Shelukhin, reviewed by Thejas M Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/be89eac6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/be89eac6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/be89eac6
Branch: refs/heads/parquet
Commit: be89eac6e119f8aac09782da96b00f4b9a4b062c
Parents: 08595ff
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Jul 9 11:14:43 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Jul 9 11:14:43 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
.../java/org/apache/hadoop/hive/ql/Driver.java | 31 +++--
.../optimizer/RemoveDynamicPruningBySize.java | 2 +-
.../hadoop/hive/ql/parse/GenTezProcContext.java | 8 ++
.../hadoop/hive/ql/parse/GenTezUtils.java | 59 +++-----
.../apache/hadoop/hive/ql/parse/GenTezWork.java | 10 +-
.../hadoop/hive/ql/parse/TezCompiler.java | 14 +-
.../hadoop/hive/ql/session/SessionState.java | 8 ++
.../service/cli/session/HiveSessionImpl.java | 61 ++++++---
.../cli/session/HiveSessionImplwithUGI.java | 3 +-
.../apache/hive/service/cli/CLIServiceTest.java | 135 +++++++++++++++++--
11 files changed, 245 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4549105..39477d6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1789,6 +1789,8 @@ public class HiveConf extends Configuration {
"Transport mode of HiveServer2."),
HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", "",
"Bind host on which to run the HiveServer2 Thrift service."),
+ HIVE_SERVER2_PARALLEL_COMPILATION("hive.driver.parallel.compilation", false, "Whether to\n" +
+ "enable parallel compilation between sessions on HiveServer2. The default is false."),
// http (over thrift) transport settings
HIVE_SERVER2_THRIFT_HTTP_PORT("hive.server2.thrift.http.port", 10001,
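[Not part of the patch] For context, the new flag can be set as hive.driver.parallel.compilation in hive-site.xml or toggled programmatically through the ConfVars entry added above. A minimal sketch, assuming only the HiveConf API already used in this patch; the class name below is illustrative:

// Sketch only: enable and read back the new flag on a HiveConf instance.
import org.apache.hadoop.hive.conf.HiveConf;

public class ParallelCompilationFlagExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Same effect as setting hive.driver.parallel.compilation=true in hive-site.xml.
    conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION, true);
    boolean parallel =
        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION);
    System.out.println("hive.driver.parallel.compilation = " + parallel);
  }
}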
http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index e04165b..934cb42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -125,12 +126,11 @@ public class Driver implements CommandProcessor {
static final private Log LOG = LogFactory.getLog(CLASS_NAME);
static final private LogHelper console = new LogHelper(LOG);
- private static final Object compileMonitor = new Object();
-
private int maxRows = 100;
ByteStream.Output bos = new ByteStream.Output();
- private HiveConf conf;
+ private final HiveConf conf;
+ private final boolean isParallelEnabled;
private DataInput resStream;
private Context ctx;
private DriverContext driverCxt;
@@ -193,7 +193,7 @@ public class Driver implements CommandProcessor {
/**
* Get a Schema with fields represented with native Hive types
*/
- public static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
+ private static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
Schema schema = null;
// If we have a plan, prefer its logical result schema if it's
@@ -284,6 +284,8 @@ public class Driver implements CommandProcessor {
*/
public Driver(HiveConf conf) {
this.conf = conf;
+ isParallelEnabled = (conf != null)
+ && HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION);
}
public Driver(HiveConf conf, String userName) {
@@ -292,9 +294,9 @@ public class Driver implements CommandProcessor {
}
public Driver() {
- if (SessionState.get() != null) {
- conf = SessionState.get().getConf();
- }
+ conf = (SessionState.get() != null) ? SessionState.get().getConf() : null;
+ isParallelEnabled = (conf != null)
+ && HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION);
}
/**
@@ -1118,10 +1120,23 @@ public class Driver implements CommandProcessor {
return createProcessorResponse(compileInternal(command));
}
+ private static final ReentrantLock globalCompileLock = new ReentrantLock();
private int compileInternal(String command) {
+ boolean isParallelEnabled = SessionState.get().isHiveServerQuery() && this.isParallelEnabled;
int ret;
- synchronized (compileMonitor) {
+ final ReentrantLock compileLock = isParallelEnabled
+ ? SessionState.get().getCompileLock() : globalCompileLock;
+ compileLock.lock();
+ try {
+ if (isParallelEnabled && LOG.isDebugEnabled()) {
+ LOG.debug("Entering compile: " + command);
+ }
ret = compile(command);
+ if (isParallelEnabled && LOG.isDebugEnabled()) {
+ LOG.debug("Done with compile: " + command);
+ }
+ } finally {
+ compileLock.unlock();
}
if (ret != 0) {
try {
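[Not part of the patch] To spell out the locking change above: when hive.driver.parallel.compilation is on and the query comes through HiveServer2, compilation is serialized per session via SessionState.getCompileLock(); otherwise a single process-wide lock preserves the old behavior. A standalone sketch of that idiom (the class and method names here are illustrative only):

// Illustrative sketch of the lock-selection pattern used in compileInternal().
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntSupplier;

class CompileGate {
  // One lock shared by every session (legacy behavior)...
  private static final ReentrantLock GLOBAL_COMPILE_LOCK = new ReentrantLock();
  // ...or one lock per session when parallel compilation is enabled.
  private final ReentrantLock sessionCompileLock = new ReentrantLock();

  int compileSerialized(boolean parallelEnabled, IntSupplier compile) {
    ReentrantLock lock = parallelEnabled ? sessionCompileLock : GLOBAL_COMPILE_LOCK;
    lock.lock();
    try {
      return compile.getAsInt();   // stands in for Driver.compile(command)
    } finally {
      lock.unlock();               // always released, even if compile() throws
    }
  }
}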
http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
index 5d01311..1567326 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
@@ -56,7 +56,7 @@ public class RemoveDynamicPruningBySize implements NodeProcessor {
(context.pruningOpsRemovedByPriorOpt.isEmpty() ||
!context.pruningOpsRemovedByPriorOpt.contains(event))) {
context.pruningOpsRemovedByPriorOpt.add(event);
- GenTezUtils.getUtils().removeBranch(event);
+ GenTezUtils.removeBranch(event);
// at this point we've found the fork in the op pipeline that has the pruning as a child plan.
LOG.info("Disabling dynamic pruning for: "
+ ((DynamicPruningEventDesc) desc).getTableScan().getName()
http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index adc31ae..f474eae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -82,6 +82,9 @@ public class GenTezProcContext implements NodeProcessorCtx{
// walk.
public Operator<? extends OperatorDesc> parentOfRoot;
+ // sequence number is used to name vertices (e.g.: Map 1, Reduce 14, ...)
+ private int sequenceNumber = 0;
+
// tez task we're currently processing
public TezTask currentTask;
@@ -188,4 +191,9 @@ public class GenTezProcContext implements NodeProcessorCtx{
rootTasks.add(currentTask);
}
+
+ /** Not thread-safe. */
+ public int nextSequenceNumber() {
+ return ++sequenceNumber;
+ }
}
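[Not part of the patch] Moving the vertex-numbering counter out of the GenTezUtils singleton and into this per-query context is what lets two sessions compile Tez plans concurrently without racing on, or interleaving, the sequence. A toy illustration of the numbering; in the real code the "Map "/"Reducer "/"Union " prefixes come from Utilities.MAPNAME, Utilities.REDUCENAME, and the literal in createUnionWork, and the class below is illustrative:

// Toy illustration only: per-query vertex naming via the context's counter.
class VertexNamingDemo {
  private int sequenceNumber = 0;   // lives in GenTezProcContext, one instance per query

  int nextSequenceNumber() { return ++sequenceNumber; }

  public static void main(String[] args) {
    VertexNamingDemo ctx = new VertexNamingDemo();
    System.out.println("Map " + ctx.nextSequenceNumber());      // Map 1
    System.out.println("Reducer " + ctx.nextSequenceNumber());  // Reducer 2
    System.out.println("Union " + ctx.nextSequenceNumber());    // Union 3
  }
}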
http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 11c1df6..93ad145 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -61,42 +61,27 @@ import com.google.common.collect.HashBiMap;
import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
/**
- * GenTezUtils is a collection of shared helper methods to produce
- * TezWork
+ * GenTezUtils is a collection of shared helper methods to produce TezWork.
+ * All the methods in this class should be static, but some aren't; this is to facilitate testing.
+ * Methods are made non-static on an as-needed basis.
*/
public class GenTezUtils {
+ static final private Log LOG = LogFactory.getLog(GenTezUtils.class);
- static final private Log LOG = LogFactory.getLog(GenTezUtils.class.getName());
-
- // sequence number is used to name vertices (e.g.: Map 1, Reduce 14, ...)
- private int sequenceNumber = 0;
-
- // singleton
- private static GenTezUtils utils;
-
- public static GenTezUtils getUtils() {
- if (utils == null) {
- utils = new GenTezUtils();
- }
- return utils;
+ public GenTezUtils() {
}
- protected GenTezUtils() {
- }
-
- public void resetSequenceNumber() {
- sequenceNumber = 0;
- }
-
- public UnionWork createUnionWork(GenTezProcContext context, Operator<?> root, Operator<?> leaf, TezWork tezWork) {
- UnionWork unionWork = new UnionWork("Union "+ (++sequenceNumber));
+ public static UnionWork createUnionWork(
+ GenTezProcContext context, Operator<?> root, Operator<?> leaf, TezWork tezWork) {
+ UnionWork unionWork = new UnionWork("Union "+ context.nextSequenceNumber());
context.rootUnionWorkMap.put(root, unionWork);
context.unionWorkMap.put(leaf, unionWork);
tezWork.add(unionWork);
return unionWork;
}
- public ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root, TezWork tezWork) {
+ public static ReduceWork createReduceWork(
+ GenTezProcContext context, Operator<?> root, TezWork tezWork) {
assert !root.getParentOperators().isEmpty();
boolean isAutoReduceParallelism =
@@ -107,7 +92,7 @@ public class GenTezUtils {
float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR);
long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
- ReduceWork reduceWork = new ReduceWork(Utilities.REDUCENAME + (++sequenceNumber));
+ ReduceWork reduceWork = new ReduceWork(Utilities.REDUCENAME + context.nextSequenceNumber());
LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
reduceWork.setReducer(root);
reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
@@ -161,8 +146,8 @@ public class GenTezUtils {
return reduceWork;
}
- protected void setupReduceSink(GenTezProcContext context, ReduceWork reduceWork,
- ReduceSinkOperator reduceSink) {
+ private static void setupReduceSink(
+ GenTezProcContext context, ReduceWork reduceWork, ReduceSinkOperator reduceSink) {
LOG.debug("Setting up reduce sink: " + reduceSink
+ " with following reduce work: " + reduceWork.getName());
@@ -182,7 +167,7 @@ public class GenTezUtils {
public MapWork createMapWork(GenTezProcContext context, Operator<?> root,
TezWork tezWork, PrunedPartitionList partitions) throws SemanticException {
assert root.getParentOperators().isEmpty();
- MapWork mapWork = new MapWork(Utilities.MAPNAME + (++sequenceNumber));
+ MapWork mapWork = new MapWork(Utilities.MAPNAME + context.nextSequenceNumber());
LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
// map work starts with table scan operators
@@ -213,7 +198,7 @@ public class GenTezUtils {
}
// removes any union operator and clones the plan
- public void removeUnionOperators(Configuration conf, GenTezProcContext context,
+ public static void removeUnionOperators(Configuration conf, GenTezProcContext context,
BaseWork work)
throws SemanticException {
@@ -354,7 +339,7 @@ public class GenTezUtils {
work.replaceRoots(replacementMap);
}
- public void processFileSink(GenTezProcContext context, FileSinkOperator fileSink)
+ public static void processFileSink(GenTezProcContext context, FileSinkOperator fileSink)
throws SemanticException {
ParseContext parseContext = context.parseContext;
@@ -393,8 +378,8 @@ public class GenTezUtils {
* @param procCtx
* @param event
*/
- public void processAppMasterEvent(GenTezProcContext procCtx, AppMasterEventOperator event) {
-
+ public static void processAppMasterEvent(
+ GenTezProcContext procCtx, AppMasterEventOperator event) {
if (procCtx.abandonedEventOperatorSet.contains(event)) {
// don't need this anymore
return;
@@ -444,7 +429,7 @@ public class GenTezUtils {
/**
* getEnclosingWork finds the BaseWork any given operator belongs to.
*/
- public BaseWork getEnclosingWork(Operator<?> op, GenTezProcContext procCtx) {
+ public static BaseWork getEnclosingWork(Operator<?> op, GenTezProcContext procCtx) {
List<Operator<?>> ops = new ArrayList<Operator<?>>();
findRoots(op, ops);
for (Operator<?> r : ops) {
@@ -459,7 +444,7 @@ public class GenTezUtils {
/*
* findRoots returns all root operators (in ops) that result in operator op
*/
- private void findRoots(Operator<?> op, List<Operator<?>> ops) {
+ private static void findRoots(Operator<?> op, List<Operator<?>> ops) {
List<Operator<?>> parents = op.getParentOperators();
if (parents == null || parents.isEmpty()) {
ops.add(op);
@@ -474,7 +459,7 @@ public class GenTezUtils {
* Remove an operator branch. When we see a fork, we know it's time to do the removal.
* @param event the leaf node of which branch to be removed
*/
- public void removeBranch(AppMasterEventOperator event) {
+ public static void removeBranch(AppMasterEventOperator event) {
Operator<?> child = event;
Operator<?> curr = event;
@@ -485,4 +470,4 @@ public class GenTezUtils {
curr.removeChild(child);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 6db8220..6b3e19d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -61,12 +61,8 @@ public class GenTezWork implements NodeProcessor {
static final private Log LOG = LogFactory.getLog(GenTezWork.class.getName());
- // instance of shared utils
- private GenTezUtils utils = null;
+ private final GenTezUtils utils;
- /**
- * Constructor takes utils as parameter to facilitate testing
- */
public GenTezWork(GenTezUtils utils) {
this.utils = utils;
}
@@ -130,7 +126,7 @@ public class GenTezWork implements NodeProcessor {
if (context.preceedingWork == null) {
work = utils.createMapWork(context, root, tezWork, null);
} else {
- work = utils.createReduceWork(context, root, tezWork);
+ work = GenTezUtils.createReduceWork(context, root, tezWork);
}
context.rootToWorkMap.put(root, work);
}
@@ -295,7 +291,7 @@ public class GenTezWork implements NodeProcessor {
// if unionWork is null, it means it is the first time. we need to
// create a union work object and add this work to it. Subsequent
// work should reference the union and not the actual work.
- unionWork = utils.createUnionWork(context, root, operator, tezWork);
+ unionWork = GenTezUtils.createUnionWork(context, root, operator, tezWork);
// finally connect the union work with work
connectUnionWorkWithWork(unionWork, work, tezWork, context);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index f20393a..9503fa8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -178,7 +178,7 @@ public class TezCompiler extends TaskCompiler {
return;
}
- GenTezUtils.getUtils().removeBranch(victim);
+ GenTezUtils.removeBranch(victim);
// at this point we've found the fork in the op pipeline that has the pruning as a child plan.
LOG.info("Disabling dynamic pruning for: "
+ ((DynamicPruningEventDesc) victim.getConf()).getTableScan().toString()
@@ -319,10 +319,10 @@ public class TezCompiler extends TaskCompiler {
List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
throws SemanticException {
- GenTezUtils.getUtils().resetSequenceNumber();
ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
- GenTezWork genTezWork = new GenTezWork(GenTezUtils.getUtils());
+ GenTezUtils utils = new GenTezUtils();
+ GenTezWork genTezWork = new GenTezWork(utils);
GenTezProcContext procCtx = new GenTezProcContext(
conf, tempParseContext, mvTask, rootTasks, inputs, outputs);
@@ -351,7 +351,7 @@ public class TezCompiler extends TaskCompiler {
opRules.put(new RuleRegExp("Handle Potential Analyze Command",
TableScanOperator.getOperatorName() + "%"),
- new ProcessAnalyzeTable(GenTezUtils.getUtils()));
+ new ProcessAnalyzeTable(utils));
opRules.put(new RuleRegExp("Remember union",
UnionOperator.getOperatorName() + "%"),
@@ -371,19 +371,19 @@ public class TezCompiler extends TaskCompiler {
// we need to clone some operator plans and remove union operators still
for (BaseWork w: procCtx.workWithUnionOperators) {
- GenTezUtils.getUtils().removeUnionOperators(conf, procCtx, w);
+ GenTezUtils.removeUnionOperators(conf, procCtx, w);
}
// then we make sure the file sink operators are set up right
for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
- GenTezUtils.getUtils().processFileSink(procCtx, fileSink);
+ GenTezUtils.processFileSink(procCtx, fileSink);
}
// and finally we hook up any events that need to be sent to the tez AM
LOG.debug("There are " + procCtx.eventOperatorSet.size() + " app master events.");
for (AppMasterEventOperator event : procCtx.eventOperatorSet) {
LOG.debug("Handling AppMasterEventOperator: " + event);
- GenTezUtils.getUtils().processAppMasterEvent(procCtx, event);
+ GenTezUtils.processAppMasterEvent(procCtx, event);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 37d856c..0bc9a46 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
@@ -108,6 +109,9 @@ public class SessionState {
protected ClassLoader parentLoader;
+ // Session-scope compile lock.
+ private final ReentrantLock compileLock = new ReentrantLock();
+
/**
* current configuration.
*/
@@ -319,6 +323,10 @@ public class SessionState {
this.isSilent = isSilent;
}
+ public ReentrantLock getCompileLock() {
+ return compileLock;
+ }
+
public boolean getIsVerbose() {
return isVerbose;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 9a20799..a600309 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -76,17 +77,26 @@ import org.apache.hive.service.server.ThreadWithGarbageCleanup;
*
*/
public class HiveSessionImpl implements HiveSession {
+ private static final String FETCH_WORK_SERDE_CLASS =
+ "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+ private static final Log LOG = LogFactory.getLog(HiveSessionImpl.class);
+
+ // Shared between threads (including SessionState!)
private final SessionHandle sessionHandle;
private String username;
private final String password;
- private HiveConf hiveConf;
+ private final HiveConf hiveConf;
+ // TODO: some SessionState internals are not thread safe. The compile-time internals are synced
+ // via session-scope or global compile lock. The run-time internals work by magic!
+ // They probably work because races are relatively unlikely and few tools run parallel
+ // queries from the same session.
+ // 1) OperationState should be refactored out of SessionState, and made thread-local.
+ // 2) Some parts of session state, like mrStats and vars, need proper synchronization.
private SessionState sessionState;
private String ipAddress;
- private static final String FETCH_WORK_SERDE_CLASS =
- "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
- private static final Log LOG = LogFactory.getLog(HiveSessionImpl.class);
private SessionManager sessionManager;
private OperationManager operationManager;
+ // Synchronized by locking on itself.
private final Set<OperationHandle> opHandleSet = new HashSet<OperationHandle>();
private boolean isOperationLogEnabled;
private File sessionLogDir;
@@ -393,7 +403,7 @@ public class HiveSessionImpl implements HiveSession {
OperationHandle opHandle = operation.getHandle();
try {
operation.run();
- opHandleSet.add(opHandle);
+ addOpHandle(opHandle);
return opHandle;
} catch (HiveSQLException e) {
// Refering to SQLOperation.java,there is no chance that a HiveSQLException throws and the asyn
@@ -416,7 +426,7 @@ public class HiveSessionImpl implements HiveSession {
OperationHandle opHandle = operation.getHandle();
try {
operation.run();
- opHandleSet.add(opHandle);
+ addOpHandle(opHandle);
return opHandle;
} catch (HiveSQLException e) {
operationManager.closeOperation(opHandle);
@@ -436,7 +446,7 @@ public class HiveSessionImpl implements HiveSession {
OperationHandle opHandle = operation.getHandle();
try {
operation.run();
- opHandleSet.add(opHandle);
+ addOpHandle(opHandle);
return opHandle;
} catch (HiveSQLException e) {
operationManager.closeOperation(opHandle);
@@ -457,7 +467,7 @@ public class HiveSessionImpl implements HiveSession {
OperationHandle opHandle = operation.getHandle();
try {
operation.run();
- opHandleSet.add(opHandle);
+ addOpHandle(opHandle);
return opHandle;
} catch (HiveSQLException e) {
operationManager.closeOperation(opHandle);
@@ -479,7 +489,7 @@ public class HiveSessionImpl implements HiveSession {
OperationHandle opHandle = operation.getHandle();
try {
operation.run();
- opHandleSet.add(opHandle);
+ addOpHandle(opHandle);
return opHandle;
} catch (HiveSQLException e) {
operationManager.closeOperation(opHandle);
@@ -499,7 +509,7 @@ public class HiveSessionImpl implements HiveSession {
OperationHandle opHandle = operation.getHandle();
try {
operation.run();
- opHandleSet.add(opHandle);
+ addOpHandle(opHandle);
return opHandle;
} catch (HiveSQLException e) {
operationManager.closeOperation(opHandle);
@@ -524,7 +534,7 @@ public class HiveSessionImpl implements HiveSession {
OperationHandle opHandle = operation.getHandle();
try {
operation.run();
- opHandleSet.add(opHandle);
+ addOpHandle(opHandle);
return opHandle;
} catch (HiveSQLException e) {
operationManager.closeOperation(opHandle);
@@ -534,6 +544,12 @@ public class HiveSessionImpl implements HiveSession {
}
}
+ private void addOpHandle(OperationHandle opHandle) {
+ synchronized (opHandleSet) {
+ opHandleSet.add(opHandle);
+ }
+ }
+
@Override
public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
throws HiveSQLException {
@@ -545,7 +561,7 @@ public class HiveSessionImpl implements HiveSession {
OperationHandle opHandle = operation.getHandle();
try {
operation.run();
- opHandleSet.add(opHandle);
+ addOpHandle(opHandle);
return opHandle;
} catch (HiveSQLException e) {
operationManager.closeOperation(opHandle);
@@ -560,10 +576,14 @@ public class HiveSessionImpl implements HiveSession {
try {
acquire(true);
// Iterate through the opHandles and close their operations
- for (OperationHandle opHandle : opHandleSet) {
+ List<OperationHandle> ops = null;
+ synchronized (opHandleSet) {
+ ops = new ArrayList<>(opHandleSet);
+ opHandleSet.clear();
+ }
+ for (OperationHandle opHandle : ops) {
operationManager.closeOperation(opHandle);
}
- opHandleSet.clear();
// Cleanup session log directory.
cleanupSessionLogDir();
HiveHistory hiveHist = sessionState.getHiveHistory();
@@ -630,7 +650,10 @@ public class HiveSessionImpl implements HiveSession {
@Override
public void closeExpiredOperations() {
- OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
+ OperationHandle[] handles;
+ synchronized (opHandleSet) {
+ handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
+ }
if (handles.length > 0) {
List<Operation> operations = operationManager.removeExpiredOperations(handles);
if (!operations.isEmpty()) {
@@ -648,7 +671,9 @@ public class HiveSessionImpl implements HiveSession {
acquire(false);
try {
for (Operation operation : operations) {
- opHandleSet.remove(operation.getHandle());
+ synchronized (opHandleSet) {
+ opHandleSet.remove(operation.getHandle());
+ }
try {
operation.close();
} catch (Exception e) {
@@ -675,7 +700,9 @@ public class HiveSessionImpl implements HiveSession {
acquire(true);
try {
operationManager.closeOperation(opHandle);
- opHandleSet.remove(opHandle);
+ synchronized (opHandleSet) {
+ opHandleSet.remove(opHandle);
+ }
} finally {
release(true);
}
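[Not part of the patch] The opHandleSet changes above all follow the same copy-under-lock idiom: mutate and snapshot the set while holding its monitor, then do the potentially slow per-operation work (closing operations) outside the synchronized block. A generic sketch of that pattern, with illustrative names:

// Generic sketch of the synchronization idiom applied to opHandleSet above.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

class HandleRegistry<H> {
  private final Set<H> handles = new HashSet<>();   // guarded by its own monitor

  void add(H handle) {
    synchronized (handles) {
      handles.add(handle);
    }
  }

  void closeAll(Consumer<H> closer) {
    List<H> snapshot;
    synchronized (handles) {          // hold the lock only to copy and clear
      snapshot = new ArrayList<>(handles);
      handles.clear();
    }
    for (H handle : snapshot) {       // slow work done without holding the lock
      closer.accept(handle);          // e.g. operationManager.closeOperation(handle)
    }
  }
}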
http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
index cd3c3f9..bf808f1 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
@@ -37,7 +37,8 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion;
/**
*
* HiveSessionImplwithUGI.
- * HiveSession with connecting user's UGI and delegation token if required
+ * HiveSession with connecting user's UGI and delegation token if required.
+ * Note: this object may be shared between threads in HS2.
*/
public class HiveSessionImplwithUGI extends HiveSessionImpl {
public static final String HS2TOKEN = "HiveServer2ImpersonationToken";
http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index b4d517f..c73d152 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -26,9 +26,18 @@ import static org.junit.Assert.fail;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.service.server.HiveServer2;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -38,6 +47,7 @@ import org.junit.Test;
*
*/
public abstract class CLIServiceTest {
+ private static final Log LOG = LogFactory.getLog(CLIServiceTest.class);
protected static CLIServiceClient client;
@@ -206,7 +216,7 @@ public abstract class CLIServiceTest {
HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName;
try {
- runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
+ runAsyncAndWait(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
}
catch (HiveSQLException e) {
// expected error
@@ -218,7 +228,7 @@ public abstract class CLIServiceTest {
* Also check that the sqlState and errorCode should be set
*/
queryString = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'invalid://localhost:10000/a/b/c'";
- opStatus = runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
+ opStatus = runAsyncAndWait(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
// sqlState, errorCode should be set
assertEquals(opStatus.getOperationException().getSQLState(), "08S01");
assertEquals(opStatus.getOperationException().getErrorCode(), 1);
@@ -226,21 +236,21 @@ public abstract class CLIServiceTest {
* Execute an async query with default config
*/
queryString = "SELECT ID+1 FROM " + tableName;
- runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+ runAsyncAndWait(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
/**
* Execute an async query with long polling timeout set to 0
*/
longPollingTimeout = 0;
queryString = "SELECT ID+1 FROM " + tableName;
- runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+ runAsyncAndWait(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
/**
* Execute an async query with long polling timeout set to 500 millis
*/
longPollingTimeout = 500;
queryString = "SELECT ID+1 FROM " + tableName;
- runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+ runAsyncAndWait(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
/**
* Cancellation test
@@ -259,6 +269,92 @@ public abstract class CLIServiceTest {
client.closeSession(sessionHandle);
}
+
+ private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+ cdlIn.countDown();
+ try {
+ cdlOut.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testExecuteStatementParallel() throws Exception {
+ Map<String, String> confOverlay = new HashMap<String, String>();
+ String tableName = "TEST_EXEC_PARALLEL";
+ String columnDefinitions = "(ID STRING)";
+
+ // Open a session and set up the test data
+ SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
+ assertNotNull(sessionHandle);
+
+ long longPollingTimeout = HiveConf.getTimeVar(new HiveConf(),
+ HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
+ confOverlay.put(
+ HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms");
+
+ int THREAD_COUNT = 10, QUERY_COUNT = 10;
+ // TODO: refactor this into a utility, LLAP tests use this pattern a lot
+ ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+ CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+ @SuppressWarnings("unchecked")
+ Callable<Void>[] cs = (Callable<Void>[])new Callable[3];
+ // Create callables with different queries.
+ String query = "SELECT ID + %1$d FROM " + tableName;
+ cs[0] = createQueryCallable(
+ query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut);
+ query = "SELECT t1.ID, SUM(t2.ID) + %1$d FROM " + tableName + " t1 CROSS JOIN "
+ + tableName + " t2 GROUP BY t1.ID HAVING t1.ID > 1";
+ cs[1] = createQueryCallable(
+ query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut);
+ query = "SELECT b.a FROM (SELECT (t1.ID + %1$d) as a , t2.* FROM " + tableName
+ + " t1 INNER JOIN " + tableName + " t2 ON t1.ID = t2.ID WHERE t2.ID > 2) b";
+ cs[2] = createQueryCallable(
+ query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut);
+
+ @SuppressWarnings("unchecked")
+ FutureTask<Void>[] tasks = (FutureTask<Void>[])new FutureTask[THREAD_COUNT];
+ for (int i = 0; i < THREAD_COUNT; ++i) {
+ tasks[i] = new FutureTask<Void>(cs[i % cs.length]);
+ executor.execute(tasks[i]);
+ }
+ try {
+ cdlIn.await(); // Wait for all threads to be ready.
+ cdlOut.countDown(); // Release them at the same time.
+ for (int i = 0; i < THREAD_COUNT; ++i) {
+ tasks[i].get();
+ }
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+
+ // Cleanup
+ client.executeStatement(sessionHandle, "DROP TABLE " + tableName, confOverlay);
+ client.closeSession(sessionHandle);
+ }
+
+ private Callable<Void> createQueryCallable(final String queryStringFormat,
+ final Map<String, String> confOverlay, final long longPollingTimeout,
+ final int queryCount, final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+ return new Callable<Void>() {
+ public Void call() throws Exception {
+ syncThreadStart(cdlIn, cdlOut);
+ SessionHandle sessionHandle = openSession(confOverlay);
+ OperationHandle[] hs = new OperationHandle[queryCount];
+ for (int i = 0; i < hs.length; ++i) {
+ String queryString = String.format(queryStringFormat, i);
+ LOG.info("Submitting " + i);
+ hs[i] = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+ }
+ for (int i = hs.length - 1; i >= 0; --i) {
+ waitForAsyncQuery(hs[i], OperationState.FINISHED, longPollingTimeout);
+ }
+ return null;
+ }
+ };
+ }
+
/**
* Sets up a test specific table with the given column definitions and config
* @param tableName
@@ -268,13 +364,27 @@ public abstract class CLIServiceTest {
*/
private SessionHandle setupTestData(String tableName, String columnDefinitions,
Map<String, String> confOverlay) throws Exception {
+ SessionHandle sessionHandle = openSession(confOverlay);
+ createTestTable(tableName, columnDefinitions, confOverlay, sessionHandle);
+ return sessionHandle;
+ }
+
+ private SessionHandle openSession(Map<String, String> confOverlay)
+ throws HiveSQLException {
SessionHandle sessionHandle = client.openSession("tom", "password", confOverlay);
assertNotNull(sessionHandle);
+ SessionState.get().setIsHiveServerQuery(true); // Pretend we are in HS2.
String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
+ " = false";
client.executeStatement(sessionHandle, queryString, confOverlay);
+ return sessionHandle;
+ }
+ private void createTestTable(String tableName, String columnDefinitions,
+ Map<String, String> confOverlay, SessionHandle sessionHandle)
+ throws HiveSQLException {
+ String queryString;
// Drop the table if it exists
queryString = "DROP TABLE IF EXISTS " + tableName;
client.executeStatement(sessionHandle, queryString, confOverlay);
@@ -282,22 +392,27 @@ public abstract class CLIServiceTest {
// Create a test table
queryString = "CREATE TABLE " + tableName + columnDefinitions;
client.executeStatement(sessionHandle, queryString, confOverlay);
-
- return sessionHandle;
}
- private OperationStatus runQueryAsync(SessionHandle sessionHandle, String queryString,
+ private OperationStatus runAsyncAndWait(SessionHandle sessionHandle, String queryString,
Map<String, String> confOverlay, OperationState expectedState,
long longPollingTimeout) throws HiveSQLException {
// Timeout for the iteration in case of asynchronous execute
+ confOverlay.put(
+ HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms");
+ OperationHandle h = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+ return waitForAsyncQuery(h, expectedState, longPollingTimeout);
+ }
+
+
+ private OperationStatus waitForAsyncQuery(OperationHandle opHandle,
+ OperationState expectedState, long longPollingTimeout) throws HiveSQLException {
long testIterationTimeout = System.currentTimeMillis() + 100000;
long longPollingStart;
long longPollingEnd;
long longPollingTimeDelta;
OperationStatus opStatus = null;
OperationState state = null;
- confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms");
- OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
int count = 0;
while (true) {
// Break if iteration times out
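[Not part of the patch] One detail of testExecuteStatementParallel worth calling out: the two CountDownLatches form a start gate, so every worker thread submits its first query at roughly the same moment, which is what actually exercises concurrent compilation. A self-contained sketch of just that gate; everything other than the latch idiom is illustrative:

// Start-gate sketch: release N workers simultaneously, then join them.
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class StartGateExample {
  public static void main(String[] args) throws Exception {
    final int threads = 4;
    final CountDownLatch ready = new CountDownLatch(threads); // workers -> main
    final CountDownLatch go = new CountDownLatch(1);          // main -> workers
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    Future<?>[] futures = new Future<?>[threads];
    for (int i = 0; i < threads; ++i) {
      final int id = i;
      futures[i] = executor.submit(new Callable<Void>() {
        public Void call() throws Exception {
          ready.countDown();   // signal "I am at the gate"
          go.await();          // block until every worker has arrived
          System.out.println("worker " + id + " submitting queries");
          return null;
        }
      });
    }
    ready.await();    // wait for all workers to reach the gate
    go.countDown();   // release them at the same time
    for (Future<?> f : futures) {
      f.get();        // propagate any worker failure to the main thread
    }
    executor.shutdown();
  }
}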