You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2016/05/02 03:19:57 UTC
hive git commit: HIVE-13512 : Make initializing dag ids in TezWork thread safe for parallel compilation (Peter Slawski via Gopal V)
Repository: hive
Updated Branches:
refs/heads/master b4b821e0a -> 13bc529f4
HIVE-13512 : Make initializing dag ids in TezWork thread safe for parallel compilation (Peter Slawski via Gopal V)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/13bc529f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/13bc529f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/13bc529f
Branch: refs/heads/master
Commit: 13bc529f44318bf4cfe97c2391dca3d461dc9ec7
Parents: b4b821e
Author: Peter Slawski <pe...@amazon.com>
Authored: Wed Apr 13 19:54:00 2016 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Sun May 1 18:17:00 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/ql/plan/TezWork.java | 9 ++-
.../hive/ql/plan/TestTezWorkConcurrency.java | 65 ++++++++++++++++++++
2 files changed, 72 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/13bc529f/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
index c6ef829..7a70e6b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
@@ -29,6 +29,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -69,7 +70,7 @@ public class TezWork extends AbstractOperatorDesc {
private static transient final Logger LOG = LoggerFactory.getLogger(TezWork.class);
- private static int counter;
+ private static final AtomicInteger counter = new AtomicInteger(1);
private final String dagId;
private final String queryName;
private final Set<BaseWork> roots = new HashSet<BaseWork>();
@@ -80,8 +81,12 @@ public class TezWork extends AbstractOperatorDesc {
new HashMap<Pair<BaseWork, BaseWork>, TezEdgeProperty>();
private final Map<BaseWork, VertexType> workVertexTypeMap = new HashMap<BaseWork, VertexType>();
+ public TezWork(String queryId) {
+ this(queryId, null);
+ }
+
public TezWork(String queryId, Configuration conf) {
- this.dagId = queryId + ":" + (++counter);
+ this.dagId = queryId + ":" + counter.getAndIncrement();
String queryName = (conf != null) ? DagUtils.getUserSpecifiedDagName(conf) : null;
if (queryName == null) {
queryName = this.dagId;
http://git-wip-us.apache.org/repos/asf/hive/blob/13bc529f/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java
new file mode 100644
index 0000000..c59fd10
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java
@@ -0,0 +1,65 @@
+package org.apache.hadoop.hive.ql.plan;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+
+import static org.junit.Assert.assertEquals;
+
+public final class TestTezWorkConcurrency {
+
+ @Test
+ public void ensureDagIdIsUnique() throws Exception {
+ final int threadCount = 5;
+ final CountDownLatch threadReadyToStartSignal = new CountDownLatch(threadCount);
+ final CountDownLatch startThreadSignal = new CountDownLatch(1);
+ final int numberOfTezWorkToCreatePerThread = 100;
+
+ List<FutureTask<Set<String>>> tasks = Lists.newArrayList();
+ for (int i = 0; i < threadCount; i++) {
+ tasks.add(new FutureTask<>(new Callable<Set<String>>() {
+ @Override
+ public Set<String> call() throws Exception {
+ threadReadyToStartSignal.countDown();
+ startThreadSignal.await();
+ return generateTezWorkDagIds(numberOfTezWorkToCreatePerThread);
+ }
+ }));
+ }
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ for (FutureTask<Set<String>> task : tasks) {
+ executor.execute(task);
+ }
+ threadReadyToStartSignal.await();
+ startThreadSignal.countDown();
+ Set<String> allTezWorkDagIds = getAllTezWorkDagIds(tasks);
+ assertEquals(threadCount * numberOfTezWorkToCreatePerThread, allTezWorkDagIds.size());
+ }
+
+ private static Set<String> generateTezWorkDagIds(int numberOfNames) {
+ Set<String> tezWorkIds = Sets.newHashSet();
+ for (int i = 0; i < numberOfNames; i++) {
+ TezWork work = new TezWork("query-id");
+ tezWorkIds.add(work.getDagId());
+ }
+ return tezWorkIds;
+ }
+
+ private static Set<String> getAllTezWorkDagIds(List<FutureTask<Set<String>>> tasks)
+ throws ExecutionException, InterruptedException {
+ Set<String> allTezWorkDagIds = Sets.newHashSet();
+ for (FutureTask<Set<String>> task : tasks) {
+ allTezWorkDagIds.addAll(task.get());
+ }
+ return allTezWorkDagIds;
+ }
+}