You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2016/05/02 03:19:57 UTC

hive git commit: HIVE-13512 : Make initializing dag ids in TezWork thread safe for parallel compilation (Peter Slawski via Gopal V)

Repository: hive
Updated Branches:
  refs/heads/master b4b821e0a -> 13bc529f4


HIVE-13512 : Make initializing dag ids in TezWork thread safe for parallel compilation (Peter Slawski via Gopal V)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/13bc529f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/13bc529f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/13bc529f

Branch: refs/heads/master
Commit: 13bc529f44318bf4cfe97c2391dca3d461dc9ec7
Parents: b4b821e
Author: Peter Slawski <pe...@amazon.com>
Authored: Wed Apr 13 19:54:00 2016 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Sun May 1 18:17:00 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/ql/plan/TezWork.java |  9 ++-
 .../hive/ql/plan/TestTezWorkConcurrency.java    | 65 ++++++++++++++++++++
 2 files changed, 72 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/13bc529f/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
index c6ef829..7a70e6b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
@@ -29,6 +29,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -69,7 +70,7 @@ public class TezWork extends AbstractOperatorDesc {
 
   private static transient final Logger LOG = LoggerFactory.getLogger(TezWork.class);
 
-  private static int counter;
+  private static final AtomicInteger counter = new AtomicInteger(1);
   private final String dagId;
   private final String queryName;
   private final Set<BaseWork> roots = new HashSet<BaseWork>();
@@ -80,8 +81,12 @@ public class TezWork extends AbstractOperatorDesc {
       new HashMap<Pair<BaseWork, BaseWork>, TezEdgeProperty>();
   private final Map<BaseWork, VertexType> workVertexTypeMap = new HashMap<BaseWork, VertexType>();
 
+  public TezWork(String queryId) { // Overload that takes only the query id.
+    this(queryId, null); // Delegates with a null conf; the two-argument constructor null-checks conf.
+  }
+
   public TezWork(String queryId, Configuration conf) {
-    this.dagId = queryId + ":" + (++counter);
+    this.dagId = queryId + ":" + counter.getAndIncrement();
     String queryName = (conf != null) ? DagUtils.getUserSpecifiedDagName(conf) : null;
     if (queryName == null) {
       queryName = this.dagId;

http://git-wip-us.apache.org/repos/asf/hive/blob/13bc529f/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java
new file mode 100644
index 0000000..c59fd10
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java
@@ -0,0 +1,65 @@
+package org.apache.hadoop.hive.ql.plan;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+
+import static org.junit.Assert.assertEquals;
+
+public final class TestTezWorkConcurrency { // Checks that TezWork dag ids stay distinct when created from several threads.
+  // Starts several workers together, has each create many TezWork objects, then counts distinct dag ids.
+  @Test
+  public void ensureDagIdIsUnique() throws Exception {
+    final int threadCount = 5;
+    final CountDownLatch threadReadyToStartSignal = new CountDownLatch(threadCount); // counted down once by each worker
+    final CountDownLatch startThreadSignal = new CountDownLatch(1); // single gate that releases every waiting worker
+    final int numberOfTezWorkToCreatePerThread = 100;
+    // Build one task per worker; nothing runs until the tasks are handed to the executor below.
+    List<FutureTask<Set<String>>> tasks = Lists.newArrayList();
+    for (int i = 0; i < threadCount; i++) {
+      tasks.add(new FutureTask<>(new Callable<Set<String>>() {
+        @Override
+        public Set<String> call() throws Exception {
+          threadReadyToStartSignal.countDown(); // report that this worker is running
+          startThreadSignal.await(); // park until the test thread opens the gate
+          return generateTezWorkDagIds(numberOfTezWorkToCreatePerThread);
+        }
+      }));
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount); // one pool thread per task
+    for (FutureTask<Set<String>> task : tasks) {
+      executor.execute(task);
+    }
+    threadReadyToStartSignal.await(); // wait until every worker has reported ready
+    startThreadSignal.countDown(); // open the gate so the workers proceed together
+    Set<String> allTezWorkDagIds = getAllTezWorkDagIds(tasks); // NOTE(review): executor is never shut down in this method
+    assertEquals(threadCount * numberOfTezWorkToCreatePerThread, allTezWorkDagIds.size()); // a duplicate id would shrink the set
+  }
+  // Creates numberOfNames TezWork instances with the same query id and returns the set of their dag ids.
+  private static Set<String> generateTezWorkDagIds(int numberOfNames) {
+    Set<String> tezWorkIds = Sets.newHashSet();
+    for (int i = 0; i < numberOfNames; i++) {
+      TezWork work = new TezWork("query-id"); // same query id every time, so ids can differ only in what TezWork appends
+      tezWorkIds.add(work.getDagId());
+    }
+    return tezWorkIds;
+  }
+  // Merges the id sets returned by all tasks into one set, waiting for each task to finish.
+  private static Set<String> getAllTezWorkDagIds(List<FutureTask<Set<String>>> tasks)
+      throws ExecutionException, InterruptedException {
+    Set<String> allTezWorkDagIds = Sets.newHashSet();
+    for (FutureTask<Set<String>> task : tasks) {
+      allTezWorkDagIds.addAll(task.get()); // get() blocks until the task completes
+    }
+    return allTezWorkDagIds;
+  }
+}