You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@tez.apache.org by jl...@apache.org on 2016/02/26 16:47:05 UTC
tez git commit: TEZ-3114. Shuffle OOM due to EventMetaData flood
(jlowe) (cherry picked from commit 923f7b4e298703658598d5cd3809f38f1231c4ab)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 dedec2a78 -> 58218fb25
TEZ-3114. Shuffle OOM due to EventMetaData flood (jlowe)
(cherry picked from commit 923f7b4e298703658598d5cd3809f38f1231c4ab)
Conflicts:
CHANGES.txt
tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/58218fb2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/58218fb2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/58218fb2
Branch: refs/heads/branch-0.7
Commit: 58218fb25ca940b1d72764c0a39b7cbf41e09429
Parents: dedec2a
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Feb 26 15:46:29 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Feb 26 15:46:29 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/TezConfiguration.java | 10 +++++
.../runtime/LogicalIOProcessorRuntimeTask.java | 8 ++++
.../org/apache/tez/runtime/RuntimeTask.java | 2 +
.../apache/tez/runtime/task/TaskReporter.java | 3 +-
.../tez/runtime/task/TestTaskReporter.java | 41 ++++++++++++++++++++
6 files changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/58218fb2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ec25e5e..8ea37dc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-2972. Avoid task rescheduling when a node turns unhealthy
ALL CHANGES
+ TEZ-3114. Shuffle OOM due to EventMetaData flood
TEZ-3102. Fetch failure of a speculated task causes job hang
TEZ-3137. Tez task failed with illegal state exception in recovery
TEZ-3126. Log reason for not reducing parallelism
http://git-wip-us.apache.org/repos/asf/tez/blob/58218fb2/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index d4d5759..7fef24e 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -664,6 +664,16 @@ public class TezConfiguration extends Configuration {
public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 500;
/**
+ * Int value. Maximum number of pending task events before a task will stop
+ * asking for more events in the task heartbeat.
+ * Expert level setting.
+ */
+ @ConfigurationScope(Scope.AM)
+ public static final String TEZ_TASK_MAX_EVENT_BACKLOG = TEZ_TASK_PREFIX +
+ "max-event-backlog";
+ public static final int TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT = 10000;
+
+ /**
* Long value. Interval, in milliseconds, within which any of the tasks Input/Processor/Output
* components need to make successive progress notifications. If the progress is not notified
* for this interval then the task will be considered hung and terminated.
http://git-wip-us.apache.org/repos/asf/tez/blob/58218fb2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 99e9cb7..9b8daf6 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -148,6 +148,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private volatile ObjectRegistry objectRegistry;
private final ExecutionContext ExecutionContext;
private final long memAvailable;
+ private final int maxEventBacklog;
public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
@@ -196,6 +197,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
this.objectRegistry = objectRegistry;
this.ExecutionContext = ExecutionContext;
this.memAvailable = memAvailable;
+ this.maxEventBacklog = tezConf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG,
+ TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT);
}
/**
@@ -720,6 +723,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
@Override
+ public int getMaxEventsToHandle() {
+ return Math.max(0, maxEventBacklog - eventsToBeProcessed.size());
+ }
+
+ @Override
public synchronized void handleEvents(Collection<TezEvent> events) {
if (events == null || events.isEmpty()) {
return;
http://git-wip-us.apache.org/repos/asf/tez/blob/58218fb2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 70dc867..d827b91 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -139,6 +139,8 @@ public abstract class RuntimeTask {
return taskSpec.getTaskAttemptID();
}
+ public abstract int getMaxEventsToHandle();
+
public abstract void handleEvents(Collection<TezEvent> events);
public int getEventCounter() {
http://git-wip-us.apache.org/repos/asf/tez/blob/58218fb2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 09693d9..18c37e6 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -250,8 +250,9 @@ public class TaskReporter {
long requestId = requestCounter.incrementAndGet();
int fromEventId = task.getNextFromEventId();
int fromPreRoutedEventId = task.getNextPreRoutedEventId();
+ int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle());
TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId,
- containerIdStr, task.getTaskAttemptID(), fromEventId, maxEventsToGet);
+ containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat to AM, request=" + request);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/58218fb2/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
index e137d50..04c467a 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
@@ -21,13 +21,16 @@ package org.apache.tez.runtime.task;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -45,6 +48,7 @@ import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -110,6 +114,43 @@ public class TestTaskReporter {
}
+ @Test(timeout = 10000)
+ public void testEventThrottling() throws Exception {
+ TezTaskAttemptID mockTaskAttemptId = mock(TezTaskAttemptID.class);
+ LogicalIOProcessorRuntimeTask mockTask = mock(LogicalIOProcessorRuntimeTask.class);
+ when(mockTask.getMaxEventsToHandle()).thenReturn(10000, 1);
+ when(mockTask.getVertexName()).thenReturn("vertexName");
+ when(mockTask.getTaskAttemptID()).thenReturn(mockTaskAttemptId);
+
+ TezTaskUmbilicalProtocol mockUmbilical = mock(TezTaskUmbilicalProtocol.class);
+ TezHeartbeatResponse resp1 = new TezHeartbeatResponse(createEvents(5));
+ resp1.setLastRequestId(1);
+ TezHeartbeatResponse resp2 = new TezHeartbeatResponse(createEvents(1));
+ resp2.setLastRequestId(2);
+ resp2.setShouldDie();
+ when(mockUmbilical.heartbeat(isA(TezHeartbeatRequest.class))).thenReturn(resp1, resp2);
+
+ // Setup the sleep time to be way higher than the test timeout
+ TaskReporter.HeartbeatCallable heartbeatCallable =
+ new TaskReporter.HeartbeatCallable(mockTask, mockUmbilical, 100000, 100000, 5,
+ new AtomicLong(0),
+ "containerIdStr");
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ Future<Boolean> result = executor.submit(heartbeatCallable);
+ Assert.assertFalse(result.get());
+ } finally {
+ executor.shutdownNow();
+ }
+
+ ArgumentCaptor<TezHeartbeatRequest> captor = ArgumentCaptor.forClass(TezHeartbeatRequest.class);
+ verify(mockUmbilical, times(2)).heartbeat(captor.capture());
+ TezHeartbeatRequest req = captor.getValue();
+ Assert.assertEquals(2, req.getRequestId());
+ Assert.assertEquals(1, req.getMaxEvents());
+ }
+
@Test (timeout=5000)
public void testStatusUpdateAfterInitializationAndCounterFlag() {
TezTaskAttemptID mockTaskAttemptId = mock(TezTaskAttemptID.class);