You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/04/25 00:00:51 UTC

tez git commit: TEZ-2049. Remove YARN references from Tez AsyncDispatcher (zhiyuany)

Repository: tez
Updated Branches:
  refs/heads/master bb4fb6471 -> ad8a80d2b


TEZ-2049. Remove YARN references from Tez AsyncDispatcher (zhiyuany)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ad8a80d2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ad8a80d2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ad8a80d2

Branch: refs/heads/master
Commit: ad8a80d2b4acbb481949c4899b406cc1cc9ddec4
Parents: bb4fb64
Author: Zhiyuan Yang <zh...@apache.org>
Authored: Mon Apr 24 16:58:42 2017 -0700
Committer: Zhiyuan Yang <zh...@apache.org>
Committed: Mon Apr 24 16:58:42 2017 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/AsyncDispatcher.java     | 17 ++++++++++++-----
 .../tez/common/AsyncDispatcherConcurrent.java      | 12 +++++++-----
 .../java/org/apache/tez/dag/app/DAGAppMaster.java  |  6 ++++--
 3 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ad8a80d2/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index ec5f6c7..3a59ff6 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -74,7 +74,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
   protected final Map<Class<? extends Enum>, AsyncDispatcherConcurrent> concurrentEventDispatchers = 
       Maps.newHashMap();
   
-  private boolean exitOnDispatchException;
+  private boolean exitOnDispatchException = false;
 
   public AsyncDispatcher(String name) {
     this(name, new LinkedBlockingQueue<Event>());
@@ -121,10 +121,6 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    // TODO TEZ-2049 remove YARN reference
-    this.exitOnDispatchException =
-        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
-          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
     super.serviceInit(conf);
   }
 
@@ -225,6 +221,11 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
     checkForExistingConcurrentDispatcher(eventType);
   }
 
+  @VisibleForTesting
+  public void enableExitOnDispatchException() {
+    exitOnDispatchException = true;
+  }
+
   /**
    * Add an EventHandler for events handled inline on this dispatcher
    */
@@ -278,6 +279,9 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
     LOG.info(
           "Registering " + eventType + " for concurrent dispatch using: " + handler.getClass());
     AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
+    if (exitOnDispatchException) {
+      dispatcher.enableExitOnDispatchException();
+    }
     dispatcher.register(eventType, handler);
     concurrentEventDispatchers.put(eventType, dispatcher);
     addIfService(dispatcher);
@@ -292,6 +296,9 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
     checkForExistingDispatchers(true, eventType);
     LOG.info("Registering " + eventType + " with existing concurrent dispatch using: "
           + handler.getClass());
+    if (exitOnDispatchException) {
+      dispatcher.enableExitOnDispatchException();
+    }
     dispatcher.register(eventType, handler);
     concurrentEventDispatchers.put(eventType, dispatcher);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/ad8a80d2/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
index 321ea8b..4a632f5 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
@@ -77,7 +78,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
   protected final Map<Class<? extends Enum>, EventHandler> eventHandlers = Maps.newHashMap();
   protected final Map<Class<? extends Enum>, AsyncDispatcherConcurrent> eventDispatchers = 
       Maps.newHashMap();
-  private boolean exitOnDispatchException;
+  private boolean exitOnDispatchException = false;
 
   AsyncDispatcherConcurrent(String name, int numThreads) {
     super(name);
@@ -126,10 +127,6 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
   
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    // TODO TEZ-2049 remove YARN reference
-    this.exitOnDispatchException =
-        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
-          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
     super.serviceInit(conf);
   }
 
@@ -284,6 +281,11 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
     eventDispatchers.put(eventType, dispatcher);
   }
 
+  @VisibleForTesting
+  public void enableExitOnDispatchException() {
+    this.exitOnDispatchException = true;
+  }
+
   @Override
   public EventHandler getEventHandler() {
     return handlerInstance;

http://git-wip-us.apache.org/repos/asf/tez/blob/ad8a80d2/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index fc24f04..76d9fdb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -478,16 +478,18 @@ public class DAGAppMaster extends AbstractService {
       }
     }
 
+    dispatcher = createDispatcher();
+
     if (isLocal) {
        conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
        conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
            TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
+    } else {
+      dispatcher.enableExitOnDispatchException();
     }
-    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, !isLocal);
     String strAppId = this.appAttemptID.getApplicationId().toString();
     this.tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
 
-    dispatcher = createDispatcher();
     context = new RunningAppContext(conf);
     this.aclManager = new ACLManager(appMasterUgi.getShortUserName(), this.amConf);