You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/09/06 18:19:36 UTC
tez git commit: TEZ-2852. TestVertexImpl fails due to race in
AsyncDispatcher. (Zhiyuan Yang via hitesh)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 55df029a8 -> cbd4eacb0
TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher. (Zhiyuan Yang via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cbd4eacb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cbd4eacb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cbd4eacb
Branch: refs/heads/branch-0.7
Commit: cbd4eacb09b6a8ec350723848c397be42ca524c3
Parents: 55df029
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Sep 6 11:15:51 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Sep 6 11:15:51 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/common/AsyncDispatcher.java | 4 +-
.../org/apache/tez/common/DrainDispatcher.java | 123 +++++++++++++++++++
.../apache/tez/dag/app/dag/impl/TestCommit.java | 2 +-
.../tez/dag/app/dag/impl/TestDAGImpl.java | 2 +-
.../tez/dag/app/dag/impl/TestTaskRecovery.java | 2 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 2 +-
.../dag/app/dag/impl/TestVertexRecovery.java | 2 +-
.../tez/dag/app/rm/node/TestAMNodeTracker.java | 2 +-
9 files changed, 133 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3bba3e4..65da496 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher.
TEZ-3326. Display JVM system properties in AM and task logs.
TEZ-3009. Errors that occur during container task acquisition are not logged.
TEZ-3413. ConcurrentModificationException in HistoryEventTimelineConversion for AppLaunchedEvent.
http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index 159ccd9..ec5f6c7 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
@@ -215,7 +216,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
"Multiple concurrent dispatchers cannot be registered for: " + eventType.getName());
}
- private void checkForExistingDispatchers(boolean checkHandler, Class<? extends Enum> eventType) {
+ @VisibleForTesting
+ protected void checkForExistingDispatchers(boolean checkHandler, Class<? extends Enum> eventType) {
if (checkHandler) {
checkForExistingHandler(eventType);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java b/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java
new file mode 100644
index 0000000..fd1fc0a
--- /dev/null
+++ b/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class DrainDispatcher extends AsyncDispatcher { // AsyncDispatcher variant that tracks whether its queue has drained; see await()
+ static final String DEFAULT_NAME = "dispatcher"; // name used by the no-arg constructor
+ private volatile boolean drained = false; // set by the dispatch loop from queue.isEmpty(); reset to false by the wrapped handler
+ private volatile boolean stopped = false; // set in serviceStop() so the dispatch loop exits
+ private final BlockingQueue<Event> queue; // the same queue instance that is handed to the superclass
+ private final Object mutex; // this instance; guards the updates to drained
+ private static final Logger LOG = LoggerFactory.getLogger(DrainDispatcher.class);
+
+ public DrainDispatcher() {
+ this(DEFAULT_NAME, new LinkedBlockingQueue<Event>());
+ }
+
+ public DrainDispatcher(String name, BlockingQueue<Event> eventQueue) {
+ super(name, eventQueue);
+ this.queue = eventQueue; // keep a local reference so the loop below can poll and take from it
+ this.mutex = this;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void register(Class<? extends Enum> eventType,
+ EventHandler handler) {
+ /* look up any handler already registered for this event type */
+ EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType);
+ checkForExistingDispatchers(false, eventType); // false: skip the existing-handler check
+ LOG.info("Registering " + eventType + " for " + handler.getClass());
+ if (registeredHandler == null) {
+ eventHandlers.put(eventType, handler);
+ } else if (!(registeredHandler instanceof MultiListenerHandler)){
+ /* second listener for this type: wrap the existing and the new handler in a MultiListenerHandler */
+ MultiListenerHandler multiHandler = new MultiListenerHandler();
+ multiHandler.addHandler(registeredHandler);
+ multiHandler.addHandler(handler);
+ eventHandlers.put(eventType, multiHandler);
+ } else {
+ /* already a MultiListenerHandler: just append the new handler to it */
+ MultiListenerHandler multiHandler
+ = (MultiListenerHandler) registeredHandler;
+ multiHandler.addHandler(handler);
+ }
+ }
+
+ /**
+ * Spins with Thread.yield() until the dispatch loop has observed an empty event queue.
+ */
+ public void await() {
+ while (!drained) { // drained starts false and only the dispatch loop sets it to true, so this spins until that loop runs
+ Thread.yield();
+ }
+ }
+
+ @Override
+ public Runnable createThread() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
+ synchronized (mutex) {
+ // recomputed on every pass; false if the previous dispatch() left new events on this queue
+ drained = queue.isEmpty();
+ }
+ Event event;
+ try {
+ event = queue.take(); // blocks until an event is available
+ } catch (InterruptedException ie) {
+ return; // interrupted while waiting: end the loop (the interrupt flag is not restored)
+ }
+ if (event != null) { // defensive only; BlockingQueue.take() does not return null
+ dispatch(event);
+ }
+ }
+ }
+ };
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public EventHandler getEventHandler() {
+ final EventHandler actual = super.getEventHandler();
+ return new EventHandler() { // wrapper that marks the dispatcher as not drained after each hand-off
+ @Override
+ public void handle(Event event) {
+ synchronized (mutex) { // same lock as the isEmpty() check in the dispatch loop
+ actual.handle(event); // presumably enqueues the event; the superclass handler is not shown here
+ drained = false;
+ }
+ }
+ };
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ stopped = true; // raise the flag before delegating so the dispatch loop's condition fails on its next check
+ super.serviceStop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index 83421a2..464a370 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -45,10 +45,10 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 6d45433..52ce23f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
+import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
import org.slf4j.Logger;
@@ -47,7 +48,6 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index fbf815d..2c47624 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -35,9 +35,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 520d10f..da59539 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -50,6 +50,7 @@ import com.google.protobuf.ByteString;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
@@ -71,7 +72,6 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index bdb2377..79364ac 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -29,13 +29,13 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.tez.common.DrainDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.counters.TezCounters;
http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
index 0072f6a..4f0c182 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock;
import java.util.List;
+import org.apache.tez.common.DrainDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -34,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.TezConfiguration;