You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jx...@apache.org on 2015/10/23 18:20:48 UTC

hive git commit: HIVE-12187: Release plan once a query is executed

Repository: hive
Updated Branches:
  refs/heads/master 8e62edac3 -> 27ee7b559


HIVE-12187: Release plan once a query is executed


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/27ee7b55
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/27ee7b55
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/27ee7b55

Branch: refs/heads/master
Commit: 27ee7b5599ce88ccf334a6abc56fde967dca8dff
Parents: 8e62eda
Author: Jimmy Xiang <jx...@cloudera.com>
Authored: Thu Oct 8 11:31:59 2015 -0700
Committer: Jimmy Xiang <jx...@cloudera.com>
Committed: Fri Oct 23 09:19:54 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Driver.java  | 101 ++++++++++++-------
 .../org/apache/hadoop/hive/ql/exec/Task.java    |   8 ++
 2 files changed, 73 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/27ee7b55/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 218b9c8..3a3fcf1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -141,6 +141,9 @@ public class Driver implements CommandProcessor {
   private String SQLState;
   private Throwable downstreamError;
 
+  private FetchTask fetchTask;
+  List<HiveLock> hiveLocks = new ArrayList<HiveLock>();
+
   // A list of FileSinkOperators writing in an ACID compliant manner
   private Set<FileSinkDesc> acidSinks;
 
@@ -371,9 +374,8 @@ public class Driver implements CommandProcessor {
     //holder for parent command type/string when executing reentrant queries
     QueryState queryState = new QueryState();
 
-    if (plan != null) {
+    if (ctx != null) {
       close();
-      plan = null;
     }
 
     if (resetTaskIds) {
@@ -1042,14 +1044,11 @@ public class Driver implements CommandProcessor {
     return acidSinks != null && !acidSinks.isEmpty();
   }
   /**
-   * @param hiveLocks
-   *          list of hive locks to be released Release all the locks specified. If some of the
-   *          locks have already been released, ignore them
    * @param commit if there is an open transaction and if true, commit,
    *               if false rollback.  If there is no open transaction this parameter is ignored.
    *
    **/
-  private void releaseLocksAndCommitOrRollback(List<HiveLock> hiveLocks, boolean commit)
+  private void releaseLocksAndCommitOrRollback(boolean commit)
       throws LockException {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
@@ -1066,15 +1065,41 @@ public class Driver implements CommandProcessor {
       }
     } else {
       //since there is no tx, we only have locks for current query (if any)
-      if (hiveLocks != null) {
+      if (ctx != null && ctx.getHiveLocks() != null) {
+        hiveLocks.addAll(ctx.getHiveLocks());
+      }
+      if (!hiveLocks.isEmpty()) {
         txnMgr.getLockManager().releaseLocks(hiveLocks);
       }
     }
-    ctx.setHiveLocks(null);
+    hiveLocks.clear();
+    if (ctx != null) {
+      ctx.setHiveLocks(null);
+    }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
   }
 
+  /**
+   * Release some resources after a query is executed
+   * while keeping the result around.
+   */
+  private void releaseResources() {
+    if (plan != null) {
+      fetchTask = plan.getFetchTask();
+      if (fetchTask != null) {
+        fetchTask.setDriverContext(null);
+        fetchTask.setQueryPlan(null);
+      }
+    }
+
+    if (driverCxt != null) {
+      driverCxt.shutdown();
+      driverCxt = null;
+    }
+    plan = null;
+  }
+
   @Override
   public CommandProcessorResponse run(String command)
       throws CommandNeedRetryException {
@@ -1088,7 +1113,13 @@ public class Driver implements CommandProcessor {
 
   public CommandProcessorResponse run(String command, boolean alreadyCompiled)
         throws CommandNeedRetryException {
-    CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
+    CommandProcessorResponse cpr;
+    try {
+      cpr = runInternal(command, alreadyCompiled);
+    } finally {
+      releaseResources();
+    }
+
     if(cpr.getResponseCode() == 0) {
       return cpr;
     }
@@ -1168,7 +1199,7 @@ public class Driver implements CommandProcessor {
     }
     if (ret != 0) {
       try {
-        releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
+        releaseLocksAndCommitOrRollback(false);
       } catch (LockException e) {
         LOG.warn("Exception in releasing locks. "
             + org.apache.hadoop.util.StringUtils.stringifyException(e));
@@ -1253,7 +1284,7 @@ public class Driver implements CommandProcessor {
         if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) {
           /*here, if there is an open txn, we want to commit it; this behavior matches
           * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/
-          releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true);
+          releaseLocksAndCommitOrRollback(true);
           txnManager.setAutoCommit(true);
         }
         else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) {
@@ -1281,10 +1312,10 @@ public class Driver implements CommandProcessor {
     //if needRequireLock is false, the release here will do nothing because there is no lock
     try {
       if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) {
-        releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true);
+        releaseLocksAndCommitOrRollback(true);
       }
       else if(plan.getOperation() == HiveOperation.ROLLBACK) {
-        releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
+        releaseLocksAndCommitOrRollback(false);
       }
       else {
         //txn (if there is one started) is not finished
@@ -1315,7 +1346,7 @@ public class Driver implements CommandProcessor {
   private CommandProcessorResponse rollback(CommandProcessorResponse cpr) {
     //console.printError(cpr.toString());
     try {
-      releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
+      releaseLocksAndCommitOrRollback(false);
     }
     catch (LockException e) {
       LOG.error("rollback() FAILED: " + cpr);//make sure not to loose 
@@ -1761,7 +1792,7 @@ public class Driver implements CommandProcessor {
   }
 
   public boolean isFetchingTable() {
-    return plan != null && plan.getFetchTask() != null;
+    return fetchTask != null;
   }
 
   @SuppressWarnings("unchecked")
@@ -1770,9 +1801,8 @@ public class Driver implements CommandProcessor {
       throw new IOException("FAILED: Operation cancelled");
     }
     if (isFetchingTable()) {
-      FetchTask ft = plan.getFetchTask();
-      ft.setMaxRows(maxRows);
-      return ft.fetch(res);
+      fetchTask.setMaxRows(maxRows);
+      return fetchTask.fetch(res);
     }
 
     if (resStream == null) {
@@ -1822,13 +1852,14 @@ public class Driver implements CommandProcessor {
   }
 
   public void resetFetch() throws IOException {
-    if (plan != null && plan.getFetchTask() != null) {
+    if (isFetchingTable()) {
       try {
-        plan.getFetchTask().clearFetch();
+        fetchTask.clearFetch();
       } catch (Exception e) {
         throw new IOException("Error closing the current fetch task", e);
       }
-      plan.getFetchTask().initialize(conf, plan, null);
+      // FetchTask should not depend on the plan.
+      fetchTask.initialize(conf, null, null);
     } else {
       ctx.resetStream();
       resStream = null;
@@ -1843,25 +1874,23 @@ public class Driver implements CommandProcessor {
     this.tryCount = tryCount;
   }
 
-
   public int close() {
     try {
-      if (plan != null) {
-        FetchTask fetchTask = plan.getFetchTask();
-        if (null != fetchTask) {
-          try {
-            fetchTask.clearFetch();
-          } catch (Exception e) {
-            LOG.debug(" Exception while clearing the Fetch task ", e);
-          }
+      if (fetchTask != null) {
+        try {
+          fetchTask.clearFetch();
+        } catch (Exception e) {
+          LOG.debug(" Exception while clearing the Fetch task ", e);
         }
-      }
-      if (driverCxt != null) {
-        driverCxt.shutdown();
-        driverCxt = null;
+        fetchTask = null;
       }
       if (ctx != null) {
         ctx.clear();
+        if (ctx.getHiveLocks() != null) {
+          hiveLocks.addAll(ctx.getHiveLocks());
+          ctx.setHiveLocks(null);
+        }
+        ctx = null;
       }
       if (null != resStream) {
         try {
@@ -1884,9 +1913,9 @@ public class Driver implements CommandProcessor {
       return;
     }
     destroyed = true;
-    if (ctx != null) {
+    if (!hiveLocks.isEmpty()) {
       try {
-        releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
+        releaseLocksAndCommitOrRollback(false);
       } catch (LockException e) {
         LOG.warn("Exception when releasing locking in destroy: " +
             e.getMessage());

http://git-wip-us.apache.org/repos/asf/hive/blob/27ee7b55/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index e584e6e..4e66f38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -507,6 +507,14 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
     return queryPlan;
   }
 
+  public DriverContext getDriverContext() {
+    return driverContext;
+  }
+
+  public void setDriverContext(DriverContext driverContext) {
+    this.driverContext = driverContext;
+  }
+
   public void setQueryPlan(QueryPlan queryPlan) {
     this.queryPlan = queryPlan;
   }