You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ha...@apache.org on 2017/04/20 09:16:50 UTC

tez git commit: TEZ-1187. Add a framework ExecutorService which shares threads (harishjp)

Repository: tez
Updated Branches:
  refs/heads/master fb0e45bf7 -> a5179d649


TEZ-1187. Add a framework ExecutorService which shares threads (harishjp)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a5179d64
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a5179d64
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a5179d64

Branch: refs/heads/master
Commit: a5179d64937244e21694560f8d52d320ebca20c7
Parents: fb0e45b
Author: Harish JP <ha...@gmail.com>
Authored: Thu Apr 20 14:45:44 2017 +0530
Committer: Harish JP <ha...@gmail.com>
Committed: Thu Apr 20 14:45:44 2017 +0530

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |  18 +
 .../org/apache/tez/runtime/api/TaskContext.java |  17 +-
 .../org/apache/tez/common/TezExecutors.java     |  52 +++
 .../apache/tez/common/TezSharedExecutor.java    | 338 +++++++++++++++++++
 .../tez/common/TestTezSharedExecutor.java       | 256 ++++++++++++++
 .../tez/service/impl/ContainerRunnerImpl.java   |  15 +-
 .../apache/tez/service/impl/TezTestService.java |   8 +-
 .../tez/mapreduce/output/TestMROutput.java      |  15 +-
 .../tez/mapreduce/processor/MapUtils.java       |   5 +-
 .../processor/map/TestMapProcessor.java         |  29 +-
 .../processor/reduce/TestReduceProcessor.java   |   7 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  14 +-
 .../runtime/api/impl/TezInputContextImpl.java   |   7 +-
 .../runtime/api/impl/TezOutputContextImpl.java  |   7 +-
 .../api/impl/TezProcessorContextImpl.java       |   7 +-
 .../runtime/api/impl/TezTaskContextImpl.java    |  13 +-
 .../org/apache/tez/runtime/task/TezChild.java   |   6 +-
 .../apache/tez/runtime/task/TezTaskRunner2.java |  31 +-
 .../TestLogicalIOProcessorRuntimeTask.java      |  12 +-
 .../runtime/api/impl/TestProcessorContext.java  |  14 +-
 .../tez/runtime/task/TestTaskExecution2.java    |  14 +-
 .../tez/runtime/task/TestTezTaskRunner2.java    |   8 +-
 .../output/TestOnFileUnorderedKVOutput.java     |  23 +-
 23 files changed, 839 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 40f84e6..c0179f8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1790,4 +1790,22 @@ public class TezConfiguration extends Configuration {
       TEZ_PREFIX + "am.client.heartbeat.poll.interval.millis";
   public static final int TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_DEFAULT = -1;
 
+  /**
+   * Int value. Minimum number of threads to be allocated by TezSharedExecutor.
+   */
+  @Private
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_SHARED_EXECUTOR_MIN_THREADS = "tez.shared-executor.min-threads";
+  public static final int TEZ_SHARED_EXECUTOR_MIN_THREADS_DEFAULT = 0;
+
+  /**
+   * Int value. Maximum number of threads to be allocated by TezSharedExecutor. If value is negative
+   * then Integer.MAX_VALUE is used as the limit.
+   * Default: -1, i.e. Integer.MAX_VALUE is used as the limit.
+   */
+  @Private
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_SHARED_EXECUTOR_MAX_THREADS = "tez.shared-executor.max-threads";
+  public static final int TEZ_SHARED_EXECUTOR_MAX_THREADS_DEFAULT = -1;
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
index b5e42bc..dd2951a 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.api;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import javax.annotation.Nullable;
 
@@ -234,5 +235,19 @@ public interface TaskContext {
    * @return the execution context
    */
   public ExecutionContext getExecutionContext();
-    
+
+  /**
+   * Create a new ExecutorService with the given parallelism and thread name format. The parallelism
+   * might not be guaranteed. The service returned works with tez framework, currently it provides
+   * thread reuse across tasks.
+   * Note: This is an unstable api, and is not recommended to be used by external users. Please wait
+   * until the API and code are stabilized by use in Tez processors, inputs and outputs.
+   * @param parallelism The expected parallelism for this ExecutorService.
+   * @param threadNameFormat The thread name format, format will be given one parameter, threadId.
+   * @return An ExecutorService instance.
+   */
+  @Private
+  @Unstable
+  public ExecutorService createTezFrameworkExecutorService(
+      int parallelism, String threadNameFormat);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-common/src/main/java/org/apache/tez/common/TezExecutors.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezExecutors.java b/tez-common/src/main/java/org/apache/tez/common/TezExecutors.java
new file mode 100644
index 0000000..a74c8ad
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/TezExecutors.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Interface to capture factory of ExecutorService.
+ */
+@Private
+@Unstable
+public interface TezExecutors {
+
+  /**
+   * Create an ExecutorService with the given parameters.
+   *
+   * @param parallelism Represents total number of tasks to be executed in parallel.
+   * @param threadNameFormat The name the thread should take when executing tasks from this executor
+   * @return An ExecutorService.
+   */
+  ExecutorService createExecutorService(int parallelism, String threadNameFormat);
+
+  /**
+   * Shutdown all the ExecutorService created using this factory.
+   */
+  void shutdown();
+
+  /**
+   * Shutdown all the ExecutorService created using this factory. It will discard any tasks which
+   * are not running and interrupt the running tasks.
+   */
+  void shutdownNow();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-common/src/main/java/org/apache/tez/common/TezSharedExecutor.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezSharedExecutor.java b/tez-common/src/main/java/org/apache/tez/common/TezSharedExecutor.java
new file mode 100644
index 0000000..93bf3cc
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/TezSharedExecutor.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * An ExecutorService factory which shares threads between executors created using this service.
+ */
+@Private
+@Unstable
+public class TezSharedExecutor implements TezExecutors {
+
+  // The shared executor service which will be used to execute all the tasks.
+  private final ThreadPoolExecutor service;
+
+  private final DelayedExecutionPoller poller;
+
+  public TezSharedExecutor(Configuration conf) {
+    // The default value is 0. We could start with a few threads so that thread pool is never empty.
+    int minThreads = conf.getInt(TezConfiguration.TEZ_SHARED_EXECUTOR_MIN_THREADS,
+        TezConfiguration.TEZ_SHARED_EXECUTOR_MIN_THREADS_DEFAULT);
+    // The default is effectively Integer.MAX_VALUE, but ExecutorServiceInternal limits the number
+    // of tasks submitted concurrently and hence the number of threads will be bounded.
+    int maxThreads = conf.getInt(TezConfiguration.TEZ_SHARED_EXECUTOR_MAX_THREADS,
+        TezConfiguration.TEZ_SHARED_EXECUTOR_MAX_THREADS_DEFAULT);
+    if (maxThreads < 0) {
+      maxThreads = Integer.MAX_VALUE;
+    }
+    this.service = new ThreadPoolExecutor(
+        minThreads, maxThreads,
+        // The timeout is to give thread a chance to be re-used instead of being cleaned up.
+        60, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(),
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TezSharedExecutor: %d").build());
+
+    // Setup polling thread to pick new tasks from the underlying executors.
+    poller = new DelayedExecutionPoller(service);
+    poller.start();
+  }
+
+  public ExecutorService createExecutorService(int poolSize, String threadName) {
+    return new ExecutorServiceInternal(poolSize, threadName);
+  }
+
+  // Should we allow a shared service shutdown? Once this shutdown is complete, all the executors
+  // are in shutdown mode and will throw an exception if we try to submit new tasks. Tasks already
+  // queued in an ExecutorServiceInternal but not yet submitted to the shared service will not be
+  // executed. That breaks the ExecutorService contract; we could fix this by tracking that the
+  // service is shut down and waiting until all the dependent executors have finished.
+  public void shutdown() {
+    service.shutdown();
+    poller.interrupt();
+  }
+
+  public void shutdownNow() {
+    service.shutdownNow();
+    poller.interrupt();
+  }
+
+  @Override
+  protected void finalize() {
+    this.shutdown();
+  }
+
+  private static class DelayedExecutionPoller extends Thread {
+    // Store service reference in this static class to prevent a reference of TezSharedExecutor from
+    // being held inside a non static class which prevents cleanup via GC.
+    private final ThreadPoolExecutor service;
+
+    // A queue which contains instances which have tasks to be executed.
+    private final LinkedBlockingQueue<ExecutorServiceInternal> executeQueue =
+        new LinkedBlockingQueue<>();
+
+    DelayedExecutionPoller(ThreadPoolExecutor service) {
+      super("DelayedExecutionPoller");
+      this.setDaemon(true);
+      this.service = service;
+    }
+
+    void add(ExecutorServiceInternal es) {
+      executeQueue.add(es);
+    }
+
+    @Override
+    public void run() {
+      while (!service.isShutdown()) {
+        try {
+          executeQueue.take().tryExecute();
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+  }
+
+  /*
+   * The internal shared executor service which delegates all the execution to the shared service.
+   * It allows managing a given instance of ExecutorService independently of other instances created
+   * in the same service.
+   *
+   * - It stores a queue of submitted tasks and submits only the configured poolSize number of tasks
+   *   into the shared executor service.
+   * - Stores a list of futures used to implement shutdownNow and awaitTermination.
+   */
+  private class ExecutorServiceInternal extends AbstractExecutorService {
+    // This contains all the tasks which are submitted through this ExecutorService and have not
+    // finished; we use this to implement shutdownNow and awaitTermination.
+    // Note: This should have been a Set, but we do not have a concurrent set.
+    private final ConcurrentHashMap<ManagedFutureTask<?>, Boolean> futures =
+        new ConcurrentHashMap<>();
+
+    // Number of tasks currently submitted by this executor to the common executor service.
+    private final AtomicInteger numTasksSubmitted = new AtomicInteger();
+
+    // The list of pending tasks to be submitted on behalf of this service.
+    private final LinkedBlockingQueue<ManagedFutureTask<?>> pendingTasks =
+        new LinkedBlockingQueue<>();
+
+    // Set to 0 when shutdown is complete, a CountDownLatch is used to enable wait for shutdown in
+    // awaitTermination.
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+    // The thread name to be used for threads executing tasks of this executor.
+    private final String threadName;
+
+    // Total number of threads to be used.
+    private final int poolSize;
+
+    ExecutorServiceInternal(int poolSize, String threadName) {
+      Preconditions.checkArgument(poolSize > 0, "Expected poolSize > 0");
+      this.threadName = threadName;
+      this.poolSize = poolSize;
+    }
+
+    // A FutureTask which we will use to wrap all the runnable and callable. It adds and removes
+    // from the futures set above. And also notifies TezSharedExecutor to pick new tasks from the
+    // current ExecutorServiceInternal instance.
+    private class ManagedFutureTask<V> extends FutureTask<V> {
+      // Set to true if this task was submitted to the shared ExecutorService.
+      private boolean submitted = false;
+
+      ManagedFutureTask(Runnable runnable, V value) {
+        super(runnable, value);
+        addFuture(this);
+      }
+
+      ManagedFutureTask(Callable<V> callable) {
+        super(callable);
+        addFuture(this);
+      }
+
+      @Override
+      public void run() {
+        Thread thisThread = Thread.currentThread();
+        String savedThreadName = null;
+        if (threadName != null) {
+          savedThreadName = thisThread.getName();
+          thisThread.setName(String.format(threadName, thisThread.getId()));
+        }
+        try {
+          super.run();
+        } finally {
+          if (threadName != null) {
+            thisThread.setName(savedThreadName);
+          }
+        }
+      }
+
+      // There is a race b/w cancel and submit hence the synchronization.
+      synchronized void submit() {
+        submitted = true;
+        service.execute(this);
+      }
+
+      @Override
+      public void done() {
+        removeFuture(this);
+        synchronized (this) {
+          if (submitted) { // Decrement only if this task was submitted.
+            numTasksSubmitted.decrementAndGet();
+          }
+        }
+        // Add internal executor service to poller to schedule another task if available.
+        // We do this instead of invoking tryExecute here, to give a chance for this thread to be
+        // reused. But it's still possible that a new thread is created.
+        poller.add(ExecutorServiceInternal.this);
+      }
+    }
+
+    private void addFuture(ManagedFutureTask<?> future) {
+      futures.put(future, Boolean.TRUE);
+      // If already shutdown, reject this task.
+      if (isShutdown()) {
+        service.getRejectedExecutionHandler().rejectedExecution(future, service);
+      }
+    }
+
+    private void removeFuture(ManagedFutureTask<?> future) {
+      futures.remove(future);
+    }
+
+    // Return our internal future task so that all the tasks submitted are tracked and cleaned up.
+    @SuppressWarnings("unchecked")
+    @Override
+    protected <T> ManagedFutureTask<T> newTaskFor(Runnable runnable, T value) {
+      if (runnable instanceof ManagedFutureTask) {
+        return (ManagedFutureTask<T>)runnable;
+      }
+      return new ManagedFutureTask<T>(runnable, value);
+    }
+
+    @Override
+    protected <T> ManagedFutureTask<T> newTaskFor(Callable<T> callable) {
+      return new ManagedFutureTask<T>(callable);
+    }
+
+    @Override
+    public void shutdown() {
+      shutdownLatch.countDown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+      shutdownLatch.countDown();
+      List<Runnable> pending = new ArrayList<>(pendingTasks.size());
+      pendingTasks.drainTo(pending);
+      // Cancel all futures, interrupting any that are running.
+      for (ManagedFutureTask<?> future : futures.keySet()) {
+        future.cancel(true);
+      }
+      return pending;
+    }
+
+    @Override
+    public boolean isShutdown() {
+      return shutdownLatch.getCount() == 0 || service.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated() {
+      return isShutdown() && futures.isEmpty();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+      long deadline = System.nanoTime() + unit.toNanos(timeout);
+      // Wait for shutdown to be invoked.
+      if (!shutdownLatch.await(timeout, unit)) {
+        return false;
+      }
+      // Wait for the remaining futures to finish.
+      for (ManagedFutureTask<?> future : futures.keySet()) {
+        long nanosLeft = deadline - System.nanoTime();
+        if (nanosLeft <= 0) {
+          return false;
+        }
+        try {
+          future.get(nanosLeft, TimeUnit.NANOSECONDS);
+        } catch (ExecutionException | CancellationException ignore) {
+        } catch (TimeoutException e) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    // Submit a task if task is available and poolSize has not been reached.
+    private void tryExecute() {
+      while (!pendingTasks.isEmpty()) {
+        int numTasks = numTasksSubmitted.get();
+        if (numTasks >= poolSize) {
+          return;
+        }
+        if (numTasksSubmitted.compareAndSet(numTasks, numTasks + 1)) {
+          ManagedFutureTask<?> task = pendingTasks.poll();
+          // This breaks a contract unfortunately. If a task is submitted and it ends up in a
+          // queue and then the shared service is shutdown then this job cannot be executed, which
+          // is not the contract, ideally it should execute the task.
+          if (task == null || task.isCancelled() || service.isShutdown()) {
+            numTasksSubmitted.decrementAndGet();
+          } else {
+            task.submit();
+          }
+        }
+      }
+    }
+
+    @Override
+    public void execute(Runnable command) {
+      this.pendingTasks.add(newTaskFor(command, null));
+      this.tryExecute();
+    }
+
+    @Override
+    protected void finalize() {
+      this.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java b/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java
new file mode 100644
index 0000000..8d87846
--- /dev/null
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTezSharedExecutor {
+
+  private static class Sleep implements Runnable {
+    private final long sleepTime;
+    Sleep(long sleepTime) {
+      this.sleepTime = sleepTime;
+    }
+    @Override
+    public void run() {
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private static class Wait implements Runnable {
+    private final Object ref;
+    Wait(Object ref) {
+      this.ref = ref == null ? this : ref;
+    }
+    @Override
+    public void run() {
+      try {
+        synchronized (ref) {
+          ref.wait();
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private static class Counter implements Runnable {
+    private final AtomicInteger counter;
+    Counter(ConcurrentHashMap<String, AtomicInteger> map, String tag) {
+      if (!map.contains(tag)) {
+        map.putIfAbsent(tag, new AtomicInteger(0));
+      }
+      this.counter = map.get(tag);
+    }
+    @Override
+    public void run() {
+      counter.getAndIncrement();
+    }
+  }
+
+  private static class Appender<T> implements Runnable {
+    private final Collection<T> collection;
+    private final T obj;
+    Appender(Collection<T> collection, T obj) {
+      this.collection = collection;
+      this.obj = obj;
+    }
+    @Override
+    public void run() {
+      collection.add(obj);
+    }
+  }
+
+  private static class Runner implements Runnable {
+    private Runnable[] runnables;
+    Runner(Runnable ... runnables) {
+      this.runnables = runnables;
+    }
+    @Override
+    public void run() {
+      for (Runnable runnable : runnables) {
+        runnable.run();
+      }
+    }
+  }
+
+  private void _notify(Object obj) {
+    synchronized (obj) {
+      obj.notify();
+    }
+  }
+
+  private TezSharedExecutor sharedExecutor;
+
+  @Before
+  public void setup() {
+    sharedExecutor = new TezSharedExecutor(new Configuration());
+  }
+
+  @After
+  public void cleanup() {
+    sharedExecutor.shutdownNow();
+    sharedExecutor = null;
+  }
+
+  @Test(timeout=2000)
+  public void testSimpleExecution() throws Exception {
+    ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();
+
+    ExecutorService service = sharedExecutor.createExecutorService(1, "simple-test");
+
+    // Test runnable
+    service.submit(new Counter(map, "test")).get();
+    Assert.assertEquals(1, map.get("test").get());
+
+    // Test runnable with a result
+    final Object expected = new Object();
+    Object val = service.submit(new Counter(map, "test"), expected).get();
+    Assert.assertEquals(expected, val);
+    Assert.assertEquals(2, map.get("test").get());
+
+    // Test callable.
+    val = service.submit(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        return expected;
+      }
+    }).get();
+    Assert.assertEquals(expected, val);
+
+    // Tasks should be rejected after a shutdown.
+    service.shutdown();
+
+    try {
+      service.submit(new Counter(map, "test"));
+      Assert.fail("Expected rejected execution exception.");
+    } catch (RejectedExecutionException e) {
+    }
+  }
+
+  @Test(timeout=5000)
+  public void testAwaitTermination() throws Exception {
+    ExecutorService service = sharedExecutor.createExecutorService(1, "await-termination");
+
+    final Runnable runnable = new Wait(null);
+    service.submit(runnable);
+    service.shutdown();
+
+    // No notify sent hence it should fail.
+    Assert.assertFalse(service.awaitTermination(100, TimeUnit.MILLISECONDS));
+    Assert.assertFalse(service.isTerminated());
+    Assert.assertTrue(service.isShutdown());
+
+    Timer timer = new Timer(true);
+    timer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        _notify(runnable);
+      }
+    }, 100);
+
+    // Highly unlikely that there are intermittent failures, but a possibility :-(.
+    Assert.assertTrue(service.awaitTermination(1, TimeUnit.SECONDS));
+    Assert.assertTrue(service.isTerminated());
+    Assert.assertTrue(service.isShutdown());
+
+    timer.cancel();
+  }
+
+  @Test(timeout=2000)
+  public void testSerialExecution() throws Exception {
+    ExecutorService service = sharedExecutor.createExecutorService(1, "serial-test");
+
+    // Since it is serial we should never get concurrent modification exception too.
+    List<Integer> list = new ArrayList<>();
+    Future<?> f1 = service.submit(new Wait(list));
+    List<Future<?>> futures = new ArrayList<>();
+    for (int i = 0; i < 10; ++i) {
+      futures.add(service.submit(new Appender<Integer>(list, i)));
+    }
+
+    // This shutdown does not prevent already submitted tasks from completing.
+    service.shutdown();
+
+    // Until we notify nothing moves forward.
+    Assert.assertEquals(0, list.size());
+    _notify(list);
+    f1.get();
+
+    // Wait for all futures to finish.
+    for (Future<?> f : futures) {
+      f.get();
+    }
+    Assert.assertEquals(10, list.size());
+    Assert.assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), list);
+  }
+
+  @Test(timeout=5000)
+  public void testParallelExecution() throws Exception {
+    ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();
+
+    List<Future<?>> futures = new ArrayList<>();
+    ExecutorService[] services = {
+        sharedExecutor.createExecutorService(2, "parallel-1"),
+        sharedExecutor.createExecutorService(2, "parallel-2")
+    };
+    int[] expectedCounts = {0, 0};
+    Random random = new Random();
+    for (int i = 0; i < 200; ++i) {
+      int serviceIndex = random.nextInt(2);
+      expectedCounts[serviceIndex] += 1;
+      futures.add(services[serviceIndex].submit(
+          new Runner(new Sleep(10), new Counter(map, "test" + serviceIndex))));
+    }
+    for (Future<?> future : futures) {
+      future.get();
+    }
+    Assert.assertEquals(expectedCounts[0], map.get("test0").get());
+    Assert.assertEquals(expectedCounts[1], map.get("test1").get());
+
+    // Even if one service is shutdown the other should work.
+    services[0].shutdown();
+    services[1].submit(new Counter(map, "test1")).get();
+    Assert.assertEquals(expectedCounts[1] + 1, map.get("test1").get());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index f9de995..3317bbf 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezExecutors;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
@@ -90,6 +91,8 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
   private final Map<String, String> localEnv = new HashMap<String, String>();
   private volatile FileSystem localFs;
   private final long memoryPerExecutor;
+
+  private final TezExecutors sharedExecutor;
   // TODO Support for removing queued containers, interrupting / killing specific containers - when preemption is supported
 
 
@@ -97,7 +100,8 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
 
   public ContainerRunnerImpl(int numExecutors, String[] localDirsBase,
                              AtomicReference<InetSocketAddress> localAddress,
-                             long totalMemoryAvailableBytes) {
+                             long totalMemoryAvailableBytes,
+                             TezExecutors sharedExecutor) {
     super("ContainerRunnerImpl");
     Preconditions.checkState(numExecutors > 0,
         "Invalid number of executors: " + numExecutors + ". Must be > 0");
@@ -117,6 +121,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
         "memoryPerExecutorDerived=" + memoryPerExecutor +
         ", numExecutors=" + numExecutors
     );
+    this.sharedExecutor = sharedExecutor;
   }
 
   @Override
@@ -262,7 +267,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
     ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser());
     TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()),
         new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs,
-        workingDir, credentials, memoryPerExecutor);
+        workingDir, credentials, memoryPerExecutor, sharedExecutor);
     ListenableFuture<ContainerExecutionResult> future = executorService.submit(callable);
     Futures.addCallback(future, new TaskRunnerCallback(request, callable));
   }
@@ -385,12 +390,13 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
     private volatile TezTaskRunner2 taskRunner;
     private volatile TaskReporter taskReporter;
     private TezTaskUmbilicalProtocol umbilical;
+    private final TezExecutors sharedExecutor;
 
 
     TaskRunnerCallable(SubmitWorkRequestProto request, Configuration conf,
                              ExecutionContext executionContext, Map<String, String> envMap,
                              String[] localDirs, String workingDir, Credentials credentials,
-                             long memoryAvailable) {
+                             long memoryAvailable, TezExecutors sharedExecutor) {
       this.request = request;
       this.conf = conf;
       this.executionContext = executionContext;
@@ -405,6 +411,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
           .setDaemon(true)
           .setNameFormat("TezTaskRunner_" + request.getTaskSpec().getTaskAttemptIdString()).build());
       executor = MoreExecutors.listeningDecorator(executorReal);
+      this.sharedExecutor = sharedExecutor;
     }
 
     @Override
@@ -452,7 +459,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
           request.getAppAttemptNumber(),
           serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry,
           pid,
-          executionContext, memoryAvailable, false, new DefaultHadoopShim());
+          executionContext, memoryAvailable, false, new DefaultHadoopShim(), sharedExecutor);
 
       boolean shouldDie;
       try {

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
index 322be00..17eb88c 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
@@ -23,6 +23,8 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezExecutors;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.service.ContainerRunner;
 import org.apache.tez.shufflehandler.ShuffleHandler;
@@ -46,6 +48,8 @@ public class TezTestService extends AbstractService implements ContainerRunner {
 
   private final AtomicReference<InetSocketAddress> address = new AtomicReference<InetSocketAddress>();
 
+  private final TezExecutors sharedExecutor;
+
   public TezTestService(Configuration conf, int numExecutors, long memoryAvailable, String[] localDirs) {
     super(TezTestService.class.getSimpleName());
     this.numExecutors = numExecutors;
@@ -73,8 +77,9 @@ public class TezTestService extends AbstractService implements ContainerRunner {
     this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS, StringUtils.arrayToString(localDirs));
 
     this.server = new TezTestServiceProtocolServerImpl(this, address);
+    this.sharedExecutor = new TezSharedExecutor(conf);
     this.containerRunner = new ContainerRunnerImpl(numExecutors, localDirs, address,
-        memoryAvailableBytes);
+        memoryAvailableBytes, sharedExecutor);
   }
 
   @Override
@@ -95,6 +100,7 @@ public class TezTestService extends AbstractService implements ContainerRunner {
     containerRunner.stop();
     server.stop();
     ShuffleHandler.get().stop();
+    sharedExecutor.shutdownNow();
   }
 
   public InetSocketAddress getListenerAddress() {

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
index 8b52cc9..f3403e6 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezExecutors;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -235,7 +237,7 @@ public class TestMROutput {
   public static LogicalIOProcessorRuntimeTask createLogicalTask(
       Configuration conf,
       TezUmbilical umbilical, String dagName,
-      String vertexName) throws Exception {
+      String vertexName, TezExecutors sharedExecutor) throws Exception {
     ProcessorDescriptor procDesc = ProcessorDescriptor.create(TestProcessor.class.getName());
     List<InputSpec> inputSpecs = Lists.newLinkedList();
     List<OutputSpec> outputSpecs = Lists.newLinkedList();
@@ -263,9 +265,9 @@ public class TestMROutput {
         null,
         new HashMap<String, String>(),
         HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
-        Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim());
+        Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(), sharedExecutor);
   }
-  
+
   public static class TestOutputCommitter extends OutputCommitter {
 
     @Override
@@ -395,10 +397,13 @@ public class TestMROutput {
   @Ignore
   @Test
   public void testPerf() throws Exception {
-    LogicalIOProcessorRuntimeTask task = createLogicalTask(new Configuration(), 
-        new TestUmbilical(), "dag", "vertex");
+    Configuration conf = new Configuration();
+    TezSharedExecutor sharedExecutor = new TezSharedExecutor(conf);
+    LogicalIOProcessorRuntimeTask task = createLogicalTask(conf, new TestUmbilical(), "dag",
+        "vertex", sharedExecutor);
     task.initialize();
     task.run();
     task.close();
+    sharedExecutor.shutdownNow();
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index b69dc0c..29f9ca9 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
 import org.apache.tez.common.MRFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.mapreduce.TezTestUtils;
@@ -200,7 +201,7 @@ public class MapUtils {
       JobConf jobConf, int mapId, Path mapInput,
       TezUmbilical umbilical, String dagName,
       String vertexName, List<InputSpec> inputSpecs,
-      List<OutputSpec> outputSpecs) throws Exception {
+      List<OutputSpec> outputSpecs, TezSharedExecutor sharedExecutor) throws Exception {
     jobConf.setInputFormat(SequenceFileInputFormat.class);
 
     ProcessorDescriptor mapProcessorDesc = ProcessorDescriptor.create(
@@ -234,7 +235,7 @@ public class MapUtils {
         serviceConsumerMetadata,
         envMap,
         HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
-        Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim());
+        Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(), sharedExecutor);
     return task;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 7c5e2a7..eb30841 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -21,22 +21,10 @@ package org.apache.tez.mapreduce.processor.map;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.util.Random;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.db.FloatSplitter;
-import org.apache.hadoop.mapreduce.split.JobSplit;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +40,7 @@ import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.tez.common.MRFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.UserPayload;
@@ -86,8 +75,6 @@ public class TestMapProcessor {
   private static FileSystem localFs = null; 
   private static Path workDir = null;
   static float progressUpdate = 0.0f;
-  final private static FsPermission JOB_FILE_PERMISSION = FsPermission
-      .createImmutable((short) 0644);
   static {
     try {
       defaultConf.set("fs.defaultFS", "file:///");
@@ -163,15 +150,17 @@ public class TestMapProcessor {
         OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName())
             .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
 
+    TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf);
     LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,
         new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
-        Collections.singletonList(mapInputSpec),
-        Collections.singletonList(mapOutputSpec));
-    
+        Collections.singletonList(mapInputSpec), Collections.singletonList(mapOutputSpec),
+        sharedExecutor);
+
     task.initialize();
     task.run();
     task.close();
-    
+    sharedExecutor.shutdownNow();
+
     OutputContext outputContext = task.getOutputContexts().iterator().next();
     TezTaskOutput mapOutputs = new TezTaskOutputFiles(jobConf, outputContext.getUniqueIdentifier());
     
@@ -236,11 +225,12 @@ public class TestMapProcessor {
         OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName())
             .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
 
+    TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf);
     final LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask
         (localFs, workDir, jobConf, 0,
             new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
             Collections.singletonList(mapInputSpec),
-            Collections.singletonList(mapOutputSpec));
+            Collections.singletonList(mapOutputSpec), sharedExecutor);
 
     ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
     Thread monitorProgress = new Thread(new Runnable() {
@@ -259,5 +249,6 @@ public class TestMapProcessor {
     Assert.assertTrue("Progress Updates should be captured!",
         progressUpdate > 0.0f && progressUpdate < 1.0f);
     task.close();
+    sharedExecutor.shutdownNow();
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index ca3792f..42ea4f7 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
 import org.apache.tez.common.MRFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.InputDescriptor;
@@ -155,10 +156,11 @@ public class TestReduceProcessor {
 
     TestUmbilical testUmbilical = new TestUmbilical();
 
+    TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf);
     LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,
         mapInput, testUmbilical, dagName, mapVertexName,
         Collections.singletonList(mapInputSpec),
-        Collections.singletonList(mapOutputSpec));
+        Collections.singletonList(mapOutputSpec), sharedExecutor);
 
     mapTask.initialize();
     mapTask.run();
@@ -225,7 +227,7 @@ public class TestReduceProcessor {
         serviceConsumerMetadata,
         serviceProviderEnvMap,
         HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
-        Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim());
+        Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(), sharedExecutor);
 
     List<Event> destEvents = new LinkedList<Event>();
     destEvents.add(dme);
@@ -235,6 +237,7 @@ public class TestReduceProcessor {
     sortedOut.handleEvents(destEvents);
     task.run();
     task.close();
+    sharedExecutor.shutdownNow();
 
     // MRTask mrTask = (MRTask)t.getProcessor();
     // TODO NEWTEZ Verify the partitioner has not been created

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index e49791f..5c2ab77 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.RunnableWithNdc;
+import org.apache.tez.common.TezExecutors;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -158,13 +159,15 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
   private final boolean initializeProcessorFirst;
   private final boolean initializeProcessorIOSerially;
+  private final TezExecutors sharedExecutor;
 
   public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
       Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
       Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap,
       Multimap<String, String> startedInputsMap, ObjectRegistry objectRegistry,
       String pid, ExecutionContext ExecutionContext, long memAvailable,
-      boolean updateSysCounters, HadoopShim hadoopShim) throws IOException {
+      boolean updateSysCounters, HadoopShim hadoopShim,
+      TezExecutors sharedExecutor) throws IOException {
     // Note: If adding any fields here, make sure they're cleaned up in the cleanupContext method.
     // TODO Remove jobToken from here post TEZ-421
     super(taskSpec, tezConf, tezUmbilical, pid, updateSysCounters);
@@ -217,6 +220,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.hadoopShim = hadoopShim;
     this.maxEventBacklog = tezConf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG,
         TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT);
+    this.sharedExecutor = sharedExecutor;
   }
 
   /**
@@ -596,7 +600,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         inputSpec.getInputDescriptor().getUserPayload(), this,
         serviceConsumerMetadata, envMap, initialMemoryDistributor,
         inputSpec.getInputDescriptor(), inputMap, inputReadyTracker, objectRegistry,
-        ExecutionContext, memAvailable);
+        ExecutionContext, memAvailable, sharedExecutor);
     return inputContext;
   }
 
@@ -611,7 +615,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         outputSpec.getOutputDescriptor().getUserPayload(), this,
         serviceConsumerMetadata, envMap, initialMemoryDistributor,
         outputSpec.getOutputDescriptor(), objectRegistry, ExecutionContext,
-        memAvailable);
+        memAvailable, sharedExecutor);
     return outputContext;
   }
 
@@ -622,8 +626,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         taskSpec.getVertexParallelism(),
         taskSpec.getTaskAttemptID(),
         processorDescriptor.getUserPayload(), this,
-        serviceConsumerMetadata, envMap, initialMemoryDistributor,
-        processorDescriptor, inputReadyTracker, objectRegistry, ExecutionContext, memAvailable);
+        serviceConsumerMetadata, envMap, initialMemoryDistributor, processorDescriptor,
+        inputReadyTracker, objectRegistry, ExecutionContext, memAvailable, sharedExecutor);
     return processorContext;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index afb78d9..15a6485 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezExecutors;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -94,12 +95,13 @@ public class TezInputContextImpl extends TezTaskContextImpl
                              Map<String, String> auxServiceEnv, MemoryDistributor memDist,
                              InputDescriptor inputDescriptor, Map<String, LogicalInput> inputs,
                              InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry,
-                             ExecutionContext ExecutionContext, long memAvailable) {
+                             ExecutionContext ExecutionContext, long memAvailable,
+                             TezExecutors sharedExecutor) {
     super(conf, workDirs, appAttemptNumber, dagName, taskVertexName,
         vertexParallelism, taskAttemptID, wrapCounters(runtimeTask,
         taskVertexName, sourceVertexName, conf), runtimeTask, tezUmbilical,
         serviceConsumerMetadata, auxServiceEnv, memDist, inputDescriptor,
-        objectRegistry, ExecutionContext, memAvailable);
+        objectRegistry, ExecutionContext, memAvailable, sharedExecutor);
     checkNotNull(inputIndex, "inputIndex is null");
     checkNotNull(sourceVertexName, "sourceVertexName is null");
     checkNotNull(inputs, "input map is null");
@@ -153,7 +155,6 @@ public class TezInputContextImpl extends TezTaskContextImpl
     return sourceVertexName;
   }
 
-  @SuppressWarnings("deprecation")
   @Override
   public void fatalError(Throwable exception, String message) {
     super.signalFatalError(exception, message, sourceInfo);

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 1bd78d3..41e8d41 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezExecutors;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -89,12 +90,13 @@ public class TezOutputContextImpl extends TezTaskContextImpl
       Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
       OutputDescriptor outputDescriptor, ObjectRegistry objectRegistry,
-      ExecutionContext ExecutionContext, long memAvailable) {
+      ExecutionContext executionContext, long memAvailable, TezExecutors sharedExecutor) {
     super(conf, workDirs, appAttemptNumber, dagName, taskVertexName, 
         vertexParallelism, taskAttemptID,
         wrapCounters(runtimeTask, taskVertexName, destinationVertexName, conf),
         runtimeTask, tezUmbilical, serviceConsumerMetadata,
-        auxServiceEnv, memDist, outputDescriptor, objectRegistry, ExecutionContext, memAvailable);
+        auxServiceEnv, memDist, outputDescriptor, objectRegistry, executionContext, memAvailable,
+        sharedExecutor);
     checkNotNull(outputIndex, "outputIndex is null");
     checkNotNull(destinationVertexName, "destinationVertexName is null");
     this.userPayload = userPayload;
@@ -138,7 +140,6 @@ public class TezOutputContextImpl extends TezTaskContextImpl
     return destinationVertexName;
   }
 
-  @SuppressWarnings("deprecation")
   @Override
   public void fatalError(Throwable exception, String message) {
     super.signalFatalError(exception, message, sourceInfo);

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index d03f48e..beae693 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezExecutors;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -63,10 +64,11 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
       Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
       ProcessorDescriptor processorDescriptor, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry,
-      ExecutionContext ExecutionContext, long memAvailable) {
+      ExecutionContext ExecutionContext, long memAvailable, TezExecutors sharedExecutor) {
     super(conf, workDirs, appAttemptNumber, dagName, vertexName, vertexParallelism, taskAttemptID,
         runtimeTask.addAndGetTezCounter(vertexName), runtimeTask, tezUmbilical, serviceConsumerMetadata,
-        auxServiceEnv, memDist, processorDescriptor, objectRegistry, ExecutionContext, memAvailable);
+        auxServiceEnv, memDist, processorDescriptor, objectRegistry, ExecutionContext, memAvailable,
+        sharedExecutor);
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
     this.userPayload = userPayload;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
@@ -98,7 +100,6 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
     }
   }
 
-  @SuppressWarnings("deprecation")
   @Override
   public void fatalError(Throwable exception, String message) {
     super.signalFatalError(exception, message, sourceInfo);

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 35abd1e..5a6a405 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.Nullable;
@@ -33,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.tez.common.TezExecutors;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EntityDescriptor;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -67,6 +69,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
   private final int vertexParallelism;
   private final ExecutionContext ExecutionContext;
   private final long memAvailable;
+  private final TezExecutors sharedExecutor;
 
   @Private
   public TezTaskContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber,
@@ -75,7 +78,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
       TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
       EntityDescriptor<?> descriptor, ObjectRegistry objectRegistry,
-      ExecutionContext ExecutionContext, long memAvailable) {
+      ExecutionContext ExecutionContext, long memAvailable, TezExecutors sharedExecutor) {
     checkNotNull(conf, "conf is null");
     checkNotNull(dagName, "dagName is null");
     checkNotNull(taskVertexName, "taskVertexName is null");
@@ -85,6 +88,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
     checkNotNull(auxServiceEnv, "auxServiceEnv is null");
     checkNotNull(memDist, "memDist is null");
     checkNotNull(descriptor, "descriptor is null");
+    checkNotNull(sharedExecutor, "sharedExecutor is null");
     this.dagName = dagName;
     this.taskVertexName = taskVertexName;
     this.taskAttemptID = taskAttemptID;
@@ -106,6 +110,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
     this.vertexParallelism = vertexParallelism;
     this.ExecutionContext = ExecutionContext;
     this.memAvailable = memAvailable;
+    this.sharedExecutor = sharedExecutor;
   }
 
   @Override
@@ -237,6 +242,12 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
     return this.ExecutionContext;
   }
 
+  @Override
+  public ExecutorService createTezFrameworkExecutorService(
+      int parallelism, String threadNameFormat) {
+    return sharedExecutor.createExecutorService(parallelism, threadNameFormat);
+  }
+
   private int generateId() {
     return ID_GEN.incrementAndGet();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index e8e7391..bc911c3 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -52,7 +52,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezExecutors;
 import org.apache.tez.common.TezLocalResource;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.Limits;
@@ -120,6 +122,7 @@ public class TezChild {
   private int taskCount = 0;
   private TezVertexID lastVertexID;
   private final HadoopShim hadoopShim;
+  private final TezExecutors sharedExecutor;
 
   public TezChild(Configuration conf, String host, int port, String containerIdentifier,
       String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs,
@@ -141,6 +144,7 @@ public class TezChild {
     this.user = user;
     this.updateSysCounters = updateSysCounters;
     this.hadoopShim = hadoopShim;
+    this.sharedExecutor = new TezSharedExecutor(defaultConf);
 
     getTaskMaxSleepTime = defaultConf.getInt(
         TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
@@ -258,7 +262,7 @@ public class TezChild {
             localDirs, containerTask.getTaskSpec(), appAttemptNumber,
             serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter,
             executor, objectRegistry, pid, executionContext, memAvailable, updateSysCounters,
-            hadoopShim);
+            hadoopShim, sharedExecutor);
         boolean shouldDie;
         try {
           TaskRunner2Result result = taskRunner.run();

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 96f8474..306f2a7 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -35,6 +35,8 @@ import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.common.TezExecutors;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.hadoop.shim.HadoopShim;
@@ -102,6 +104,25 @@ public class TezTaskRunner2 {
   // The callable which is being used to execute the task.
   private volatile TaskRunner2Callable taskRunnerCallable;
 
+  // This instance is set only if no shared executor was passed to the constructor; it is shut
+  // down when this task is finished.
+  private final TezSharedExecutor localExecutor;
+
+  @Deprecated
+  public TezTaskRunner2(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
+      TaskSpec taskSpec, int appAttemptNumber,
+      Map<String, ByteBuffer> serviceConsumerMetadata,
+      Map<String, String> serviceProviderEnvMap,
+      Multimap<String, String> startedInputsMap,
+      TaskReporterInterface taskReporter, ExecutorService executor,
+      ObjectRegistry objectRegistry, String pid,
+      ExecutionContext executionContext, long memAvailable,
+      boolean updateSysCounters, HadoopShim hadoopShim) throws IOException {
+    this(tezConf, ugi, localDirs, taskSpec, appAttemptNumber, serviceConsumerMetadata,
+        serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry,
+        pid, executionContext, memAvailable, updateSysCounters, hadoopShim, null);
+  }
+
   public TezTaskRunner2(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
                         TaskSpec taskSpec, int appAttemptNumber,
                         Map<String, ByteBuffer> serviceConsumerMetadata,
@@ -110,7 +131,8 @@ public class TezTaskRunner2 {
                         TaskReporterInterface taskReporter, ExecutorService executor,
                         ObjectRegistry objectRegistry, String pid,
                         ExecutionContext executionContext, long memAvailable,
-                        boolean updateSysCounters, HadoopShim hadoopShim) throws
+                        boolean updateSysCounters, HadoopShim hadoopShim,
+                        TezExecutors sharedExecutor) throws
       IOException {
     this.ugi = ugi;
     this.taskReporter = taskReporter;
@@ -125,9 +147,11 @@ public class TezTaskRunner2 {
         taskConf.set(entry.getKey(), entry.getValue());
       }
     }
+    localExecutor = sharedExecutor == null ? new TezSharedExecutor(tezConf) : null;
     this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, taskConf, localDirs,
         umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap,
-        objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim);
+        objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim,
+        sharedExecutor == null ? localExecutor : sharedExecutor);
   }
 
   /**
@@ -258,6 +282,9 @@ public class TezTaskRunner2 {
       if (taskKillStartTime != 0) {
         LOG.info("Time taken to interrupt task={}", (System.currentTimeMillis() - taskKillStartTime));
       }
+      if (localExecutor != null) {
+        localExecutor.shutdown();
+      }
       Thread.interrupted();
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index ecfc424..c1bb3a1 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -30,6 +30,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -81,10 +82,11 @@ public class TestLogicalIOProcessorRuntimeTask {
     TezTaskAttemptID taId2 = createTaskAttemptID(vertexId, 2);
     TaskSpec task2 = createTaskSpec(taId2, "dag2", "vertex1", 10);
 
+    TezSharedExecutor sharedExecutor = new TezSharedExecutor(tezConf);
     LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null,
         umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
         "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true,
-        new DefaultHadoopShim());
+        new DefaultHadoopShim(), sharedExecutor);
 
     try {
       lio1.initialize();
@@ -105,6 +107,7 @@ public class TestLogicalIOProcessorRuntimeTask {
       assertEquals(30, lio1.getOutputContexts().iterator().next().getVertexParallelism());
     } catch(Exception e) {
       fail();
+      sharedExecutor.shutdownNow();
     } finally {
       cleanupAndTest(lio1);
     }
@@ -114,7 +117,7 @@ public class TestLogicalIOProcessorRuntimeTask {
     LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf, null,
         umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
         "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true,
-        new DefaultHadoopShim());
+        new DefaultHadoopShim(), sharedExecutor);
     try {
       lio2.initialize();
       lio2.run();
@@ -134,6 +137,7 @@ public class TestLogicalIOProcessorRuntimeTask {
       fail();
     } finally {
       cleanupAndTest(lio2);
+      sharedExecutor.shutdownNow();
     }
 
   }
@@ -275,7 +279,7 @@ public class TestLogicalIOProcessorRuntimeTask {
     @Override
     public void start() throws Exception {
       startCount++;
-      this.vertexParallelism = getContext().getVertexParallelism();
+      vertexParallelism = getContext().getVertexParallelism();
       getContext().notifyProgress();
     }
 
@@ -315,7 +319,7 @@ public class TestLogicalIOProcessorRuntimeTask {
     public void start() throws Exception {
       System.err.println("Out started");
       startCount++;
-      this.vertexParallelism = getContext().getVertexParallelism();
+      vertexParallelism = getContext().getVertexParallelism();
       getContext().notifyProgress();
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
index d16b880..bf4fdf6 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -61,10 +62,11 @@ public class TestProcessorContext {
     TaskSpec mockSpec = mock(TaskSpec.class);
     when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class)));
     when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class)));
-    LogicalIOProcessorRuntimeTask runtimeTask = new LogicalIOProcessorRuntimeTask(
-        mockSpec, 1, 
-        new Configuration(), new String[]{"/"}, 
-        tezUmbilical, null, null, null, null, "", null, 1024, false, new DefaultHadoopShim());
+    Configuration conf = new Configuration();
+    TezSharedExecutor sharedExecutor = new TezSharedExecutor(conf);
+    LogicalIOProcessorRuntimeTask runtimeTask = new LogicalIOProcessorRuntimeTask(mockSpec, 1, conf,
+        new String[]{"/"}, tezUmbilical, null, null, null, null, "", null, 1024, false,
+        new DefaultHadoopShim(), sharedExecutor);
     LogicalIOProcessorRuntimeTask mockTask = spy(runtimeTask);
     Map<String, ByteBuffer> serviceConsumerMetadata = Maps.newHashMap();
     Map<String, String> auxServiceEnv = Maps.newHashMap();
@@ -94,7 +96,8 @@ public class TestProcessorContext {
             inputReadyTracker,
             objectRegistry,
             execContext,
-            memAvailable);
+            memAvailable,
+            sharedExecutor);
 
     assertEquals(dagNumber, procContext.getDagIdentifier());
     assertEquals(appAttemptNumber, procContext.getDAGAttemptNumber());
@@ -107,5 +110,6 @@ public class TestProcessorContext {
      // test auto call of notifyProgress
      procContext.setProgress(0.1f);
      verify(mockTask, times(1)).notifyProgressInvocation();
+     sharedExecutor.shutdown();
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
index adcbe4a..07b9d33 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezExecutors;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.FileSystemCounter;
 import org.apache.tez.common.counters.TaskCounter;
@@ -778,19 +780,21 @@ public class TestTaskExecution2 {
         new TaskSpec(taskAttemptId, "dagName", "vertexName", -1, processorDescriptor,
             new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null, null);
 
+    TezExecutors sharedExecutor = new TezSharedExecutor(tezConf);
     TezTaskRunner2 taskRunner;
     if (testRunner) {
       taskRunner = new TezTaskRunner2ForTest(tezConf, ugi, localDirs, taskSpec, 1,
           new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
           HashMultimap.<String, String>create(), taskReporter,
           executor, null, "", new ExecutionContextImpl("localhost"),
-          Runtime.getRuntime().maxMemory(), updateSysCounters);
+          Runtime.getRuntime().maxMemory(), updateSysCounters, sharedExecutor);
     } else {
       taskRunner = new TezTaskRunner2(tezConf, ugi, localDirs, taskSpec, 1,
           new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
           HashMultimap.<String, String>create(), taskReporter,
           executor, null, "", new ExecutionContextImpl("localhost"),
-          Runtime.getRuntime().maxMemory(), updateSysCounters, new DefaultHadoopShim());
+          Runtime.getRuntime().maxMemory(), updateSysCounters, new DefaultHadoopShim(),
+          sharedExecutor);
     }
 
     return taskRunner;
@@ -815,10 +819,12 @@ public class TestTaskExecution2 {
                                  String pid,
                                  ExecutionContext executionContext,
                                  long memAvailable,
-                                 boolean updateSysCounters) throws IOException {
+                                 boolean updateSysCounters,
+                                 TezExecutors sharedExecutor) throws IOException {
       super(tezConf, ugi, localDirs, taskSpec, appAttemptNumber, serviceConsumerMetadata,
           serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid,
-          executionContext, memAvailable, updateSysCounters, new DefaultHadoopShim());
+          executionContext, memAvailable, updateSysCounters, new DefaultHadoopShim(),
+          sharedExecutor);
     }
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java
index f58421a..6876df9 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java
@@ -20,13 +20,13 @@ package org.apache.tez.runtime.task;
 
 import static org.mockito.Mockito.*;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.tez.common.ProtoConverters;
+import org.apache.tez.common.TezExecutors;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.api.impl.InputSpec;
@@ -52,13 +52,15 @@ public class TestTezTaskRunner2 {
     List<OutputSpec> outputSpecList = new ArrayList<>();
     TaskSpec taskSpec = new TaskSpec("dagName", "vertexName", 1, mock(ProcessorDescriptor.class),
         inputSpecList, outputSpecList, null, taskConf);
+    TezExecutors sharedExecutor = new TezSharedExecutor(conf);
     TezTaskRunner2 taskRunner2 = new TezTaskRunner2(conf, mock(UserGroupInformation.class),
         localDirs, taskSpec, 1, null, null, null, mock(TaskReporter.class), null, null, "pid",
-        null, 1000, false, new DefaultHadoopShim());
+        null, 1000, false, new DefaultHadoopShim(), sharedExecutor);
 
     Assert.assertEquals("global1", taskRunner2.task.getTaskConf().get("global"));
     Assert.assertEquals("task1", taskRunner2.task.getTaskConf().get("global_override"));
     Assert.assertEquals("task1", taskRunner2.task.getTaskConf().get("task"));
+    sharedExecutor.shutdownNow();
   }
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 38a60a2..f549e8f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.UserPayload;
@@ -125,7 +126,8 @@ public class TestOnFileUnorderedKVOutput {
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
 
-    OutputContext outputContext = createOutputContext(conf);
+    TezSharedExecutor sharedExecutor = new TezSharedExecutor(conf);
+    OutputContext outputContext = createOutputContext(conf, sharedExecutor);
 
     UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1);
 
@@ -155,6 +157,7 @@ public class TestOnFileUnorderedKVOutput {
     assertEquals(outputContext.getUniqueIdentifier(), shufflePayload.getPathComponent());
     assertEquals(shufflePort, shufflePayload.getPort());
     assertEquals("localhost", shufflePayload.getHost());
+    sharedExecutor.shutdownNow();
   }
 
   @Test(timeout = 30000)
@@ -167,7 +170,8 @@ public class TestOnFileUnorderedKVOutput {
 
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 1);
 
-    OutputContext outputContext = createOutputContext(conf);
+    TezSharedExecutor sharedExecutor = new TezSharedExecutor(conf);
+    OutputContext outputContext = createOutputContext(conf, sharedExecutor);
 
     UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1);
 
@@ -202,9 +206,11 @@ public class TestOnFileUnorderedKVOutput {
     assertFalse(shufflePayload.hasEmptyPartitions());
     assertEquals(shufflePort, shufflePayload.getPort());
     assertEquals("localhost", shufflePayload.getHost());
+    sharedExecutor.shutdownNow();
   }
 
-  private OutputContext createOutputContext(Configuration conf) throws IOException {
+  private OutputContext createOutputContext(Configuration conf, TezSharedExecutor sharedExecutor)
+      throws IOException {
     int appAttemptNumber = 1;
     TezUmbilical tezUmbilical = mock(TezUmbilical.class);
     String dagName = "currentDAG";
@@ -219,11 +225,10 @@ public class TestOnFileUnorderedKVOutput {
     TaskSpec mockSpec = mock(TaskSpec.class);
     when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class)));
     when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class)));
-    task = new LogicalIOProcessorRuntimeTask(
-        mockSpec, appAttemptNumber, 
-        new Configuration(), new String[]{"/"},
-        tezUmbilical, null, null, null, null, "", null, 1024, false, new DefaultHadoopShim());
-    
+    task = new LogicalIOProcessorRuntimeTask(mockSpec, appAttemptNumber, new Configuration(),
+        new String[]{"/"}, tezUmbilical, null, null, null, null, "", null, 1024, false,
+        new DefaultHadoopShim(), sharedExecutor);
+
     LogicalIOProcessorRuntimeTask runtimeTask = spy(task);
     
     Map<String, String> auxEnv = new HashMap<String, String>();
@@ -240,7 +245,7 @@ public class TestOnFileUnorderedKVOutput {
         appAttemptNumber, tezUmbilical, dagName, taskVertexName, destinationVertexName,
         -1, taskAttemptID, 0, userPayload, runtimeTask,
         null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null,
-        new ExecutionContextImpl("localhost"), 2048);
+        new ExecutionContextImpl("localhost"), 2048, new TezSharedExecutor(defaultConf));
     verify(runtimeTask, times(1)).addAndGetTezCounter(destinationVertexName);
     verify(runtimeTask, times(1)).getTaskStatistics();
     // verify output stats object got created