You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/18 18:47:33 UTC
git commit: TEZ-940. Fix a memory leak in
TaskSchedulerAppCallbackWrapper. Contributed by Gopal V and Siddharth Seth.
Repository: incubator-tez
Updated Branches:
refs/heads/master 4a60f05c1 -> 3f3f94827
TEZ-940. Fix a memory leak in TaskSchedulerAppCallbackWrapper.
Contributed by Gopal V and Siddharth Seth.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3f3f9482
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3f3f9482
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3f3f9482
Branch: refs/heads/master
Commit: 3f3f94827486ceb87d9a76311911fe4ebc8cdc02
Parents: 4a60f05
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Mar 18 10:46:56 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Mar 18 10:46:56 2014 -0700
----------------------------------------------------------------------
.../apache/tez/dag/app/rm/TaskScheduler.java | 3 +-
.../app/rm/TaskSchedulerAppCallbackWrapper.java | 35 ++-----
.../dag/app/rm/TestTaskSchedulerHelpers.java | 101 ++++++++++++++++++-
3 files changed, 110 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3f3f9482/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index bb06904..606a0b3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -254,7 +254,8 @@ public class TaskScheduler extends AbstractService
this.appContext = appContext;
}
- private ExecutorService createAppCallbackExecutorService() {
+ @VisibleForTesting
+ ExecutorService createAppCallbackExecutorService() {
return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3f3f9482/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
index c690926..53c3e95 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
@@ -22,8 +22,6 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -36,20 +34,15 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* Makes use of an ExecutorService to invoke application callbacks. Methods
* which return values wait for execution to complete - effectively waiting for
* all previous events in the queue to complete.
*/
-@SuppressWarnings({"rawtypes", "unchecked"})
class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
private TaskSchedulerAppCallback real;
-
- @VisibleForTesting
- CompletionService completionService;
+
ExecutorService executorService;
/**
@@ -60,58 +53,52 @@ class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
ExecutorService executorService) {
this.real = real;
this.executorService = executorService;
- this.completionService = createAppCallbackCompletionService();
- }
-
- @VisibleForTesting
- CompletionService createAppCallbackCompletionService() {
- return new ExecutorCompletionService(this.executorService);
}
@Override
public void taskAllocated(Object task, Object appCookie, Container container) {
- completionService.submit(new TaskAllocatedCallable(real, task, appCookie,
+ executorService.submit(new TaskAllocatedCallable(real, task, appCookie,
container));
}
@Override
public void containerCompleted(Object taskLastAllocated,
ContainerStatus containerStatus) {
- completionService.submit(new ContainerCompletedCallable(real,
+ executorService.submit(new ContainerCompletedCallable(real,
taskLastAllocated, containerStatus));
}
@Override
public void containerBeingReleased(ContainerId containerId) {
- completionService
+ executorService
.submit(new ContainerBeingReleasedCallable(real, containerId));
}
@Override
public void nodesUpdated(List<NodeReport> updatedNodes) {
- completionService.submit(new NodesUpdatedCallable(real, updatedNodes));
+ executorService.submit(new NodesUpdatedCallable(real, updatedNodes));
}
@Override
public void appShutdownRequested() {
- completionService.submit(new AppShudownRequestedCallable(real));
+ executorService.submit(new AppShudownRequestedCallable(real));
}
@Override
public void setApplicationRegistrationData(Resource maxContainerCapability,
Map<ApplicationAccessType, String> appAcls, ByteBuffer key) {
- completionService.submit(new SetApplicationRegistrationDataCallable(real,
+ executorService.submit(new SetApplicationRegistrationDataCallable(real,
maxContainerCapability, appAcls, key));
}
@Override
public void onError(Throwable t) {
- completionService.submit(new OnErrorCallable(real, t));
+ executorService.submit(new OnErrorCallable(real, t));
}
@Override
public float getProgress() {
- Future<Float> progressFuture = completionService
+ Future<Float> progressFuture = executorService
.submit(new GetProgressCallable(real));
try {
return progressFuture.get();
@@ -122,12 +109,12 @@ class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
@Override
public void preemptContainer(ContainerId containerId) {
- completionService.submit(new PreemptContainerCallable(real, containerId));
+ executorService.submit(new PreemptContainerCallable(real, containerId));
}
@Override
public AppFinalStatus getFinalAppStatus() {
- Future<AppFinalStatus> appFinalStatusFuture = completionService
+ Future<AppFinalStatus> appFinalStatusFuture = executorService
.submit(new GetFinalAppStatusCallable(real));
try {
return appFinalStatusFuture.get();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3f3f9482/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 087da83..72cc8e7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -25,13 +25,18 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -108,7 +113,7 @@ class TestTaskSchedulerHelpers {
protected void serviceStop() {
}
}
-
+
// Overrides start / stop. Will be controlled without the extra event handling thread.
static class TaskSchedulerEventHandlerForTest extends
TaskSchedulerEventHandler {
@@ -217,6 +222,12 @@ class TestTaskSchedulerHelpers {
appCallbackExecutor));
return drainableAppCallback;
}
+
+ @Override
+ ExecutorService createAppCallbackExecutorService() {
+ ExecutorService real = super.createAppCallbackExecutorService();
+ return new CountingExecutorService(real);
+ }
public TaskSchedulerAppCallbackDrainable getDrainableAppCallback() {
return drainableAppCallback;
@@ -228,11 +239,11 @@ class TestTaskSchedulerHelpers {
int completedEvents;
int invocations;
private TaskSchedulerAppCallback real;
- private CompletionService completionService;
+ private CountingExecutorService countingExecutorService;
final AtomicInteger count = new AtomicInteger(0);
public TaskSchedulerAppCallbackDrainable(TaskSchedulerAppCallbackWrapper real) {
- completionService = real.completionService;
+ countingExecutorService = (CountingExecutorService) real.executorService;
this.real = real;
}
@@ -301,7 +312,7 @@ class TestTaskSchedulerHelpers {
public void drain() throws InterruptedException, ExecutionException {
while (completedEvents < invocations) {
- Future f = completionService.poll(5000l, TimeUnit.MILLISECONDS);
+ Future f = countingExecutorService.completionService.poll(5000l, TimeUnit.MILLISECONDS);
if (f != null) {
completedEvents++;
} else {
@@ -365,5 +376,87 @@ class TestTaskSchedulerHelpers {
}
}
}
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private static class CountingExecutorService implements ExecutorService {
+
+ final ExecutorService real;
+ final CompletionService completionService;
+
+ CountingExecutorService(ExecutorService real) {
+ this.real = real;
+ completionService = new ExecutorCompletionService(real);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ throw new UnsupportedOperationException("Not expected to be used");
+ }
+
+ @Override
+ public void shutdown() {
+ real.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return real.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return real.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return real.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return real.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ return completionService.submit(task);
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ return completionService.submit(task, result);
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ throw new UnsupportedOperationException("Not expected to be used");
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ throw new UnsupportedOperationException("Not expected to be used");
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
+ TimeUnit unit) throws InterruptedException {
+ throw new UnsupportedOperationException("Not expected to be used");
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException,
+ ExecutionException {
+ throw new UnsupportedOperationException("Not expected to be used");
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ throw new UnsupportedOperationException("Not expected to be used");
+ }
+
+ }
}