You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ha...@apache.org on 2017/04/20 09:16:50 UTC
tez git commit: TEZ-1187. Add a framework ExecutorService which
shares threads (harishjp)
Repository: tez
Updated Branches:
refs/heads/master fb0e45bf7 -> a5179d649
TEZ-1187. Add a framework ExecutorService which shares threads (harishjp)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a5179d64
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a5179d64
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a5179d64
Branch: refs/heads/master
Commit: a5179d64937244e21694560f8d52d320ebca20c7
Parents: fb0e45b
Author: Harish JP <ha...@gmail.com>
Authored: Thu Apr 20 14:45:44 2017 +0530
Committer: Harish JP <ha...@gmail.com>
Committed: Thu Apr 20 14:45:44 2017 +0530
----------------------------------------------------------------------
.../apache/tez/dag/api/TezConfiguration.java | 18 +
.../org/apache/tez/runtime/api/TaskContext.java | 17 +-
.../org/apache/tez/common/TezExecutors.java | 52 +++
.../apache/tez/common/TezSharedExecutor.java | 338 +++++++++++++++++++
.../tez/common/TestTezSharedExecutor.java | 256 ++++++++++++++
.../tez/service/impl/ContainerRunnerImpl.java | 15 +-
.../apache/tez/service/impl/TezTestService.java | 8 +-
.../tez/mapreduce/output/TestMROutput.java | 15 +-
.../tez/mapreduce/processor/MapUtils.java | 5 +-
.../processor/map/TestMapProcessor.java | 29 +-
.../processor/reduce/TestReduceProcessor.java | 7 +-
.../runtime/LogicalIOProcessorRuntimeTask.java | 14 +-
.../runtime/api/impl/TezInputContextImpl.java | 7 +-
.../runtime/api/impl/TezOutputContextImpl.java | 7 +-
.../api/impl/TezProcessorContextImpl.java | 7 +-
.../runtime/api/impl/TezTaskContextImpl.java | 13 +-
.../org/apache/tez/runtime/task/TezChild.java | 6 +-
.../apache/tez/runtime/task/TezTaskRunner2.java | 31 +-
.../TestLogicalIOProcessorRuntimeTask.java | 12 +-
.../runtime/api/impl/TestProcessorContext.java | 14 +-
.../tez/runtime/task/TestTaskExecution2.java | 14 +-
.../tez/runtime/task/TestTezTaskRunner2.java | 8 +-
.../output/TestOnFileUnorderedKVOutput.java | 23 +-
23 files changed, 839 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 40f84e6..c0179f8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1790,4 +1790,22 @@ public class TezConfiguration extends Configuration {
TEZ_PREFIX + "am.client.heartbeat.poll.interval.millis";
public static final int TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_DEFAULT = -1;
+ /**
+ * Int value. Minimum number of threads to be allocated by TezSharedExecutor.
+ */
+ @Private
+ @ConfigurationScope(Scope.AM)
+ public static final String TEZ_SHARED_EXECUTOR_MIN_THREADS = "tez.shared-executor.min-threads";
+ public static final int TEZ_SHARED_EXECUTOR_MIN_THREADS_DEFAULT = 0;
+
+ /**
+ * Int value. Maximum number of threads to be allocated by TezSharedExecutor. If value is negative
+ * then Integer.MAX_VALUE is used as the limit.
+ * Default: Integer.MAX_VALUE.
+ */
+ @Private
+ @ConfigurationScope(Scope.AM)
+ public static final String TEZ_SHARED_EXECUTOR_MAX_THREADS = "tez.shared-executor.max-threads";
+ public static final int TEZ_SHARED_EXECUTOR_MAX_THREADS_DEFAULT = -1;
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
index b5e42bc..dd2951a 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.api;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
@@ -234,5 +235,19 @@ public interface TaskContext {
* @return the execution context
*/
public ExecutionContext getExecutionContext();
-
+
+ /**
+ * Create a new ExecutorService with the given parallelism and thread name format. The parallelism
+ * might not be guaranteed. The service returned works with tez framework, currently it provides
+ * thread reuse across tasks.
+ * Note: This is an unstable api, and is not recommended to be used by external users. Please wait
+ * until API and code is stablized by use in Tez processors, input and outputs.
+ * @param parallelism The expected parallelism for for this ExecutorService.
+ * @param threadNameFormat The thread name format, format will be given one parameter, threadId.
+ * @return An ExecutorService instance.
+ */
+ @Private
+ @Unstable
+ public ExecutorService createTezFrameworkExecutorService(
+ int parallelism, String threadNameFormat);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-common/src/main/java/org/apache/tez/common/TezExecutors.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezExecutors.java b/tez-common/src/main/java/org/apache/tez/common/TezExecutors.java
new file mode 100644
index 0000000..a74c8ad
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/TezExecutors.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Interface to capture factory of ExecutorService.
+ */
+@Private
+@Unstable
+public interface TezExecutors {
+
+ /**
+ * Create a ExecutorService with the given parameters.
+ *
+ * @param parallelism Represents total number of tasks to be executed in parallel.
+ * @param threadNameFormat The name the thread should take when executing tasks from this executor
+ * @return An ExecutorService.
+ */
+ ExecutorService createExecutorService(int parallelism, String threadNameFormat);
+
+ /**
+ * Shutdown all the ExecutorService created using this factory.
+ */
+ void shutdown();
+
+ /**
+ * Shutdown all the ExecutorService created using this factory. It will discard any tasks which
+ * are not running and interrupt the running tasks.
+ */
+ void shutdownNow();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-common/src/main/java/org/apache/tez/common/TezSharedExecutor.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezSharedExecutor.java b/tez-common/src/main/java/org/apache/tez/common/TezSharedExecutor.java
new file mode 100644
index 0000000..93bf3cc
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/TezSharedExecutor.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * An ExecutorService factory which shares threads between executors created using this service.
+ */
+@Private
+@Unstable
+public class TezSharedExecutor implements TezExecutors {
+
+ // The shared executor service which will be used to execute all the tasks.
+ private final ThreadPoolExecutor service;
+
+ private final DelayedExecutionPoller poller;
+
+ public TezSharedExecutor(Configuration conf) {
+ // The default value is 0. We could start with a few threads so that thread pool is never empty.
+ int minThreads = conf.getInt(TezConfiguration.TEZ_SHARED_EXECUTOR_MIN_THREADS,
+ TezConfiguration.TEZ_SHARED_EXECUTOR_MIN_THREADS_DEFAULT);
+ // The default value is Integer.MAX_VALUE, but ExecutorServiceInternal will do the rate limiting
+ // of total numbers of tasks and hence the num threads will be bounded.
+ int maxThreads = conf.getInt(TezConfiguration.TEZ_SHARED_EXECUTOR_MAX_THREADS,
+ TezConfiguration.TEZ_SHARED_EXECUTOR_MAX_THREADS_DEFAULT);
+ if (maxThreads < 0) {
+ maxThreads = Integer.MAX_VALUE;
+ }
+ this.service = new ThreadPoolExecutor(
+ minThreads, maxThreads,
+ // The timeout is to give thread a chance to be re-used instead of being cleaned up.
+ 60, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(),
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TezSharedExecutor: %d").build());
+
+ // Setup polling thread to pick new tasks from the underlying executors.
+ poller = new DelayedExecutionPoller(service);
+ poller.start();
+ }
+
+ public ExecutorService createExecutorService(int poolSize, String threadName) {
+ return new ExecutorServiceInternal(poolSize, threadName);
+ }
+
+ // Should we allow a shared service shutdown, once this shutdown is complete, all the executors
+ // are in shutdown mode and will throw exception if we try to submit new tasks. And already
+ // submitted tasks in the ExecutorServiceInternal which are not yet submitted to the shared
+ // service will not be executed. That break contracts, we can fix this by tracking that the
+ // service is shutdown and wait until all the dependent.
+ public void shutdown() {
+ service.shutdown();
+ poller.interrupt();
+ }
+
+ public void shutdownNow() {
+ service.shutdownNow();
+ poller.interrupt();
+ }
+
+ @Override
+ protected void finalize() {
+ this.shutdown();
+ }
+
+ private static class DelayedExecutionPoller extends Thread {
+ // Store service reference in this static class to prevent a reference of TezSharedExecutor from
+ // being held inside a non static class which prevents cleanup via GC.
+ private final ThreadPoolExecutor service;
+
+ // A queue which contains instances which have tasks to be executed.
+ private final LinkedBlockingQueue<ExecutorServiceInternal> executeQueue =
+ new LinkedBlockingQueue<>();
+
+ DelayedExecutionPoller(ThreadPoolExecutor service) {
+ super("DelayedExecutionPoller");
+ this.setDaemon(true);
+ this.service = service;
+ }
+
+ void add(ExecutorServiceInternal es) {
+ executeQueue.add(es);
+ }
+
+ @Override
+ public void run() {
+ while (!service.isShutdown()) {
+ try {
+ executeQueue.take().tryExecute();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+
+ /*
+ * The internal shared executor service which delegates all the execution to the shared service.
+ * It allows managing a given instance of ExecutorService independently of other instances created
+ * in the same service.
+ *
+ * - It stores a queue of submitted tasks and submits only the configured poolSize number of tasks
+ * into the shared executor service.
+ * - Stores a list of futures used implement shutdownNow and awaitTermination.
+ */
+ private class ExecutorServiceInternal extends AbstractExecutorService {
+ // This contains all the tasks which are submitted through this ExecutorService and has not
+ // finished, we use this to implement shutdownNow and awaitForTermination.
+ // Note: This should have been an Set, but we do not have a concurrent set.
+ private final ConcurrentHashMap<ManagedFutureTask<?>, Boolean> futures =
+ new ConcurrentHashMap<>();
+
+ // Number of tasks currently submitted by this executor to the common executor service.
+ private final AtomicInteger numTasksSubmitted = new AtomicInteger();
+
+ // The list of pending tasks to be submitted on behalf of this service.
+ private final LinkedBlockingQueue<ManagedFutureTask<?>> pendingTasks =
+ new LinkedBlockingQueue<>();
+
+ // Set to 0 when shutdown is complete, a CountDownLatch is used to enable wait for shutdown in
+ // awaitTermination.
+ private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+ // The thread name to be used for threads executing tasks of this executor.
+ private final String threadName;
+
+ // Total number of threads to be used.
+ private final int poolSize;
+
+ ExecutorServiceInternal(int poolSize, String threadName) {
+ Preconditions.checkArgument(poolSize > 0, "Expected poolSize > 0");
+ this.threadName = threadName;
+ this.poolSize = poolSize;
+ }
+
+ // A FutureTask which we will use to wrap all the runnable and callable. It adds and removes
+ // from the futures set above. And also notifies TezSharedExecutor to pick new tasks from the
+ // current ExecutorServiceInternal instance.
+ private class ManagedFutureTask<V> extends FutureTask<V> {
+ // Set to true if this task was submitted to the shared ExecutorService.
+ private boolean submitted = false;
+
+ ManagedFutureTask(Runnable runnable, V value) {
+ super(runnable, value);
+ addFuture(this);
+ }
+
+ ManagedFutureTask(Callable<V> callable) {
+ super(callable);
+ addFuture(this);
+ }
+
+ @Override
+ public void run() {
+ Thread thisThread = Thread.currentThread();
+ String savedThreadName = null;
+ if (threadName != null) {
+ savedThreadName = thisThread.getName();
+ thisThread.setName(String.format(threadName, thisThread.getId()));
+ }
+ try {
+ super.run();
+ } finally {
+ if (threadName != null) {
+ thisThread.setName(savedThreadName);
+ }
+ }
+ }
+
+ // There is a race b/w cancel and submit hence the synchronization.
+ synchronized void submit() {
+ submitted = true;
+ service.execute(this);
+ }
+
+ @Override
+ public void done() {
+ removeFuture(this);
+ synchronized (this) {
+ if (submitted) { // Decrement only if this task was submitted.
+ numTasksSubmitted.decrementAndGet();
+ }
+ }
+ // Add internal executor service to poller to schedule another task if available.
+ // We do this instead of invoking tryExecute here, to give a chance for this thread to be
+ // reused. But its still possible that a new thread is created.
+ poller.add(ExecutorServiceInternal.this);
+ }
+ }
+
+ private void addFuture(ManagedFutureTask<?> future) {
+ futures.put(future, Boolean.TRUE);
+ // If already shutdown, reject this task.
+ if (isShutdown()) {
+ service.getRejectedExecutionHandler().rejectedExecution(future, service);
+ }
+ }
+
+ private void removeFuture(ManagedFutureTask<?> future) {
+ futures.remove(future);
+ }
+
+ // Return our internal future task so that all the tasks submitted are tracked and cleaned up.
+ @SuppressWarnings("unchecked")
+ @Override
+ protected <T> ManagedFutureTask<T> newTaskFor(Runnable runnable, T value) {
+ if (runnable instanceof ManagedFutureTask) {
+ return (ManagedFutureTask<T>)runnable;
+ }
+ return new ManagedFutureTask<T>(runnable, value);
+ }
+
+ @Override
+ protected <T> ManagedFutureTask<T> newTaskFor(Callable<T> callable) {
+ return new ManagedFutureTask<T>(callable);
+ }
+
+ @Override
+ public void shutdown() {
+ shutdownLatch.countDown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ shutdownLatch.countDown();
+ List<Runnable> pending = new ArrayList<>(pendingTasks.size());
+ pendingTasks.drainTo(pending);
+ // cancel all futures, interrupt if its running.
+ for (ManagedFutureTask<?> future : futures.keySet()) {
+ future.cancel(true);
+ }
+ return pending;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return shutdownLatch.getCount() == 0 || service.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return isShutdown() && futures.isEmpty();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ long deadline = System.nanoTime() + unit.toNanos(timeout);
+ // Wait for shutdown to be invoked.
+ if (!shutdownLatch.await(timeout, unit)) {
+ return false;
+ }
+ // Wait for the remaining futures to finish.
+ for (ManagedFutureTask<?> future : futures.keySet()) {
+ long nanosLeft = deadline - System.nanoTime();
+ if (nanosLeft <= 0) {
+ return false;
+ }
+ try {
+ future.get(nanosLeft, TimeUnit.NANOSECONDS);
+ } catch (ExecutionException | CancellationException ignore) {
+ } catch (TimeoutException e) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Submit a task if task is available and poolSize has not been reached.
+ private void tryExecute() {
+ while (!pendingTasks.isEmpty()) {
+ int numTasks = numTasksSubmitted.get();
+ if (numTasks >= poolSize) {
+ return;
+ }
+ if (numTasksSubmitted.compareAndSet(numTasks, numTasks + 1)) {
+ ManagedFutureTask<?> task = pendingTasks.poll();
+ // This breaks a contract unfortunately. If a task is submitted and it ends up in a
+ // queue and then the shared service is shutdown then this job cannot be executed, which
+ // is not the contract, ideally it should execute the task.
+ if (task == null || task.isCancelled() || service.isShutdown()) {
+ numTasksSubmitted.decrementAndGet();
+ } else {
+ task.submit();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ this.pendingTasks.add(newTaskFor(command, null));
+ this.tryExecute();
+ }
+
+ @Override
+ protected void finalize() {
+ this.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java b/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java
new file mode 100644
index 0000000..8d87846
--- /dev/null
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTezSharedExecutor {
+
+ private static class Sleep implements Runnable {
+ private final long sleepTime;
+ Sleep(long sleepTime) {
+ this.sleepTime = sleepTime;
+ }
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private static class Wait implements Runnable {
+ private final Object ref;
+ Wait(Object ref) {
+ this.ref = ref == null ? this : ref;
+ }
+ @Override
+ public void run() {
+ try {
+ synchronized (ref) {
+ ref.wait();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private static class Counter implements Runnable {
+ private final AtomicInteger counter;
+ Counter(ConcurrentHashMap<String, AtomicInteger> map, String tag) {
+ if (!map.contains(tag)) {
+ map.putIfAbsent(tag, new AtomicInteger(0));
+ }
+ this.counter = map.get(tag);
+ }
+ @Override
+ public void run() {
+ counter.getAndIncrement();
+ }
+ }
+
+ private static class Appender<T> implements Runnable {
+ private final Collection<T> collection;
+ private final T obj;
+ Appender(Collection<T> collection, T obj) {
+ this.collection = collection;
+ this.obj = obj;
+ }
+ @Override
+ public void run() {
+ collection.add(obj);
+ }
+ }
+
+ private static class Runner implements Runnable {
+ private Runnable[] runnables;
+ Runner(Runnable ... runnables) {
+ this.runnables = runnables;
+ }
+ @Override
+ public void run() {
+ for (Runnable runnable : runnables) {
+ runnable.run();
+ }
+ }
+ }
+
+ private void _notify(Object obj) {
+ synchronized (obj) {
+ obj.notify();
+ }
+ }
+
+ private TezSharedExecutor sharedExecutor;
+
+ @Before
+ public void setup() {
+ sharedExecutor = new TezSharedExecutor(new Configuration());
+ }
+
+ @After
+ public void cleanup() {
+ sharedExecutor.shutdownNow();
+ sharedExecutor = null;
+ }
+
+ @Test(timeout=2000)
+ public void testSimpleExecution() throws Exception {
+ ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();
+
+ ExecutorService service = sharedExecutor.createExecutorService(1, "simple-test");
+
+ // Test runnable
+ service.submit(new Counter(map, "test")).get();
+ Assert.assertEquals(1, map.get("test").get());
+
+ // Test runnable with a result
+ final Object expected = new Object();
+ Object val = service.submit(new Counter(map, "test"), expected).get();
+ Assert.assertEquals(expected, val);
+ Assert.assertEquals(2, map.get("test").get());
+
+ // Test callable.
+ val = service.submit(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ return expected;
+ }
+ }).get();
+ Assert.assertEquals(expected, val);
+
+ // Tasks should be rejected after a shutdown.
+ service.shutdown();
+
+ try {
+ service.submit(new Counter(map, "test"));
+ Assert.fail("Expected rejected execution exception.");
+ } catch (RejectedExecutionException e) {
+ }
+ }
+
+ @Test(timeout=5000)
+ public void testAwaitTermination() throws Exception {
+ ExecutorService service = sharedExecutor.createExecutorService(1, "await-termination");
+
+ final Runnable runnable = new Wait(null);
+ service.submit(runnable);
+ service.shutdown();
+
+ // No notify sent hence it should fail.
+ Assert.assertFalse(service.awaitTermination(100, TimeUnit.MILLISECONDS));
+ Assert.assertFalse(service.isTerminated());
+ Assert.assertTrue(service.isShutdown());
+
+ Timer timer = new Timer(true);
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ _notify(runnable);
+ }
+ }, 100);
+
+ // Highly unlikely that there are intermittent failures, but a possiblity :-(.
+ Assert.assertTrue(service.awaitTermination(1, TimeUnit.SECONDS));
+ Assert.assertTrue(service.isTerminated());
+ Assert.assertTrue(service.isShutdown());
+
+ timer.cancel();
+ }
+
+ @Test(timeout=2000)
+ public void testSerialExecution() throws Exception {
+ ExecutorService service = sharedExecutor.createExecutorService(1, "serial-test");
+
+ // Since it is serial we should never get concurrent modification exception too.
+ List<Integer> list = new ArrayList<>();
+ Future<?> f1 = service.submit(new Wait(list));
+ List<Future<?>> futures = new ArrayList<>();
+ for (int i = 0; i < 10; ++i) {
+ futures.add(service.submit(new Appender<Integer>(list, i)));
+ }
+
+ // This shutdown does not prevent already submitted tasks from completing.
+ service.shutdown();
+
+ // Until we notify nothing moves forward.
+ Assert.assertEquals(0, list.size());
+ _notify(list);
+ f1.get();
+
+ // Wait for all futures to finish.
+ for (Future<?> f : futures) {
+ f.get();
+ }
+ Assert.assertEquals(10, list.size());
+ Assert.assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), list);
+ }
+
+ @Test(timeout=5000)
+ public void testParallelExecution() throws Exception {
+ ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();
+
+ List<Future<?>> futures = new ArrayList<>();
+ ExecutorService[] services = {
+ sharedExecutor.createExecutorService(2, "parallel-1"),
+ sharedExecutor.createExecutorService(2, "parallel-2")
+ };
+ int[] expectedCounts = {0, 0};
+ Random random = new Random();
+ for (int i = 0; i < 200; ++i) {
+ int serviceIndex = random.nextInt(2);
+ expectedCounts[serviceIndex] += 1;
+ futures.add(services[serviceIndex].submit(
+ new Runner(new Sleep(10), new Counter(map, "test" + serviceIndex))));
+ }
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ Assert.assertEquals(expectedCounts[0], map.get("test0").get());
+ Assert.assertEquals(expectedCounts[1], map.get("test1").get());
+
+ // Even if one service is shutdown the other should work.
+ services[0].shutdown();
+ services[1].submit(new Counter(map, "test1")).get();
+ Assert.assertEquals(expectedCounts[1] + 1, map.get("test1").get());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index f9de995..3317bbf 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
@@ -90,6 +91,8 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
private final Map<String, String> localEnv = new HashMap<String, String>();
private volatile FileSystem localFs;
private final long memoryPerExecutor;
+
+ private final TezExecutors sharedExecutor;
// TODO Support for removing queued containers, interrupting / killing specific containers - when preemption is supported
@@ -97,7 +100,8 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
public ContainerRunnerImpl(int numExecutors, String[] localDirsBase,
AtomicReference<InetSocketAddress> localAddress,
- long totalMemoryAvailableBytes) {
+ long totalMemoryAvailableBytes,
+ TezExecutors sharedExecutor) {
super("ContainerRunnerImpl");
Preconditions.checkState(numExecutors > 0,
"Invalid number of executors: " + numExecutors + ". Must be > 0");
@@ -117,6 +121,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
"memoryPerExecutorDerived=" + memoryPerExecutor +
", numExecutors=" + numExecutors
);
+ this.sharedExecutor = sharedExecutor;
}
@Override
@@ -262,7 +267,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser());
TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()),
new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs,
- workingDir, credentials, memoryPerExecutor);
+ workingDir, credentials, memoryPerExecutor, sharedExecutor);
ListenableFuture<ContainerExecutionResult> future = executorService.submit(callable);
Futures.addCallback(future, new TaskRunnerCallback(request, callable));
}
@@ -385,12 +390,13 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
private volatile TezTaskRunner2 taskRunner;
private volatile TaskReporter taskReporter;
private TezTaskUmbilicalProtocol umbilical;
+ private final TezExecutors sharedExecutor;
TaskRunnerCallable(SubmitWorkRequestProto request, Configuration conf,
ExecutionContext executionContext, Map<String, String> envMap,
String[] localDirs, String workingDir, Credentials credentials,
- long memoryAvailable) {
+ long memoryAvailable, TezExecutors sharedExecutor) {
this.request = request;
this.conf = conf;
this.executionContext = executionContext;
@@ -405,6 +411,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
.setDaemon(true)
.setNameFormat("TezTaskRunner_" + request.getTaskSpec().getTaskAttemptIdString()).build());
executor = MoreExecutors.listeningDecorator(executorReal);
+ this.sharedExecutor = sharedExecutor;
}
@Override
@@ -452,7 +459,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
request.getAppAttemptNumber(),
serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry,
pid,
- executionContext, memoryAvailable, false, new DefaultHadoopShim());
+ executionContext, memoryAvailable, false, new DefaultHadoopShim(), sharedExecutor);
boolean shouldDie;
try {
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
index 322be00..17eb88c 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
@@ -23,6 +23,8 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezExecutors;
+import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.service.ContainerRunner;
import org.apache.tez.shufflehandler.ShuffleHandler;
@@ -46,6 +48,8 @@ public class TezTestService extends AbstractService implements ContainerRunner {
private final AtomicReference<InetSocketAddress> address = new AtomicReference<InetSocketAddress>();
+ private final TezExecutors sharedExecutor;
+
public TezTestService(Configuration conf, int numExecutors, long memoryAvailable, String[] localDirs) {
super(TezTestService.class.getSimpleName());
this.numExecutors = numExecutors;
@@ -73,8 +77,9 @@ public class TezTestService extends AbstractService implements ContainerRunner {
this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS, StringUtils.arrayToString(localDirs));
this.server = new TezTestServiceProtocolServerImpl(this, address);
+ this.sharedExecutor = new TezSharedExecutor(conf);
this.containerRunner = new ContainerRunnerImpl(numExecutors, localDirs, address,
- memoryAvailableBytes);
+ memoryAvailableBytes, sharedExecutor);
}
@Override
@@ -95,6 +100,7 @@ public class TezTestService extends AbstractService implements ContainerRunner {
containerRunner.stop();
server.stop();
ShuffleHandler.get().stop();
+ sharedExecutor.shutdownNow();
}
public InetSocketAddress getListenerAddress() {
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
index 8b52cc9..f3403e6 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezExecutors;
+import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -235,7 +237,7 @@ public class TestMROutput {
public static LogicalIOProcessorRuntimeTask createLogicalTask(
Configuration conf,
TezUmbilical umbilical, String dagName,
- String vertexName) throws Exception {
+ String vertexName, TezExecutors sharedExecutor) throws Exception {
ProcessorDescriptor procDesc = ProcessorDescriptor.create(TestProcessor.class.getName());
List<InputSpec> inputSpecs = Lists.newLinkedList();
List<OutputSpec> outputSpecs = Lists.newLinkedList();
@@ -263,9 +265,9 @@ public class TestMROutput {
null,
new HashMap<String, String>(),
HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
- Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim());
+ Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(), sharedExecutor);
}
-
+
public static class TestOutputCommitter extends OutputCommitter {
@Override
@@ -395,10 +397,13 @@ public class TestMROutput {
@Ignore
@Test
public void testPerf() throws Exception {
- LogicalIOProcessorRuntimeTask task = createLogicalTask(new Configuration(),
- new TestUmbilical(), "dag", "vertex");
+ Configuration conf = new Configuration();
+ TezSharedExecutor sharedExecutor = new TezSharedExecutor(conf);
+ LogicalIOProcessorRuntimeTask task = createLogicalTask(conf, new TestUmbilical(), "dag",
+ "vertex", sharedExecutor);
task.initialize();
task.run();
task.close();
+ sharedExecutor.shutdownNow();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index b69dc0c..29f9ca9 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.tez.common.MRFrameworkConfigs;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.mapreduce.TezTestUtils;
@@ -200,7 +201,7 @@ public class MapUtils {
JobConf jobConf, int mapId, Path mapInput,
TezUmbilical umbilical, String dagName,
String vertexName, List<InputSpec> inputSpecs,
- List<OutputSpec> outputSpecs) throws Exception {
+ List<OutputSpec> outputSpecs, TezSharedExecutor sharedExecutor) throws Exception {
jobConf.setInputFormat(SequenceFileInputFormat.class);
ProcessorDescriptor mapProcessorDesc = ProcessorDescriptor.create(
@@ -234,7 +235,7 @@ public class MapUtils {
serviceConsumerMetadata,
envMap,
HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
- Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim());
+ Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(), sharedExecutor);
return task;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 7c5e2a7..eb30841 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -21,22 +21,10 @@ package org.apache.tez.mapreduce.processor.map;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
-import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.db.FloatSplitter;
-import org.apache.hadoop.mapreduce.split.JobSplit;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +40,7 @@ import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.tez.common.MRFrameworkConfigs;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.UserPayload;
@@ -86,8 +75,6 @@ public class TestMapProcessor {
private static FileSystem localFs = null;
private static Path workDir = null;
static float progressUpdate = 0.0f;
- final private static FsPermission JOB_FILE_PERMISSION = FsPermission
- .createImmutable((short) 0644);
static {
try {
defaultConf.set("fs.defaultFS", "file:///");
@@ -163,15 +150,17 @@ public class TestMapProcessor {
OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName())
.setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
+ TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf);
LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,
new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
- Collections.singletonList(mapInputSpec),
- Collections.singletonList(mapOutputSpec));
-
+ Collections.singletonList(mapInputSpec), Collections.singletonList(mapOutputSpec),
+ sharedExecutor);
+
task.initialize();
task.run();
task.close();
-
+ sharedExecutor.shutdownNow();
+
OutputContext outputContext = task.getOutputContexts().iterator().next();
TezTaskOutput mapOutputs = new TezTaskOutputFiles(jobConf, outputContext.getUniqueIdentifier());
@@ -236,11 +225,12 @@ public class TestMapProcessor {
OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName())
.setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
+ TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf);
final LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask
(localFs, workDir, jobConf, 0,
new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
Collections.singletonList(mapInputSpec),
- Collections.singletonList(mapOutputSpec));
+ Collections.singletonList(mapOutputSpec), sharedExecutor);
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
Thread monitorProgress = new Thread(new Runnable() {
@@ -259,5 +249,6 @@ public class TestMapProcessor {
Assert.assertTrue("Progress Updates should be captured!",
progressUpdate > 0.0f && progressUpdate < 1.0f);
task.close();
+ sharedExecutor.shutdownNow();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index ca3792f..42ea4f7 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.tez.common.MRFrameworkConfigs;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.InputDescriptor;
@@ -155,10 +156,11 @@ public class TestReduceProcessor {
TestUmbilical testUmbilical = new TestUmbilical();
+ TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf);
LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,
mapInput, testUmbilical, dagName, mapVertexName,
Collections.singletonList(mapInputSpec),
- Collections.singletonList(mapOutputSpec));
+ Collections.singletonList(mapOutputSpec), sharedExecutor);
mapTask.initialize();
mapTask.run();
@@ -225,7 +227,7 @@ public class TestReduceProcessor {
serviceConsumerMetadata,
serviceProviderEnvMap,
HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
- Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim());
+ Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(), sharedExecutor);
List<Event> destEvents = new LinkedList<Event>();
destEvents.add(dme);
@@ -235,6 +237,7 @@ public class TestReduceProcessor {
sortedOut.handleEvents(destEvents);
task.run();
task.close();
+ sharedExecutor.shutdownNow();
// MRTask mrTask = (MRTask)t.getProcessor();
// TODO NEWTEZ Verify the partitioner has not been created
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index e49791f..5c2ab77 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.RunnableWithNdc;
+import org.apache.tez.common.TezExecutors;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -158,13 +159,15 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private final boolean initializeProcessorFirst;
private final boolean initializeProcessorIOSerially;
+ private final TezExecutors sharedExecutor;
public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap,
Multimap<String, String> startedInputsMap, ObjectRegistry objectRegistry,
String pid, ExecutionContext ExecutionContext, long memAvailable,
- boolean updateSysCounters, HadoopShim hadoopShim) throws IOException {
+ boolean updateSysCounters, HadoopShim hadoopShim,
+ TezExecutors sharedExecutor) throws IOException {
// Note: If adding any fields here, make sure they're cleaned up in the cleanupContext method.
// TODO Remove jobToken from here post TEZ-421
super(taskSpec, tezConf, tezUmbilical, pid, updateSysCounters);
@@ -217,6 +220,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
this.hadoopShim = hadoopShim;
this.maxEventBacklog = tezConf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG,
TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT);
+ this.sharedExecutor = sharedExecutor;
}
/**
@@ -596,7 +600,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
inputSpec.getInputDescriptor().getUserPayload(), this,
serviceConsumerMetadata, envMap, initialMemoryDistributor,
inputSpec.getInputDescriptor(), inputMap, inputReadyTracker, objectRegistry,
- ExecutionContext, memAvailable);
+ ExecutionContext, memAvailable, sharedExecutor);
return inputContext;
}
@@ -611,7 +615,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
outputSpec.getOutputDescriptor().getUserPayload(), this,
serviceConsumerMetadata, envMap, initialMemoryDistributor,
outputSpec.getOutputDescriptor(), objectRegistry, ExecutionContext,
- memAvailable);
+ memAvailable, sharedExecutor);
return outputContext;
}
@@ -622,8 +626,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
taskSpec.getVertexParallelism(),
taskSpec.getTaskAttemptID(),
processorDescriptor.getUserPayload(), this,
- serviceConsumerMetadata, envMap, initialMemoryDistributor,
- processorDescriptor, inputReadyTracker, objectRegistry, ExecutionContext, memAvailable);
+ serviceConsumerMetadata, envMap, initialMemoryDistributor, processorDescriptor,
+ inputReadyTracker, objectRegistry, ExecutionContext, memAvailable, sharedExecutor);
return processorContext;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index afb78d9..15a6485 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
@@ -94,12 +95,13 @@ public class TezInputContextImpl extends TezTaskContextImpl
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
InputDescriptor inputDescriptor, Map<String, LogicalInput> inputs,
InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry,
- ExecutionContext ExecutionContext, long memAvailable) {
+ ExecutionContext ExecutionContext, long memAvailable,
+ TezExecutors sharedExecutor) {
super(conf, workDirs, appAttemptNumber, dagName, taskVertexName,
vertexParallelism, taskAttemptID, wrapCounters(runtimeTask,
taskVertexName, sourceVertexName, conf), runtimeTask, tezUmbilical,
serviceConsumerMetadata, auxServiceEnv, memDist, inputDescriptor,
- objectRegistry, ExecutionContext, memAvailable);
+ objectRegistry, ExecutionContext, memAvailable, sharedExecutor);
checkNotNull(inputIndex, "inputIndex is null");
checkNotNull(sourceVertexName, "sourceVertexName is null");
checkNotNull(inputs, "input map is null");
@@ -153,7 +155,6 @@ public class TezInputContextImpl extends TezTaskContextImpl
return sourceVertexName;
}
- @SuppressWarnings("deprecation")
@Override
public void fatalError(Throwable exception, String message) {
super.signalFatalError(exception, message, sourceInfo);
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 1bd78d3..41e8d41 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
@@ -89,12 +90,13 @@ public class TezOutputContextImpl extends TezTaskContextImpl
Map<String, ByteBuffer> serviceConsumerMetadata,
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
OutputDescriptor outputDescriptor, ObjectRegistry objectRegistry,
- ExecutionContext ExecutionContext, long memAvailable) {
+ ExecutionContext executionContext, long memAvailable, TezExecutors sharedExecutor) {
super(conf, workDirs, appAttemptNumber, dagName, taskVertexName,
vertexParallelism, taskAttemptID,
wrapCounters(runtimeTask, taskVertexName, destinationVertexName, conf),
runtimeTask, tezUmbilical, serviceConsumerMetadata,
- auxServiceEnv, memDist, outputDescriptor, objectRegistry, ExecutionContext, memAvailable);
+ auxServiceEnv, memDist, outputDescriptor, objectRegistry, executionContext, memAvailable,
+ sharedExecutor);
checkNotNull(outputIndex, "outputIndex is null");
checkNotNull(destinationVertexName, "destinationVertexName is null");
this.userPayload = userPayload;
@@ -138,7 +140,6 @@ public class TezOutputContextImpl extends TezTaskContextImpl
return destinationVertexName;
}
- @SuppressWarnings("deprecation")
@Override
public void fatalError(Throwable exception, String message) {
super.signalFatalError(exception, message, sourceInfo);
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index d03f48e..beae693 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -32,6 +32,7 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezExecutors;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -63,10 +64,11 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
Map<String, ByteBuffer> serviceConsumerMetadata,
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
ProcessorDescriptor processorDescriptor, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry,
- ExecutionContext ExecutionContext, long memAvailable) {
+ ExecutionContext ExecutionContext, long memAvailable, TezExecutors sharedExecutor) {
super(conf, workDirs, appAttemptNumber, dagName, vertexName, vertexParallelism, taskAttemptID,
runtimeTask.addAndGetTezCounter(vertexName), runtimeTask, tezUmbilical, serviceConsumerMetadata,
- auxServiceEnv, memDist, processorDescriptor, objectRegistry, ExecutionContext, memAvailable);
+ auxServiceEnv, memDist, processorDescriptor, objectRegistry, ExecutionContext, memAvailable,
+ sharedExecutor);
checkNotNull(inputReadyTracker, "inputReadyTracker is null");
this.userPayload = userPayload;
this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
@@ -98,7 +100,6 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
}
}
- @SuppressWarnings("deprecation")
@Override
public void fatalError(Throwable exception, String message) {
super.signalFatalError(exception, message, sourceInfo);
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 35abd1e..5a6a405 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
@@ -33,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.EntityDescriptor;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -67,6 +69,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
private final int vertexParallelism;
private final ExecutionContext ExecutionContext;
private final long memAvailable;
+ private final TezExecutors sharedExecutor;
@Private
public TezTaskContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber,
@@ -75,7 +78,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata,
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
EntityDescriptor<?> descriptor, ObjectRegistry objectRegistry,
- ExecutionContext ExecutionContext, long memAvailable) {
+ ExecutionContext ExecutionContext, long memAvailable, TezExecutors sharedExecutor) {
checkNotNull(conf, "conf is null");
checkNotNull(dagName, "dagName is null");
checkNotNull(taskVertexName, "taskVertexName is null");
@@ -85,6 +88,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
checkNotNull(auxServiceEnv, "auxServiceEnv is null");
checkNotNull(memDist, "memDist is null");
checkNotNull(descriptor, "descriptor is null");
+ checkNotNull(sharedExecutor, "sharedExecutor is null");
this.dagName = dagName;
this.taskVertexName = taskVertexName;
this.taskAttemptID = taskAttemptID;
@@ -106,6 +110,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
this.vertexParallelism = vertexParallelism;
this.ExecutionContext = ExecutionContext;
this.memAvailable = memAvailable;
+ this.sharedExecutor = sharedExecutor;
}
@Override
@@ -237,6 +242,12 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
return this.ExecutionContext;
}
+ @Override
+ public ExecutorService createTezFrameworkExecutorService(
+ int parallelism, String threadNameFormat) {
+ return sharedExecutor.createExecutorService(parallelism, threadNameFormat);
+ }
+
private int generateId() {
return ID_GEN.incrementAndGet();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index e8e7391..bc911c3 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -52,7 +52,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezLocalResource;
+import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.Limits;
@@ -120,6 +122,7 @@ public class TezChild {
private int taskCount = 0;
private TezVertexID lastVertexID;
private final HadoopShim hadoopShim;
+ private final TezExecutors sharedExecutor;
public TezChild(Configuration conf, String host, int port, String containerIdentifier,
String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs,
@@ -141,6 +144,7 @@ public class TezChild {
this.user = user;
this.updateSysCounters = updateSysCounters;
this.hadoopShim = hadoopShim;
+ this.sharedExecutor = new TezSharedExecutor(defaultConf);
getTaskMaxSleepTime = defaultConf.getInt(
TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
@@ -258,7 +262,7 @@ public class TezChild {
localDirs, containerTask.getTaskSpec(), appAttemptNumber,
serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter,
executor, objectRegistry, pid, executionContext, memAvailable, updateSysCounters,
- hadoopShim);
+ hadoopShim, sharedExecutor);
boolean shouldDie;
try {
TaskRunner2Result result = taskRunner.run();
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 96f8474..306f2a7 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -35,6 +35,8 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.common.TezExecutors;
+import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.hadoop.shim.HadoopShim;
@@ -102,6 +104,25 @@ public class TezTaskRunner2 {
// The callable which is being used to execute the task.
private volatile TaskRunner2Callable taskRunnerCallable;
+ // This instance is set only if the runner was not configured explicity and will be shutdown
+ // when this task is finished.
+ private final TezSharedExecutor localExecutor;
+
+ @Deprecated
+ public TezTaskRunner2(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
+ TaskSpec taskSpec, int appAttemptNumber,
+ Map<String, ByteBuffer> serviceConsumerMetadata,
+ Map<String, String> serviceProviderEnvMap,
+ Multimap<String, String> startedInputsMap,
+ TaskReporterInterface taskReporter, ExecutorService executor,
+ ObjectRegistry objectRegistry, String pid,
+ ExecutionContext executionContext, long memAvailable,
+ boolean updateSysCounters, HadoopShim hadoopShim) throws IOException {
+ this(tezConf, ugi, localDirs, taskSpec, appAttemptNumber, serviceConsumerMetadata,
+ serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry,
+ pid, executionContext, memAvailable, updateSysCounters, hadoopShim, null);
+ }
+
public TezTaskRunner2(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
TaskSpec taskSpec, int appAttemptNumber,
Map<String, ByteBuffer> serviceConsumerMetadata,
@@ -110,7 +131,8 @@ public class TezTaskRunner2 {
TaskReporterInterface taskReporter, ExecutorService executor,
ObjectRegistry objectRegistry, String pid,
ExecutionContext executionContext, long memAvailable,
- boolean updateSysCounters, HadoopShim hadoopShim) throws
+ boolean updateSysCounters, HadoopShim hadoopShim,
+ TezExecutors sharedExecutor) throws
IOException {
this.ugi = ugi;
this.taskReporter = taskReporter;
@@ -125,9 +147,11 @@ public class TezTaskRunner2 {
taskConf.set(entry.getKey(), entry.getValue());
}
}
+ localExecutor = sharedExecutor == null ? new TezSharedExecutor(tezConf) : null;
this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, taskConf, localDirs,
umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap,
- objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim);
+ objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim,
+ sharedExecutor == null ? localExecutor : sharedExecutor);
}
/**
@@ -258,6 +282,9 @@ public class TezTaskRunner2 {
if (taskKillStartTime != 0) {
LOG.info("Time taken to interrupt task={}", (System.currentTimeMillis() - taskKillStartTime));
}
+ if (localExecutor != null) {
+ localExecutor.shutdown();
+ }
Thread.interrupted();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index ecfc424..c1bb3a1 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -30,6 +30,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -81,10 +82,11 @@ public class TestLogicalIOProcessorRuntimeTask {
TezTaskAttemptID taId2 = createTaskAttemptID(vertexId, 2);
TaskSpec task2 = createTaskSpec(taId2, "dag2", "vertex1", 10);
+ TezSharedExecutor sharedExecutor = new TezSharedExecutor(tezConf);
LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null,
umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
"", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true,
- new DefaultHadoopShim());
+ new DefaultHadoopShim(), sharedExecutor);
try {
lio1.initialize();
@@ -105,6 +107,7 @@ public class TestLogicalIOProcessorRuntimeTask {
assertEquals(30, lio1.getOutputContexts().iterator().next().getVertexParallelism());
} catch(Exception e) {
fail();
+ sharedExecutor.shutdownNow();
} finally {
cleanupAndTest(lio1);
}
@@ -114,7 +117,7 @@ public class TestLogicalIOProcessorRuntimeTask {
LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf, null,
umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
"", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true,
- new DefaultHadoopShim());
+ new DefaultHadoopShim(), sharedExecutor);
try {
lio2.initialize();
lio2.run();
@@ -134,6 +137,7 @@ public class TestLogicalIOProcessorRuntimeTask {
fail();
} finally {
cleanupAndTest(lio2);
+ sharedExecutor.shutdownNow();
}
}
@@ -275,7 +279,7 @@ public class TestLogicalIOProcessorRuntimeTask {
@Override
public void start() throws Exception {
startCount++;
- this.vertexParallelism = getContext().getVertexParallelism();
+ vertexParallelism = getContext().getVertexParallelism();
getContext().notifyProgress();
}
@@ -315,7 +319,7 @@ public class TestLogicalIOProcessorRuntimeTask {
public void start() throws Exception {
System.err.println("Out started");
startCount++;
- this.vertexParallelism = getContext().getVertexParallelism();
+ vertexParallelism = getContext().getVertexParallelism();
getContext().notifyProgress();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
index d16b880..bf4fdf6 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
@@ -27,6 +27,7 @@ import java.util.Map;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -61,10 +62,11 @@ public class TestProcessorContext {
TaskSpec mockSpec = mock(TaskSpec.class);
when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class)));
when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class)));
- LogicalIOProcessorRuntimeTask runtimeTask = new LogicalIOProcessorRuntimeTask(
- mockSpec, 1,
- new Configuration(), new String[]{"/"},
- tezUmbilical, null, null, null, null, "", null, 1024, false, new DefaultHadoopShim());
+ Configuration conf = new Configuration();
+ TezSharedExecutor sharedExecutor = new TezSharedExecutor(conf);
+ LogicalIOProcessorRuntimeTask runtimeTask = new LogicalIOProcessorRuntimeTask(mockSpec, 1, conf,
+ new String[]{"/"}, tezUmbilical, null, null, null, null, "", null, 1024, false,
+ new DefaultHadoopShim(), sharedExecutor);
LogicalIOProcessorRuntimeTask mockTask = spy(runtimeTask);
Map<String, ByteBuffer> serviceConsumerMetadata = Maps.newHashMap();
Map<String, String> auxServiceEnv = Maps.newHashMap();
@@ -94,7 +96,8 @@ public class TestProcessorContext {
inputReadyTracker,
objectRegistry,
execContext,
- memAvailable);
+ memAvailable,
+ sharedExecutor);
assertEquals(dagNumber, procContext.getDagIdentifier());
assertEquals(appAttemptNumber, procContext.getDAGAttemptNumber());
@@ -107,5 +110,6 @@ public class TestProcessorContext {
// test auto call of notifyProgress
procContext.setProgress(0.1f);
verify(mockTask, times(1)).notifyProgressInvocation();
+ sharedExecutor.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
index adcbe4a..07b9d33 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezExecutors;
+import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TaskCounter;
@@ -778,19 +780,21 @@ public class TestTaskExecution2 {
new TaskSpec(taskAttemptId, "dagName", "vertexName", -1, processorDescriptor,
new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null, null);
+ TezExecutors sharedExecutor = new TezSharedExecutor(tezConf);
TezTaskRunner2 taskRunner;
if (testRunner) {
taskRunner = new TezTaskRunner2ForTest(tezConf, ugi, localDirs, taskSpec, 1,
new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
HashMultimap.<String, String>create(), taskReporter,
executor, null, "", new ExecutionContextImpl("localhost"),
- Runtime.getRuntime().maxMemory(), updateSysCounters);
+ Runtime.getRuntime().maxMemory(), updateSysCounters, sharedExecutor);
} else {
taskRunner = new TezTaskRunner2(tezConf, ugi, localDirs, taskSpec, 1,
new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
HashMultimap.<String, String>create(), taskReporter,
executor, null, "", new ExecutionContextImpl("localhost"),
- Runtime.getRuntime().maxMemory(), updateSysCounters, new DefaultHadoopShim());
+ Runtime.getRuntime().maxMemory(), updateSysCounters, new DefaultHadoopShim(),
+ sharedExecutor);
}
return taskRunner;
@@ -815,10 +819,12 @@ public class TestTaskExecution2 {
String pid,
ExecutionContext executionContext,
long memAvailable,
- boolean updateSysCounters) throws IOException {
+ boolean updateSysCounters,
+ TezExecutors sharedExecutor) throws IOException {
super(tezConf, ugi, localDirs, taskSpec, appAttemptNumber, serviceConsumerMetadata,
serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid,
- executionContext, memAvailable, updateSysCounters, new DefaultHadoopShim());
+ executionContext, memAvailable, updateSysCounters, new DefaultHadoopShim(),
+ sharedExecutor);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java
index f58421a..6876df9 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java
@@ -20,13 +20,13 @@ package org.apache.tez.runtime.task;
import static org.mockito.Mockito.*;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.tez.common.ProtoConverters;
+import org.apache.tez.common.TezExecutors;
+import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.runtime.api.impl.InputSpec;
@@ -52,13 +52,15 @@ public class TestTezTaskRunner2 {
List<OutputSpec> outputSpecList = new ArrayList<>();
TaskSpec taskSpec = new TaskSpec("dagName", "vertexName", 1, mock(ProcessorDescriptor.class),
inputSpecList, outputSpecList, null, taskConf);
+ TezExecutors sharedExecutor = new TezSharedExecutor(conf);
TezTaskRunner2 taskRunner2 = new TezTaskRunner2(conf, mock(UserGroupInformation.class),
localDirs, taskSpec, 1, null, null, null, mock(TaskReporter.class), null, null, "pid",
- null, 1000, false, new DefaultHadoopShim());
+ null, 1000, false, new DefaultHadoopShim(), sharedExecutor);
Assert.assertEquals("global1", taskRunner2.task.getTaskConf().get("global"));
Assert.assertEquals("task1", taskRunner2.task.getTaskConf().get("global_override"));
Assert.assertEquals("task1", taskRunner2.task.getTaskConf().get("task"));
+ sharedExecutor.shutdownNow();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 38a60a2..f549e8f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.UserPayload;
@@ -125,7 +126,8 @@ public class TestOnFileUnorderedKVOutput {
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
- OutputContext outputContext = createOutputContext(conf);
+ TezSharedExecutor sharedExecutor = new TezSharedExecutor(conf);
+ OutputContext outputContext = createOutputContext(conf, sharedExecutor);
UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1);
@@ -155,6 +157,7 @@ public class TestOnFileUnorderedKVOutput {
assertEquals(outputContext.getUniqueIdentifier(), shufflePayload.getPathComponent());
assertEquals(shufflePort, shufflePayload.getPort());
assertEquals("localhost", shufflePayload.getHost());
+ sharedExecutor.shutdownNow();
}
@Test(timeout = 30000)
@@ -167,7 +170,8 @@ public class TestOnFileUnorderedKVOutput {
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 1);
- OutputContext outputContext = createOutputContext(conf);
+ TezSharedExecutor sharedExecutor = new TezSharedExecutor(conf);
+ OutputContext outputContext = createOutputContext(conf, sharedExecutor);
UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1);
@@ -202,9 +206,11 @@ public class TestOnFileUnorderedKVOutput {
assertFalse(shufflePayload.hasEmptyPartitions());
assertEquals(shufflePort, shufflePayload.getPort());
assertEquals("localhost", shufflePayload.getHost());
+ sharedExecutor.shutdownNow();
}
- private OutputContext createOutputContext(Configuration conf) throws IOException {
+ private OutputContext createOutputContext(Configuration conf, TezSharedExecutor sharedExecutor)
+ throws IOException {
int appAttemptNumber = 1;
TezUmbilical tezUmbilical = mock(TezUmbilical.class);
String dagName = "currentDAG";
@@ -219,11 +225,10 @@ public class TestOnFileUnorderedKVOutput {
TaskSpec mockSpec = mock(TaskSpec.class);
when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class)));
when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class)));
- task = new LogicalIOProcessorRuntimeTask(
- mockSpec, appAttemptNumber,
- new Configuration(), new String[]{"/"},
- tezUmbilical, null, null, null, null, "", null, 1024, false, new DefaultHadoopShim());
-
+ task = new LogicalIOProcessorRuntimeTask(mockSpec, appAttemptNumber, new Configuration(),
+ new String[]{"/"}, tezUmbilical, null, null, null, null, "", null, 1024, false,
+ new DefaultHadoopShim(), sharedExecutor);
+
LogicalIOProcessorRuntimeTask runtimeTask = spy(task);
Map<String, String> auxEnv = new HashMap<String, String>();
@@ -240,7 +245,7 @@ public class TestOnFileUnorderedKVOutput {
appAttemptNumber, tezUmbilical, dagName, taskVertexName, destinationVertexName,
-1, taskAttemptID, 0, userPayload, runtimeTask,
null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null,
- new ExecutionContextImpl("localhost"), 2048);
+ new ExecutionContextImpl("localhost"), 2048, new TezSharedExecutor(defaultConf));
verify(runtimeTask, times(1)).addAndGetTezCounter(destinationVertexName);
verify(runtimeTask, times(1)).getTaskStatistics();
// verify output stats object got created