You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/09/06 18:19:36 UTC

tez git commit: TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher. (Zhiyuan Yang via hitesh)

Repository: tez
Updated Branches:
  refs/heads/branch-0.7 55df029a8 -> cbd4eacb0


TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher. (Zhiyuan Yang via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cbd4eacb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cbd4eacb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cbd4eacb

Branch: refs/heads/branch-0.7
Commit: cbd4eacb09b6a8ec350723848c397be42ca524c3
Parents: 55df029
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Sep 6 11:15:51 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Sep 6 11:15:51 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/common/AsyncDispatcher.java  |   4 +-
 .../org/apache/tez/common/DrainDispatcher.java  | 123 +++++++++++++++++++
 .../apache/tez/dag/app/dag/impl/TestCommit.java |   2 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |   2 +-
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  |   2 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |   2 +-
 .../dag/app/dag/impl/TestVertexRecovery.java    |   2 +-
 .../tez/dag/app/rm/node/TestAMNodeTracker.java  |   2 +-
 9 files changed, 133 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3bba3e4..65da496 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher.
   TEZ-3326. Display JVM system properties in AM and task logs.
   TEZ-3009. Errors that occur during container task acquisition are not logged.
   TEZ-3413. ConcurrentModificationException in HistoryEventTimelineConversion for AppLaunchedEvent.

http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index 159ccd9..ec5f6c7 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
@@ -215,7 +216,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
         "Multiple concurrent dispatchers cannot be registered for: " + eventType.getName());
   }
   
-  private void checkForExistingDispatchers(boolean checkHandler, Class<? extends Enum> eventType) {
+  @VisibleForTesting
+  protected void checkForExistingDispatchers(boolean checkHandler, Class<? extends Enum> eventType) {
     if (checkHandler) {
       checkForExistingHandler(eventType);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java b/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java
new file mode 100644
index 0000000..fd1fc0a
--- /dev/null
+++ b/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class DrainDispatcher extends AsyncDispatcher { // AsyncDispatcher variant whose await() spins until the dispatch loop has seen an empty queue
+  static final String DEFAULT_NAME = "dispatcher"; // name the no-arg constructor passes to the superclass
+  private volatile boolean drained = false; // written by the dispatch loop and the handler wrapper; polled by await()
+  private volatile boolean stopped = false; // set in serviceStop(); checked by the dispatch loop condition
+  private final BlockingQueue<Event> queue; // same queue instance that is handed to the superclass constructor
+  private final Object mutex; // assigned to this instance in the constructor; held whenever drained is written
+  private static final Logger LOG = LoggerFactory.getLogger(DrainDispatcher.class);
+
+  public DrainDispatcher() {
+    this(DEFAULT_NAME, new LinkedBlockingQueue<Event>()); // default name plus a fresh LinkedBlockingQueue
+  }
+
+  public DrainDispatcher(String name, BlockingQueue<Event> eventQueue) {
+    super(name, eventQueue);
+    this.queue = eventQueue; // keep a local reference so the dispatch loop below can poll and take from it
+    this.mutex = this;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void register(Class<? extends Enum> eventType,
+                       EventHandler handler) {
+    /* look up the handler currently registered for this event type (null if none) */
+    EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType);
+    checkForExistingDispatchers(false, eventType); // called with checkHandler=false
+    LOG.info("Registering " + eventType + " for " + handler.getClass());
+    if (registeredHandler == null) {
+      eventHandlers.put(eventType, handler); // first handler for this type: store it directly
+    } else if (!(registeredHandler instanceof MultiListenerHandler)){
+      /* second listener for this event: wrap the existing and the new handler in a MultiListenerHandler */
+      MultiListenerHandler multiHandler = new MultiListenerHandler();
+      multiHandler.addHandler(registeredHandler);
+      multiHandler.addHandler(handler);
+      eventHandlers.put(eventType, multiHandler);
+    } else {
+      /* already a multilistener, just add to it */
+      MultiListenerHandler multiHandler
+        = (MultiListenerHandler) registeredHandler;
+      multiHandler.addHandler(handler);
+    }
+  }
+
+  /**
+   * Busy-waits, calling Thread.yield(), until the dispatch loop has set drained to true.
+   */
+  public void await() {
+    while (!drained) {
+      Thread.yield();
+    }
+  }
+
+  @Override
+  public Runnable createThread() {
+    return new Runnable() {
+      @Override
+      public void run() {
+        while (!stopped && !Thread.currentThread().isInterrupted()) { // loop until serviceStop() ran or this thread is interrupted
+          synchronized (mutex) {
+            // re-evaluated before every take(); !drained if dispatch queued new events on this dispatcher
+            drained = queue.isEmpty();
+          }
+          Event event;
+          try {
+            event = queue.take(); // blocks until an event is available
+          } catch (InterruptedException ie) {
+            return; // interrupted while waiting: end the loop. NOTE(review): interrupt status is not restored here
+          }
+          if (event != null) {
+            dispatch(event);
+          }
+        }
+      }
+    };
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public EventHandler getEventHandler() {
+    final EventHandler actual = super.getEventHandler(); // superclass handler that the wrapper below delegates to
+    return new EventHandler() {
+      @Override
+      public void handle(Event event) {
+        synchronized (mutex) { // same monitor the dispatch loop holds while it updates drained
+          actual.handle(event);
+          drained = false; // cleared after the event has been handed to the superclass handler
+        }
+      }
+    };
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    stopped = true; // makes the dispatch loop condition false at its next check
+    super.serviceStop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index 83421a2..464a370 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -45,10 +45,10 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.DrainDispatcher;
 import org.apache.tez.common.security.ACLManager;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;

http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 6d45433..52ce23f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.tez.common.DrainDispatcher;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.counters.TezCounters;
 import org.slf4j.Logger;
@@ -47,7 +48,6 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;

http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index fbf815d..2c47624 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -35,9 +35,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.DrainDispatcher;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;

http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 520d10f..da59539 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -50,6 +50,7 @@ import com.google.protobuf.ByteString;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.tez.common.DrainDispatcher;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.counters.TezCounters;
@@ -71,7 +72,6 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;

http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index bdb2377..79364ac 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -29,13 +29,13 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.tez.common.DrainDispatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.counters.TezCounters;

http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
index 0072f6a..4f0c182 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock;
 
 import java.util.List;
 
+import org.apache.tez.common.DrainDispatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -34,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.TezConfiguration;