You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/18 18:47:33 UTC

git commit: TEZ-940. Fix a memory leak in TaskSchedulerAppCallbackWrapper. Contributed by Gopal V and Siddharth Seth.

Repository: incubator-tez
Updated Branches:
  refs/heads/master 4a60f05c1 -> 3f3f94827


TEZ-940. Fix a memory leak in TaskSchedulerAppCallbackWrapper.
Contributed by Gopal V and Siddharth Seth.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3f3f9482
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3f3f9482
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3f3f9482

Branch: refs/heads/master
Commit: 3f3f94827486ceb87d9a76311911fe4ebc8cdc02
Parents: 4a60f05
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Mar 18 10:46:56 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Mar 18 10:46:56 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/rm/TaskScheduler.java    |   3 +-
 .../app/rm/TaskSchedulerAppCallbackWrapper.java |  35 ++-----
 .../dag/app/rm/TestTaskSchedulerHelpers.java    | 101 ++++++++++++++++++-
 3 files changed, 110 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3f3f9482/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index bb06904..606a0b3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -254,7 +254,8 @@ public class TaskScheduler extends AbstractService
     this.appContext = appContext;
   }
 
-  private ExecutorService createAppCallbackExecutorService() {
+  @VisibleForTesting
+  ExecutorService createAppCallbackExecutorService() {
     return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
         .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3f3f9482/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
index c690926..53c3e95 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
@@ -22,8 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -36,20 +34,15 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * Makes use of an ExecutionService to invoke application callbacks. Methods
  * which return values wait for execution to complete - effectively waiting for
  * all previous events in the queue to complete.
  */
-@SuppressWarnings({"rawtypes", "unchecked"})
 class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
 
   private TaskSchedulerAppCallback real;
-  
-  @VisibleForTesting
-  CompletionService completionService;
+
   ExecutorService executorService;
   
   /**
@@ -60,58 +53,52 @@ class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
       ExecutorService executorService) {
     this.real = real;
     this.executorService = executorService;
-    this.completionService = createAppCallbackCompletionService();
-  }
-
-  @VisibleForTesting
-  CompletionService createAppCallbackCompletionService() {
-    return new ExecutorCompletionService(this.executorService);
   }
 
   @Override
   public void taskAllocated(Object task, Object appCookie, Container container) {
-    completionService.submit(new TaskAllocatedCallable(real, task, appCookie,
+    executorService.submit(new TaskAllocatedCallable(real, task, appCookie,
         container));
   }
 
   @Override
   public void containerCompleted(Object taskLastAllocated,
       ContainerStatus containerStatus) {
-    completionService.submit(new ContainerCompletedCallable(real,
+    executorService.submit(new ContainerCompletedCallable(real,
         taskLastAllocated, containerStatus));
   }
 
   @Override
   public void containerBeingReleased(ContainerId containerId) {
-    completionService
+    executorService
         .submit(new ContainerBeingReleasedCallable(real, containerId));
   }
 
   @Override
   public void nodesUpdated(List<NodeReport> updatedNodes) {
-    completionService.submit(new NodesUpdatedCallable(real, updatedNodes));
+    executorService.submit(new NodesUpdatedCallable(real, updatedNodes));
   }
 
   @Override
   public void appShutdownRequested() {
-    completionService.submit(new AppShudownRequestedCallable(real));
+    executorService.submit(new AppShudownRequestedCallable(real));
   }
 
   @Override
   public void setApplicationRegistrationData(Resource maxContainerCapability,
       Map<ApplicationAccessType, String> appAcls, ByteBuffer key) {
-    completionService.submit(new SetApplicationRegistrationDataCallable(real,
+    executorService.submit(new SetApplicationRegistrationDataCallable(real,
         maxContainerCapability, appAcls, key));
   }
 
   @Override
   public void onError(Throwable t) {
-    completionService.submit(new OnErrorCallable(real, t));
+    executorService.submit(new OnErrorCallable(real, t));
   }
 
   @Override
   public float getProgress() {
-    Future<Float> progressFuture = completionService
+    Future<Float> progressFuture = executorService
         .submit(new GetProgressCallable(real));
     try {
       return progressFuture.get();
@@ -122,12 +109,12 @@ class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
   
   @Override
   public void preemptContainer(ContainerId containerId) {
-    completionService.submit(new PreemptContainerCallable(real, containerId));
+    executorService.submit(new PreemptContainerCallable(real, containerId));
   }
 
   @Override
   public AppFinalStatus getFinalAppStatus() {
-    Future<AppFinalStatus> appFinalStatusFuture = completionService
+    Future<AppFinalStatus> appFinalStatusFuture = executorService
         .submit(new GetFinalAppStatusCallable(real));
     try {
       return appFinalStatusFuture.get();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3f3f9482/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 087da83..72cc8e7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -25,13 +25,18 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -108,7 +113,7 @@ class TestTaskSchedulerHelpers {
     protected void serviceStop() {
     }
   }
-
+  
   // Overrides start / stop. Will be controlled without the extra event handling thread.
   static class TaskSchedulerEventHandlerForTest extends
       TaskSchedulerEventHandler {
@@ -217,6 +222,12 @@ class TestTaskSchedulerHelpers {
               appCallbackExecutor));
       return drainableAppCallback;
     }
+    
+    @Override
+    ExecutorService createAppCallbackExecutorService() {
+      ExecutorService real = super.createAppCallbackExecutorService();
+      return new CountingExecutorService(real);
+    }
 
     public TaskSchedulerAppCallbackDrainable getDrainableAppCallback() {
       return drainableAppCallback;
@@ -228,11 +239,11 @@ class TestTaskSchedulerHelpers {
     int completedEvents;
     int invocations;
     private TaskSchedulerAppCallback real;
-    private CompletionService completionService;
+    private CountingExecutorService countingExecutorService;
     final AtomicInteger count = new AtomicInteger(0);
     
     public TaskSchedulerAppCallbackDrainable(TaskSchedulerAppCallbackWrapper real) {
-      completionService = real.completionService;
+      countingExecutorService = (CountingExecutorService) real.executorService;
       this.real = real;
     }
 
@@ -301,7 +312,7 @@ class TestTaskSchedulerHelpers {
 
     public void drain() throws InterruptedException, ExecutionException {
       while (completedEvents < invocations) {
-        Future f = completionService.poll(5000l, TimeUnit.MILLISECONDS);
+        Future f = countingExecutorService.completionService.poll(5000l, TimeUnit.MILLISECONDS);
         if (f != null) {
           completedEvents++;
         } else {
@@ -365,5 +376,87 @@ class TestTaskSchedulerHelpers {
       }
     }
   }
+  
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private static class CountingExecutorService implements ExecutorService {
+
+    final ExecutorService real;
+    final CompletionService completionService;
+
+    CountingExecutorService(ExecutorService real) {
+      this.real = real;
+      completionService = new ExecutorCompletionService(real);
+    }
+
+    @Override
+    public void execute(Runnable command) {
+      throw new UnsupportedOperationException("Not expected to be used");
+    }
+
+    @Override
+    public void shutdown() {
+      real.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+      return real.shutdownNow();
+    }
+
+    @Override
+    public boolean isShutdown() {
+      return real.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated() {
+      return real.isTerminated();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+      return real.awaitTermination(timeout, unit);
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+      return completionService.submit(task);
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+      return completionService.submit(task, result);
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+      throw new UnsupportedOperationException("Not expected to be used");
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+        throws InterruptedException {
+      throw new UnsupportedOperationException("Not expected to be used");
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
+        TimeUnit unit) throws InterruptedException {
+      throw new UnsupportedOperationException("Not expected to be used");
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException,
+        ExecutionException {
+      throw new UnsupportedOperationException("Not expected to be used");
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      throw new UnsupportedOperationException("Not expected to be used");
+    }
+    
+  }
 
 }