You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2017/03/02 20:59:54 UTC

[27/27] drill git commit: DRILL-5287: Provide option to skip updates of ephemeral state changes in Zookeeper

DRILL-5287: Provide option to skip updates of ephemeral state changes in Zookeeper

close #758


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7ebb985e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7ebb985e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7ebb985e

Branch: refs/heads/master
Commit: 7ebb985edc823692673a42276b4e2a80fd1f256c
Parents: 2b5a6f0
Author: Padma Penumarthy <pp...@yahoo.com>
Authored: Tue Feb 21 13:20:57 2017 -0800
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Thu Mar 2 10:50:24 2017 -0800

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  4 ++++
 .../server/options/SystemOptionManager.java     |  3 ++-
 .../drill/exec/work/foreman/QueryManager.java   | 20 +++++++++++++++-----
 3 files changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7ebb985e/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 4f0f4d9..da3a312 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -441,4 +441,8 @@ public interface ExecConstants {
   String USE_DYNAMIC_UDFS_KEY = "exec.udf.use_dynamic";
   BooleanValidator USE_DYNAMIC_UDFS = new BooleanValidator(USE_DYNAMIC_UDFS_KEY, true);
 
+
+  String QUERY_TRANSIENT_STATE_UPDATE_KEY = "exec.query.progress.update";
+  BooleanValidator QUERY_TRANSIENT_STATE_UPDATE = new BooleanValidator(QUERY_TRANSIENT_STATE_UPDATE_KEY, true);
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7ebb985e/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 4a846c0..fa73e06 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -166,7 +166,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED_OPTION,
       ExecConstants.ENABLE_QUERY_PROFILE_VALIDATOR,
       ExecConstants.QUERY_PROFILE_DEBUG_VALIDATOR,
-      ExecConstants.USE_DYNAMIC_UDFS
+      ExecConstants.USE_DYNAMIC_UDFS,
+      ExecConstants.QUERY_TRANSIENT_STATE_UPDATE
     };
     final Map<String, OptionValidator> tmp = new HashMap<>();
     for (final OptionValidator validator : validators) {

http://git-wip-us.apache.org/repos/asf/drill/blob/7ebb985e/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index c3bde6e..7305025 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.store.TransientStore;
 import org.apache.drill.exec.coord.store.TransientStoreConfig;
@@ -109,6 +110,9 @@ public class QueryManager implements AutoCloseable {
   // How many fragments have finished their execution.
   private final AtomicInteger finishedFragments = new AtomicInteger(0);
 
+  // Whether this query's profile is currently saved in the transient store.
+  private boolean inTransientStore;
+
   public QueryManager(final QueryId queryId, final RunQuery runQuery, final PersistentStoreProvider storeProvider,
       final ClusterCoordinator coordinator, final Foreman foreman) {
     this.queryId =  queryId;
@@ -282,13 +286,21 @@ public class QueryManager implements AutoCloseable {
     }
   }
 
-  QueryState updateEphemeralState(final QueryState queryState) {
-    switch (queryState) {
+  void updateEphemeralState(final QueryState queryState) {
+      // If the query is already in the ZK transient store, ignore the transient state update option;
+      // otherwise its entry would never be removed from the transient store upon completion.
+      if (!inTransientStore &&
+          !foreman.getQueryContext().getOptions().getOption(ExecConstants.QUERY_TRANSIENT_STATE_UPDATE)) {
+        return;
+      }
+
+      switch (queryState) {
       case ENQUEUED:
       case STARTING:
       case RUNNING:
       case CANCELLATION_REQUESTED:
         transientProfiles.put(stringQueryId, getQueryInfo());  // store as ephemeral query profile.
+        inTransientStore = true;
         break;
 
       case COMPLETED:
@@ -296,17 +308,15 @@ public class QueryManager implements AutoCloseable {
       case FAILED:
         try {
           transientProfiles.remove(stringQueryId);
+          inTransientStore = false;
         } catch(final Exception e) {
           logger.warn("Failure while trying to delete the estore profile for this query.", e);
         }
-
         break;
 
       default:
         throw new IllegalStateException("unrecognized queryState " + queryState);
     }
-
-    return queryState;
   }
 
   void writeFinalProfile(UserException ex) {