You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jx...@apache.org on 2015/10/23 18:20:48 UTC
hive git commit: HIVE-12187: Release plan once a query is executed
Repository: hive
Updated Branches:
refs/heads/master 8e62edac3 -> 27ee7b559
HIVE-12187: Release plan once a query is executed
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/27ee7b55
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/27ee7b55
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/27ee7b55
Branch: refs/heads/master
Commit: 27ee7b5599ce88ccf334a6abc56fde967dca8dff
Parents: 8e62eda
Author: Jimmy Xiang <jx...@cloudera.com>
Authored: Thu Oct 8 11:31:59 2015 -0700
Committer: Jimmy Xiang <jx...@cloudera.com>
Committed: Fri Oct 23 09:19:54 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/ql/Driver.java | 101 ++++++++++++-------
.../org/apache/hadoop/hive/ql/exec/Task.java | 8 ++
2 files changed, 73 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/27ee7b55/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 218b9c8..3a3fcf1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -141,6 +141,9 @@ public class Driver implements CommandProcessor {
private String SQLState;
private Throwable downstreamError;
+ private FetchTask fetchTask;
+ List<HiveLock> hiveLocks = new ArrayList<HiveLock>();
+
// A list of FileSinkOperators writing in an ACID compliant manner
private Set<FileSinkDesc> acidSinks;
@@ -371,9 +374,8 @@ public class Driver implements CommandProcessor {
//holder for parent command type/string when executing reentrant queries
QueryState queryState = new QueryState();
- if (plan != null) {
+ if (ctx != null) {
close();
- plan = null;
}
if (resetTaskIds) {
@@ -1042,14 +1044,11 @@ public class Driver implements CommandProcessor {
return acidSinks != null && !acidSinks.isEmpty();
}
/**
- * @param hiveLocks
- * list of hive locks to be released Release all the locks specified. If some of the
- * locks have already been released, ignore them
* @param commit if there is an open transaction and if true, commit,
* if false rollback. If there is no open transaction this parameter is ignored.
*
**/
- private void releaseLocksAndCommitOrRollback(List<HiveLock> hiveLocks, boolean commit)
+ private void releaseLocksAndCommitOrRollback(boolean commit)
throws LockException {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
@@ -1066,15 +1065,41 @@ public class Driver implements CommandProcessor {
}
} else {
//since there is no tx, we only have locks for current query (if any)
- if (hiveLocks != null) {
+ if (ctx != null && ctx.getHiveLocks() != null) {
+ hiveLocks.addAll(ctx.getHiveLocks());
+ }
+ if (!hiveLocks.isEmpty()) {
txnMgr.getLockManager().releaseLocks(hiveLocks);
}
}
- ctx.setHiveLocks(null);
+ hiveLocks.clear();
+ if (ctx != null) {
+ ctx.setHiveLocks(null);
+ }
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
}
+ /**
+ * Release some resources after a query is executed
+ * while keeping the result around.
+ */
+ private void releaseResources() {
+ if (plan != null) {
+ fetchTask = plan.getFetchTask();
+ if (fetchTask != null) {
+ fetchTask.setDriverContext(null);
+ fetchTask.setQueryPlan(null);
+ }
+ }
+
+ if (driverCxt != null) {
+ driverCxt.shutdown();
+ driverCxt = null;
+ }
+ plan = null;
+ }
+
@Override
public CommandProcessorResponse run(String command)
throws CommandNeedRetryException {
@@ -1088,7 +1113,13 @@ public class Driver implements CommandProcessor {
public CommandProcessorResponse run(String command, boolean alreadyCompiled)
throws CommandNeedRetryException {
- CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
+ CommandProcessorResponse cpr;
+ try {
+ cpr = runInternal(command, alreadyCompiled);
+ } finally {
+ releaseResources();
+ }
+
if(cpr.getResponseCode() == 0) {
return cpr;
}
@@ -1168,7 +1199,7 @@ public class Driver implements CommandProcessor {
}
if (ret != 0) {
try {
- releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
+ releaseLocksAndCommitOrRollback(false);
} catch (LockException e) {
LOG.warn("Exception in releasing locks. "
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
@@ -1253,7 +1284,7 @@ public class Driver implements CommandProcessor {
if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) {
/*here, if there is an open txn, we want to commit it; this behavior matches
* https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/
- releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true);
+ releaseLocksAndCommitOrRollback(true);
txnManager.setAutoCommit(true);
}
else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) {
@@ -1281,10 +1312,10 @@ public class Driver implements CommandProcessor {
//if needRequireLock is false, the release here will do nothing because there is no lock
try {
if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) {
- releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true);
+ releaseLocksAndCommitOrRollback(true);
}
else if(plan.getOperation() == HiveOperation.ROLLBACK) {
- releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
+ releaseLocksAndCommitOrRollback(false);
}
else {
//txn (if there is one started) is not finished
@@ -1315,7 +1346,7 @@ public class Driver implements CommandProcessor {
private CommandProcessorResponse rollback(CommandProcessorResponse cpr) {
//console.printError(cpr.toString());
try {
- releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
+ releaseLocksAndCommitOrRollback(false);
}
catch (LockException e) {
LOG.error("rollback() FAILED: " + cpr);//make sure not to loose
@@ -1761,7 +1792,7 @@ public class Driver implements CommandProcessor {
}
public boolean isFetchingTable() {
- return plan != null && plan.getFetchTask() != null;
+ return fetchTask != null;
}
@SuppressWarnings("unchecked")
@@ -1770,9 +1801,8 @@ public class Driver implements CommandProcessor {
throw new IOException("FAILED: Operation cancelled");
}
if (isFetchingTable()) {
- FetchTask ft = plan.getFetchTask();
- ft.setMaxRows(maxRows);
- return ft.fetch(res);
+ fetchTask.setMaxRows(maxRows);
+ return fetchTask.fetch(res);
}
if (resStream == null) {
@@ -1822,13 +1852,14 @@ public class Driver implements CommandProcessor {
}
public void resetFetch() throws IOException {
- if (plan != null && plan.getFetchTask() != null) {
+ if (isFetchingTable()) {
try {
- plan.getFetchTask().clearFetch();
+ fetchTask.clearFetch();
} catch (Exception e) {
throw new IOException("Error closing the current fetch task", e);
}
- plan.getFetchTask().initialize(conf, plan, null);
+ // FetchTask should not depend on the plan.
+ fetchTask.initialize(conf, null, null);
} else {
ctx.resetStream();
resStream = null;
@@ -1843,25 +1874,23 @@ public class Driver implements CommandProcessor {
this.tryCount = tryCount;
}
-
public int close() {
try {
- if (plan != null) {
- FetchTask fetchTask = plan.getFetchTask();
- if (null != fetchTask) {
- try {
- fetchTask.clearFetch();
- } catch (Exception e) {
- LOG.debug(" Exception while clearing the Fetch task ", e);
- }
+ if (fetchTask != null) {
+ try {
+ fetchTask.clearFetch();
+ } catch (Exception e) {
+ LOG.debug(" Exception while clearing the Fetch task ", e);
}
- }
- if (driverCxt != null) {
- driverCxt.shutdown();
- driverCxt = null;
+ fetchTask = null;
}
if (ctx != null) {
ctx.clear();
+ if (ctx.getHiveLocks() != null) {
+ hiveLocks.addAll(ctx.getHiveLocks());
+ ctx.setHiveLocks(null);
+ }
+ ctx = null;
}
if (null != resStream) {
try {
@@ -1884,9 +1913,9 @@ public class Driver implements CommandProcessor {
return;
}
destroyed = true;
- if (ctx != null) {
+ if (!hiveLocks.isEmpty()) {
try {
- releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
+ releaseLocksAndCommitOrRollback(false);
} catch (LockException e) {
LOG.warn("Exception when releasing locking in destroy: " +
e.getMessage());
http://git-wip-us.apache.org/repos/asf/hive/blob/27ee7b55/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index e584e6e..4e66f38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -507,6 +507,14 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
return queryPlan;
}
+ public DriverContext getDriverContext() {
+ return driverContext;
+ }
+
+ public void setDriverContext(DriverContext driverContext) {
+ this.driverContext = driverContext;
+ }
+
public void setQueryPlan(QueryPlan queryPlan) {
this.queryPlan = queryPlan;
}