You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/04/25 00:00:51 UTC
tez git commit: TEZ-2049. Remove YARN references from Tez AsyncDispatcher (zhiyuany)
Repository: tez
Updated Branches:
refs/heads/master bb4fb6471 -> ad8a80d2b
TEZ-2049. Remove YARN references from Tez AsyncDispatcher (zhiyuany)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ad8a80d2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ad8a80d2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ad8a80d2
Branch: refs/heads/master
Commit: ad8a80d2b4acbb481949c4899b406cc1cc9ddec4
Parents: bb4fb64
Author: Zhiyuan Yang <zh...@apache.org>
Authored: Mon Apr 24 16:58:42 2017 -0700
Committer: Zhiyuan Yang <zh...@apache.org>
Committed: Mon Apr 24 16:58:42 2017 -0700
----------------------------------------------------------------------
.../org/apache/tez/common/AsyncDispatcher.java | 17 ++++++++++++-----
.../tez/common/AsyncDispatcherConcurrent.java | 12 +++++++-----
.../java/org/apache/tez/dag/app/DAGAppMaster.java | 6 ++++--
3 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ad8a80d2/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index ec5f6c7..3a59ff6 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -74,7 +74,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
protected final Map<Class<? extends Enum>, AsyncDispatcherConcurrent> concurrentEventDispatchers =
Maps.newHashMap();
- private boolean exitOnDispatchException;
+ private boolean exitOnDispatchException = false;
public AsyncDispatcher(String name) {
this(name, new LinkedBlockingQueue<Event>());
@@ -121,10 +121,6 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
@Override
protected void serviceInit(Configuration conf) throws Exception {
- // TODO TEZ-2049 remove YARN reference
- this.exitOnDispatchException =
- conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
- Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
super.serviceInit(conf);
}
@@ -225,6 +221,11 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
checkForExistingConcurrentDispatcher(eventType);
}
+ @VisibleForTesting
+ public void enableExitOnDispatchException() {
+ exitOnDispatchException = true;
+ }
+
/**
* Add an EventHandler for events handled inline on this dispatcher
*/
@@ -278,6 +279,9 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
LOG.info(
"Registering " + eventType + " for concurrent dispatch using: " + handler.getClass());
AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
+ if (exitOnDispatchException) {
+ dispatcher.enableExitOnDispatchException();
+ }
dispatcher.register(eventType, handler);
concurrentEventDispatchers.put(eventType, dispatcher);
addIfService(dispatcher);
@@ -292,6 +296,9 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
checkForExistingDispatchers(true, eventType);
LOG.info("Registering " + eventType + " with existing concurrent dispatch using: "
+ handler.getClass());
+ if (exitOnDispatchException) {
+ dispatcher.enableExitOnDispatchException();
+ }
dispatcher.register(eventType, handler);
concurrentEventDispatchers.put(eventType, dispatcher);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ad8a80d2/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
index 321ea8b..4a632f5 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
@@ -77,7 +78,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
protected final Map<Class<? extends Enum>, EventHandler> eventHandlers = Maps.newHashMap();
protected final Map<Class<? extends Enum>, AsyncDispatcherConcurrent> eventDispatchers =
Maps.newHashMap();
- private boolean exitOnDispatchException;
+ private boolean exitOnDispatchException = false;
AsyncDispatcherConcurrent(String name, int numThreads) {
super(name);
@@ -126,10 +127,6 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
@Override
protected void serviceInit(Configuration conf) throws Exception {
- // TODO TEZ-2049 remove YARN reference
- this.exitOnDispatchException =
- conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
- Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
super.serviceInit(conf);
}
@@ -284,6 +281,11 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
eventDispatchers.put(eventType, dispatcher);
}
+ @VisibleForTesting
+ public void enableExitOnDispatchException() {
+ this.exitOnDispatchException = true;
+ }
+
@Override
public EventHandler getEventHandler() {
return handlerInstance;
http://git-wip-us.apache.org/repos/asf/tez/blob/ad8a80d2/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index fc24f04..76d9fdb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -478,16 +478,18 @@ public class DAGAppMaster extends AbstractService {
}
}
+ dispatcher = createDispatcher();
+
if (isLocal) {
conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
+ } else {
+ dispatcher.enableExitOnDispatchException();
}
- conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, !isLocal);
String strAppId = this.appAttemptID.getApplicationId().toString();
this.tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
- dispatcher = createDispatcher();
context = new RunningAppContext(conf);
this.aclManager = new ACLManager(appMasterUgi.getShortUserName(), this.amConf);