You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2016/02/26 16:42:32 UTC

tez git commit: TEZ-3114. Shuffle OOM due to EventMetaData flood (jlowe)

Repository: tez
Updated Branches:
  refs/heads/master 15d7339e9 -> 923f7b4e2


TEZ-3114. Shuffle OOM due to EventMetaData flood (jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/923f7b4e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/923f7b4e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/923f7b4e

Branch: refs/heads/master
Commit: 923f7b4e298703658598d5cd3809f38f1231c4ab
Parents: 15d7339
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Feb 26 15:41:50 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Feb 26 15:41:50 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/tez/dag/api/TezConfiguration.java    | 11 ++++++
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  8 ++++
 .../org/apache/tez/runtime/RuntimeTask.java     |  2 +
 .../apache/tez/runtime/task/TaskReporter.java   |  3 +-
 .../tez/runtime/task/TestTaskReporter.java      | 41 ++++++++++++++++++++
 6 files changed, 66 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/923f7b4e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e8e72b7..dd8b1dd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
+  TEZ-3114. Shuffle OOM due to EventMetaData flood
   TEZ-1911. MergeManager's unconditionalReserve() should check for memory limits before allocating.
   TEZ-3102. Fetch failure of a speculated task causes job hang
   TEZ-3124. Running task hangs due to missing event to initialize input in recovery.
@@ -386,6 +387,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-3114. Shuffle OOM due to EventMetaData flood
   TEZ-3102. Fetch failure of a speculated task causes job hang
   TEZ-3126. Log reason for not reducing parallelism
   TEZ-3123. Containers can get re-used even with conflicting local resources.

http://git-wip-us.apache.org/repos/asf/tez/blob/923f7b4e/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 9f7777f..221ac47 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -715,6 +715,17 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 500;
   
   /**
+   * Int value. Maximum number of task events that may be pending (received but not yet
+   * processed) before a task stops asking for more events in the task heartbeat.
+   * Expert level setting.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="integer")
+  public static final String TEZ_TASK_MAX_EVENT_BACKLOG = TEZ_TASK_PREFIX +
+      "max-event-backlog";
+  public static final int TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT = 10000;
+
+  /**
    * Long value. Interval, in milliseconds, within which any of the tasks Input/Processor/Output 
    * components need to make successive progress notifications. If the progress is not notified 
    * for this interval then the task will be considered hung and terminated.

http://git-wip-us.apache.org/repos/asf/tez/blob/923f7b4e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 7f546e6..07f92c2 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -153,6 +153,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private final ExecutionContext ExecutionContext;
   private final long memAvailable;
   private final HadoopShim hadoopShim;
+  private final int maxEventBacklog;
 
   public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
       Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
@@ -203,6 +204,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.ExecutionContext = ExecutionContext;
     this.memAvailable = memAvailable;
     this.hadoopShim = hadoopShim;
+    this.maxEventBacklog = tezConf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG,
+        TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT);
   }
 
   /**
@@ -730,6 +733,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   @Override
+  public int getMaxEventsToHandle() {
+    return Math.max(0, maxEventBacklog - eventsToBeProcessed.size());
+  }
+
+  @Override
   public synchronized void handleEvents(Collection<TezEvent> events) {
     if (events == null || events.isEmpty()) {
       return;

http://git-wip-us.apache.org/repos/asf/tez/blob/923f7b4e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 529dde0..59c8104 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -147,6 +147,8 @@ public abstract class RuntimeTask {
     return taskSpec.getTaskAttemptID();
   }
 
+  public abstract int getMaxEventsToHandle();
+
   public abstract void handleEvents(Collection<TezEvent> events);
 
   public int getEventCounter() {

http://git-wip-us.apache.org/repos/asf/tez/blob/923f7b4e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 30a1b9c..e5370d4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -254,8 +254,9 @@ public class TaskReporter implements TaskReporterInterface {
       long requestId = requestCounter.incrementAndGet();
       int fromEventId = task.getNextFromEventId();
       int fromPreRoutedEventId = task.getNextPreRoutedEventId();
+      int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle());
       TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId,
-          containerIdStr, task.getTaskAttemptID(), fromEventId, maxEventsToGet);
+          containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Sending heartbeat to AM, request=" + request);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/923f7b4e/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
index e137d50..04c467a 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
@@ -21,13 +21,16 @@ package org.apache.tez.runtime.task;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -45,6 +48,7 @@ import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -110,6 +114,43 @@ public class TestTaskReporter {
 
   }
   
+  @Test(timeout = 10000)
+  public void testEventThrottling() throws Exception {
+    TezTaskAttemptID mockTaskAttemptId = mock(TezTaskAttemptID.class);
+    LogicalIOProcessorRuntimeTask mockTask = mock(LogicalIOProcessorRuntimeTask.class);
+    when(mockTask.getMaxEventsToHandle()).thenReturn(10000, 1);
+    when(mockTask.getVertexName()).thenReturn("vertexName");
+    when(mockTask.getTaskAttemptID()).thenReturn(mockTaskAttemptId);
+
+    TezTaskUmbilicalProtocol mockUmbilical = mock(TezTaskUmbilicalProtocol.class);
+    TezHeartbeatResponse resp1 = new TezHeartbeatResponse(createEvents(5));
+    resp1.setLastRequestId(1);
+    TezHeartbeatResponse resp2 = new TezHeartbeatResponse(createEvents(1));
+    resp2.setLastRequestId(2);
+    resp2.setShouldDie();
+    when(mockUmbilical.heartbeat(isA(TezHeartbeatRequest.class))).thenReturn(resp1, resp2);
+
+    // Set up the sleep time to be way higher than the test timeout
+    TaskReporter.HeartbeatCallable heartbeatCallable =
+        new TaskReporter.HeartbeatCallable(mockTask, mockUmbilical, 100000, 100000, 5,
+            new AtomicLong(0),
+            "containerIdStr");
+
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      Future<Boolean> result = executor.submit(heartbeatCallable);
+      Assert.assertFalse(result.get());
+    } finally {
+      executor.shutdownNow();
+    }
+
+    ArgumentCaptor<TezHeartbeatRequest> captor = ArgumentCaptor.forClass(TezHeartbeatRequest.class);
+    verify(mockUmbilical, times(2)).heartbeat(captor.capture());
+    TezHeartbeatRequest req = captor.getValue();
+    Assert.assertEquals(2, req.getRequestId());
+    Assert.assertEquals(1, req.getMaxEvents());
+  }
+
   @Test (timeout=5000)
   public void testStatusUpdateAfterInitializationAndCounterFlag() {
     TezTaskAttemptID mockTaskAttemptId = mock(TezTaskAttemptID.class);