You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by go...@apache.org on 2018/10/23 22:34:23 UTC
tez git commit: TEZ-3976: Batch ShuffleManager error report events
(Jaume Marhuenda, reviewed by Gopal V)
Repository: tez
Updated Branches:
refs/heads/master 927781566 -> 79af4e8d0
TEZ-3976: Batch ShuffleManager error report events (Jaume Marhuenda, reviewed by Gopal V)
Signed-off-by: Gopal V <go...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/79af4e8d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/79af4e8d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/79af4e8d
Branch: refs/heads/master
Commit: 79af4e8d06417829986dfc34b3627ead15d563ee
Parents: 9277815
Author: Jaume Marhuenda <jm...@hortonworks.com>
Authored: Tue Oct 23 15:30:13 2018 -0700
Committer: Gopal V <go...@apache.org>
Committed: Tue Oct 23 15:30:13 2018 -0700
----------------------------------------------------------------------
.../runtime/api/events/InputReadErrorEvent.java | 45 ++++++-
.../library/api/TezRuntimeConfiguration.java | 10 ++
.../common/shuffle/impl/ShuffleManager.java | 129 ++++++++++++++++---
.../common/shuffle/impl/TestShuffleManager.java | 66 ++++++++++
4 files changed, 232 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/79af4e8d/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java
index 7d2e0d2..cabc39f 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java
@@ -21,6 +21,8 @@ package org.apache.tez.runtime.api.events;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.tez.runtime.api.Event;
+import java.util.Objects;
+
/**
* Event generated by an Input to indicate error when trying to retrieve data.
* This is not necessarily a fatal event - it's an indication to the AM to retry
@@ -44,17 +46,31 @@ public final class InputReadErrorEvent extends Event {
*/
private final int version;
- private InputReadErrorEvent(String diagnostics, int index,
- int version) {
+ /**
+ * Number of failures.
+ */
+ private final int numFailures;
+
+ private InputReadErrorEvent(final String diagnostics, final int index,
+ final int version, final int numFailures) {
super();
this.diagnostics = diagnostics;
this.index = index;
this.version = version;
+ this.numFailures = numFailures;
}
public static InputReadErrorEvent create(String diagnostics, int index,
int version) {
- return new InputReadErrorEvent(diagnostics, index, version);
+ return create(diagnostics, index, version, 1);
+ }
+
+ /**
+ * Create an InputReadErrorEvent.
+ */
+ public static InputReadErrorEvent create(final String diagnostics, final int index,
+ final int version, final int numFailures) {
+ return new InputReadErrorEvent(diagnostics, index, version, numFailures);
}
public String getDiagnostics() {
@@ -69,4 +85,27 @@ public final class InputReadErrorEvent extends Event {
return version;
}
+ /**
+ * Returns the number of read failures this single event stands for. Events built
+ * through the three-argument {@code create} overload always report 1.
+ *
+ * @return number of failures
+ */
+ public int getNumFailures() {
+ return numFailures;
+ }
+
+ /**
+ * Hashes only {@code index} and {@code version}, the same two fields compared by
+ * {@link #equals(Object)}; {@code diagnostics} and {@code numFailures} do not
+ * contribute to the hash.
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(index, version);
+ }
+
+ /**
+ * Two events are equal when they are of the same class and share {@code index} and
+ * {@code version}. {@code diagnostics} and {@code numFailures} are deliberately not
+ * compared, so events that differ only in those fields collapse onto one key when
+ * used in a hash-based map.
+ *
+ * @param o object to compare against, may be null
+ * @return true if {@code o} is an InputReadErrorEvent with the same index and version
+ */
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ // Exact class match; a null argument is never equal.
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ InputReadErrorEvent that = (InputReadErrorEvent) o;
+ return index == that.index && version == that.version;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/79af4e8d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 85c53a5..86792e2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -512,6 +512,15 @@ public class TezRuntimeConfiguration {
TEZ_RUNTIME_PREFIX + "enable.final-merge.in.output";
public static final boolean TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT = true;
+ /**
+ * Expert level setting. Time in milliseconds that {@code ShuffleManager} waits to
+ * batch events before sending them. Any non-positive value (such as -1) disables the wait.
+ */
+ @ConfigurationProperty(type = "integer")
+ public static final String TEZ_RUNTIME_SHUFFLE_BATCH_WAIT =
+ TEZ_RUNTIME_PREFIX + "shuffle.batch.wait";
+ public static final int TEZ_RUNTIME_SHUFFLE_BATCH_WAIT_DEFAULT = -1;
+
/**
* Share data fetched between tasks running on the same host if applicable
@@ -619,6 +628,7 @@ public class TezRuntimeConfiguration {
tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS);
+ tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_BATCH_WAIT);
defaultConf.addResource("core-default.xml");
defaultConf.addResource("core-site.xml");
http://git-wip-us.apache.org/repos/asf/tez/blob/79af4e8d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 5f3693f..ba8592f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -26,6 +26,7 @@ import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -35,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -46,6 +48,8 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.crypto.SecretKey;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.runtime.api.TaskFailureType;
@@ -114,9 +118,30 @@ public class ShuffleManager implements FetcherCallback {
@VisibleForTesting
final ListeningExecutorService fetcherExecutor;
+ /**
+ * Executor for ReportCallable.
+ */
+ private ExecutorService reporterExecutor;
+
+ /**
+ * Lock to sync failedEvents.
+ */
+ private final ReentrantLock reportLock = new ReentrantLock();
+
+ /**
+ * Condition to wake up the thread notifying when events fail.
+ */
+ private final Condition reportCondition = reportLock.newCondition();
+
+ /**
+ * Events reporting fetcher failed.
+ */
+ private final HashMap<InputReadErrorEvent, Integer> failedEvents
+ = new HashMap<>();
+
private final ListeningExecutorService schedulerExecutor;
private final RunShuffleCallable schedulerCallable;
-
+
private final BlockingQueue<FetchedInput> completedInputs;
private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
@VisibleForTesting
@@ -151,6 +176,11 @@ public class ShuffleManager implements FetcherCallback {
private final int ifileBufferSize;
private final boolean ifileReadAhead;
private final int ifileReadAheadLength;
+
+ /**
+ * Time in milliseconds to wait for further failures so they can be batched into fewer events.
+ */
+ private final int maxTimeToWaitForReportMillis;
private final String srcNameTrimmed;
@@ -199,7 +229,8 @@ public class ShuffleManager implements FetcherCallback {
this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
this.bytesShuffledDirectDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
-
+
+
this.ifileBufferSize = bufferSize;
this.ifileReadAhead = ifileReadAheadEnabled;
this.ifileReadAheadLength = ifileReadAheadLength;
@@ -212,6 +243,10 @@ public class ShuffleManager implements FetcherCallback {
this.verifyDiskChecksum = conf.getBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT);
+ this.maxTimeToWaitForReportMillis = conf.getInt(
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BATCH_WAIT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BATCH_WAIT_DEFAULT);
+
this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
@@ -302,12 +337,63 @@ public class ShuffleManager implements FetcherCallback {
public void run() throws IOException {
Preconditions.checkState(inputManager != null, "InputManager must be configured");
+ // The reporter thread is only needed when batching is enabled, i.e. when the
+ // configured wait is positive.
+ if (maxTimeToWaitForReportMillis > 0) {
+ reporterExecutor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("ShuffleRunner {" + srcNameTrimmed + "}")
+ .build());
+ // The returned Future is not retained; the reporter loop ends when the manager is
+ // shut down or its thread is interrupted.
+ reporterExecutor.submit(new ReporterCallable());
+ }
+
ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
// Shutdown this executor once this task, and the callback complete.
schedulerExecutor.shutdown();
}
-
+
+ /**
+  * Background task that drains {@code failedEvents} and sends one aggregated
+  * InputReadErrorEvent per distinct key, at most once per batching interval
+  * ({@code maxTimeToWaitForReportMillis}). The first batch is sent as soon as a
+  * failure is recorded; later failures accumulate until the interval has elapsed.
+  */
+ private class ReporterCallable extends CallableWithNdc<Void> {
+   /**
+    * Measures if the batching interval has ended.
+    */
+   private final Clock clock;
+
+   ReporterCallable() {
+     clock = new MonotonicClock();
+   }
+
+   /**
+    * Loops until the manager is shut down, waiting on {@code reportCondition} for
+    * failures and flushing them in batches.
+    *
+    * @return always null
+    * @throws Exception if the thread is interrupted while waiting, or if sending fails
+    */
+   @Override
+   protected Void callInternal() throws Exception {
+     long nextReport = 0;
+     while (!isShutdown.get()) {
+       // Acquire before entering try so that finally never unlocks a lock that
+       // was not obtained.
+       reportLock.lock();
+       try {
+         while (failedEvents.isEmpty()) {
+           reportCondition.await(maxTimeToWaitForReportMillis, TimeUnit.MILLISECONDS);
+         }
+
+         long currentTime = clock.getTime();
+         if (currentTime <= nextReport) {
+           // Events are pending but the batching interval has not elapsed yet. Sleep
+           // for the remainder (the lock is released while waiting) instead of
+           // spinning on lock/unlock, then re-evaluate from the top of the loop.
+           reportCondition.await(nextReport - currentTime + 1, TimeUnit.MILLISECONDS);
+           continue;
+         }
+
+         // failedEvents is guaranteed non-empty here: the wait loop above only exits
+         // once an entry exists and the lock has been held ever since.
+         List<Event> failedEventsToSend = Lists.newArrayListWithCapacity(
+             failedEvents.size());
+         for (Map.Entry<InputReadErrorEvent, Integer> entry : failedEvents.entrySet()) {
+           InputReadErrorEvent key = entry.getKey();
+           failedEventsToSend.add(InputReadErrorEvent
+               .create(key.getDiagnostics(), key.getIndex(),
+                   key.getVersion(), entry.getValue()));
+         }
+         inputContext.sendEvents(failedEventsToSend);
+         failedEvents.clear();
+         nextReport = currentTime + maxTimeToWaitForReportMillis;
+       } finally {
+         reportLock.unlock();
+       }
+     }
+     return null;
+   }
+ }
+
private class RunShuffleCallable extends CallableWithNdc<Void> {
private final Configuration conf;
@@ -804,18 +890,27 @@ public class ShuffleManager implements FetcherCallback {
if (srcAttemptIdentifier == null) {
reportFatalError(null, "Received fetchFailure for an unknown src (null)");
} else {
- InputReadErrorEvent readError = InputReadErrorEvent.create(
- "Fetch failure while fetching from "
- + TezRuntimeUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(),
- srcAttemptIdentifier.getInputIdentifier(),
- srcAttemptIdentifier.getAttemptNumber()),
- srcAttemptIdentifier.getInputIdentifier(),
- srcAttemptIdentifier.getAttemptNumber());
-
- List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
- failedEvents.add(readError);
- inputContext.sendEvents(failedEvents);
+ InputReadErrorEvent readError = InputReadErrorEvent.create(
+ "Fetch failure while fetching from "
+ + TezRuntimeUtils.getTaskAttemptIdentifier(
+ inputContext.getSourceVertexName(),
+ srcAttemptIdentifier.getInputIdentifier(),
+ srcAttemptIdentifier.getAttemptNumber()),
+ srcAttemptIdentifier.getInputIdentifier(),
+ srcAttemptIdentifier.getAttemptNumber());
+ if (maxTimeToWaitForReportMillis > 0) {
+ try {
+ reportLock.lock();
+ failedEvents.merge(readError, 1, (a, b) -> a + b);
+ reportCondition.signal();
+ } finally {
+ reportLock.unlock();
+ }
+ } else {
+ List<Event> events = Lists.newArrayListWithCapacity(1);
+ events.add(readError);
+ inputContext.sendEvents(events);
+ }
}
}
/////////////////// End of Methods from FetcherCallbackHandler
@@ -849,6 +944,10 @@ public class ShuffleManager implements FetcherCallback {
if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
this.schedulerExecutor.shutdownNow();
}
+ if (this.reporterExecutor != null
+ && !this.reporterExecutor.isShutdown()) {
+ this.reporterExecutor.shutdownNow();
+ }
if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
this.fetcherExecutor.shutdownNow(); // Interrupts all running fetchers.
}
http://git-wip-us.apache.org/repos/asf/tez/blob/79af4e8d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
index 103f83d..94f7f5a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
@@ -20,7 +20,9 @@ package org.apache.tez.runtime.library.common.shuffle.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
@@ -35,6 +37,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -57,6 +60,7 @@ import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
@@ -67,8 +71,10 @@ import org.apache.tez.runtime.library.common.shuffle.InputHost;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -214,6 +220,64 @@ public class TestShuffleManager {
verify(inputContext, atLeast(3)).notifyProgress();
}
+ /**
+  * Verifies batching of fetch-failure reports: the first failure is sent on its own,
+  * and two further failures for the same input are held back and then delivered as a
+  * single event carrying a failure count of 2. Relies on the batch wait configured by
+  * createShuffleManager for this test class.
+  */
+ @Test (timeout = 200000)
+ public void testFetchFailed() throws Exception {
+   InputContext inputContext = createInputContext();
+   final ShuffleManager shuffleManager = spy(createShuffleManager(inputContext, 1));
+   Thread schedulerGetHostThread = new Thread(new Runnable() {
+     @Override
+     public void run() {
+       try {
+         shuffleManager.run();
+       } catch (Exception e) {
+         e.printStackTrace();
+       }
+     }
+   });
+   InputAttemptIdentifier inputAttemptIdentifier
+       = new InputAttemptIdentifier(1, 1);
+
+   schedulerGetHostThread.start();
+   Thread.sleep(1000);
+   shuffleManager.fetchFailed("host1", inputAttemptIdentifier, false);
+   Thread.sleep(1000);
+
+   // First failure: exactly one sendEvents call with one event counting one failure.
+   // assertEquals takes (message, expected, actual).
+   ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
+   verify(inputContext, times(1))
+       .sendEvents(captor.capture());
+   Assert.assertEquals("Size was: " + captor.getAllValues().size(),
+       1, captor.getAllValues().size());
+   List<Event> capturedList = captor.getAllValues().get(0);
+   Assert.assertEquals("Size was: " + capturedList.size(),
+       1, capturedList.size());
+   InputReadErrorEvent inputEvent = (InputReadErrorEvent)capturedList.get(0);
+   Assert.assertEquals("Number of failures was: " + inputEvent.getNumFailures(),
+       1, inputEvent.getNumFailures());
+
+   shuffleManager.fetchFailed("host1", inputAttemptIdentifier, false);
+   shuffleManager.fetchFailed("host1", inputAttemptIdentifier, false);
+
+   // Still inside the batching interval: nothing new must have been sent yet.
+   Thread.sleep(1000);
+   verify(inputContext, times(1)).sendEvents(any());
+
+   // Wait more than five seconds for the batch to go out
+   Thread.sleep(5000);
+   captor = ArgumentCaptor.forClass(List.class);
+   verify(inputContext, times(2))
+       .sendEvents(captor.capture());
+   Assert.assertEquals("Size was: " + captor.getAllValues().size(),
+       2, captor.getAllValues().size());
+   capturedList = captor.getAllValues().get(1);
+   Assert.assertEquals("Size was: " + capturedList.size(),
+       1, capturedList.size());
+   inputEvent = (InputReadErrorEvent)capturedList.get(0);
+   Assert.assertEquals("Number of failures was: " + inputEvent.getNumFailures(),
+       2, inputEvent.getNumFailures());
+
+
+   schedulerGetHostThread.interrupt();
+ }
+
+
private ShuffleManagerForTest createShuffleManager(
InputContext inputContext, int expectedNumOfPhysicalInputs)
throws IOException {
@@ -222,6 +286,8 @@ public class TestShuffleManager {
doReturn(outDirs).when(inputContext).getWorkDirs();
conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS,
inputContext.getWorkDirs());
+ // 5 seconds
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BATCH_WAIT, 5000);
DataOutputBuffer out = new DataOutputBuffer();
Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(new JobTokenIdentifier(),