You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/20 19:49:01 UTC
[1/2] tez git commit: TEZ-2454. Change FetcherOrderedGroup to work as
Callables instead of blocking threads. (sseth)
Repository: tez
Updated Branches:
refs/heads/master a9048bb52 -> 70a465dfb
http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
new file mode 100644
index 0000000..ac6c6c0
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.InputIdentifier;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests for {@link ShuffleScheduler}. Fetchers are replaced by Mockito mocks (see
+ * {@link ShuffleSchedulerForTest}) so that scheduling decisions can be observed without
+ * performing any real fetches.
+ */
+public class TestShuffleScheduler {
+
+  @Test(timeout = 10000)
+  public void testNumParallelScheduledFetchers() throws IOException, InterruptedException {
+    final int numParallelCopies = 10;
+    final int numInputs = 50;
+    Configuration conf = new TezConfiguration();
+    // Allow at most numParallelCopies copies to run at once.
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, numParallelCopies);
+
+    // With fetcherShouldWait=true each mocked fetcher sleeps for 100s, so every fetcher that is
+    // created stays busy for the whole test.
+    final ShuffleSchedulerForTest scheduler = createScheduler(conf, numInputs, true);
+
+    Future<Void> executorFuture = null;
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    try {
+      executorFuture = startSchedulerAsync(executor, scheduler);
+
+      // Schedule all copies.
+      addKnownMapOutputs(scheduler, numInputs);
+
+      // Wait (bounded) for the limit to be reached instead of relying on one fixed sleep, then
+      // give an over-eager scheduler a short grace period in which it could exceed the limit.
+      waitForAtLeast(scheduler.numFetchersCreated, numParallelCopies, 5000L);
+      Thread.sleep(500L);
+      assertEquals(numParallelCopies, scheduler.numFetchersCreated.get());
+    } finally {
+      scheduler.close();
+      if (executorFuture != null) {
+        executorFuture.cancel(true);
+      }
+      executor.shutdownNow();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testSimpleFlow() throws Exception {
+    final int numInputs = 10;
+    final ShuffleSchedulerForTest scheduler =
+        createScheduler(new TezConfiguration(), numInputs, false);
+
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    try {
+      Future<Void> executorFuture = startSchedulerAsync(executor, scheduler);
+      InputAttemptIdentifier[] identifiers = addKnownMapOutputs(scheduler, numInputs);
+      MapHost[] mapHosts = collectMapHosts(scheduler, numInputs);
+
+      // Every input is reported as copied.
+      completeCopies(scheduler, identifiers, mapHosts, numInputs);
+
+      // Ensure the executor exits, and without an error.
+      executorFuture.get();
+    } finally {
+      scheduler.close();
+      executor.shutdownNow();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testShutdown() throws Exception {
+    final int numInputs = 10;
+    final ShuffleSchedulerForTest scheduler =
+        createScheduler(new TezConfiguration(), numInputs, false);
+
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    try {
+      Future<Void> executorFuture = startSchedulerAsync(executor, scheduler);
+      InputAttemptIdentifier[] identifiers = addKnownMapOutputs(scheduler, numInputs);
+      MapHost[] mapHosts = collectMapHosts(scheduler, numInputs);
+
+      // Copy succeeded for 1 less host, so only close() can end the scheduler.
+      completeCopies(scheduler, identifiers, mapHosts, numInputs - 1);
+
+      scheduler.close();
+      // Ensure the executor exits, and without an error.
+      executorFuture.get();
+    } finally {
+      // NOTE(review): close() runs a second time here; this relies on it being safe to repeat.
+      scheduler.close();
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Creates a scheduler backed by a mocked {@link Shuffle} and a mocked {@link MergeManager}.
+   * The same MergeManager mock is passed both as the merge manager and as the allocator.
+   *
+   * @param conf configuration for the scheduler
+   * @param numInputs number of inputs the scheduler expects
+   * @param fetcherShouldWait whether mocked fetchers sleep for 100s when invoked
+   */
+  private ShuffleSchedulerForTest createScheduler(Configuration conf, int numInputs,
+      boolean fetcherShouldWait) throws IOException {
+    InputContext inputContext = createTezInputContext();
+    Shuffle shuffle = mock(Shuffle.class);
+    MergeManager mergeManager = mock(MergeManager.class);
+    return new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager,
+        mergeManager, System.currentTimeMillis(), null, false, 0, "srcName", fetcherShouldWait);
+  }
+
+  /** Runs {@code scheduler.start()} on {@code executor} and returns the resulting future. */
+  private static Future<Void> startSchedulerAsync(ExecutorService executor,
+      final ShuffleSchedulerForTest scheduler) {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        scheduler.start();
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Registers one known map output per input, each on a distinct host name
+   * ("host0", "host1", ...).
+   *
+   * @return the attempt identifiers, indexed by input number
+   */
+  private static InputAttemptIdentifier[] addKnownMapOutputs(ShuffleSchedulerForTest scheduler,
+      int numInputs) throws IOException, InterruptedException {
+    InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs];
+    for (int i = 0; i < numInputs; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier);
+      identifiers[i] = inputAttemptIdentifier;
+    }
+    return identifiers;
+  }
+
+  /**
+   * Copies the scheduler's known hosts into an array.
+   *
+   * NOTE(review): if mapLocations is not insertion-ordered, mapHosts[i] may not be the host that
+   * served identifiers[i]; the tests then rely on copySucceeded tolerating that pairing — confirm.
+   */
+  private static MapHost[] collectMapHosts(ShuffleSchedulerForTest scheduler, int numInputs) {
+    MapHost[] mapHosts = new MapHost[numInputs];
+    int count = 0;
+    for (MapHost mh : scheduler.mapLocations.values()) {
+      mapHosts[count++] = mh;
+    }
+    return mapHosts;
+  }
+
+  /**
+   * Reports the first {@code numToComplete} inputs as successfully copied (as 100-byte in-memory
+   * outputs) and frees the corresponding hosts.
+   */
+  private static void completeCopies(ShuffleSchedulerForTest scheduler,
+      InputAttemptIdentifier[] identifiers, MapHost[] mapHosts, int numToComplete)
+      throws Exception {
+    for (int i = 0; i < numToComplete; i++) {
+      MapOutput mapOutput = MapOutput.createMemoryMapOutput(identifiers[i],
+          mock(FetchedInputAllocatorOrderedGrouped.class), 100, false);
+      scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput);
+      scheduler.freeHost(mapHosts[i]);
+    }
+  }
+
+  /**
+   * Polls {@code counter} every 50ms until it reaches {@code expected} or
+   * {@code timeoutMillis} elapses, whichever comes first.
+   */
+  private static void waitForAtLeast(AtomicInteger counter, int expected, long timeoutMillis)
+      throws InterruptedException {
+    long deadline = System.currentTimeMillis() + timeoutMillis;
+    while (counter.get() < expected && System.currentTimeMillis() < deadline) {
+      Thread.sleep(50L);
+    }
+  }
+
+  /** Builds a mocked {@link InputContext} with the metadata the scheduler reads at start-up. */
+  private InputContext createTezInputContext() throws IOException {
+    ApplicationId applicationId = ApplicationId.newInstance(1, 1);
+    InputContext inputContext = mock(InputContext.class);
+    doReturn(applicationId).when(inputContext).getApplicationId();
+    doReturn("sourceVertex").when(inputContext).getSourceVertexName();
+    when(inputContext.getCounters()).thenReturn(new TezCounters());
+    ExecutionContext executionContext = new ExecutionContextImpl("localhost");
+    doReturn(executionContext).when(inputContext).getExecutionContext();
+    ByteBuffer shuffleBuffer = ByteBuffer.allocate(4).putInt(0, 4);
+    doReturn(shuffleBuffer).when(inputContext).getServiceProviderMetaData(anyString());
+    Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(
+        new JobTokenIdentifier(new Text("text")), new JobTokenSecretManager());
+    ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData(sessionToken);
+    doReturn(tokenBuffer).when(inputContext).getServiceConsumerMetaData(anyString());
+    return inputContext;
+  }
+
+  /** ShuffleScheduler that creates mocked fetchers and counts how many it has created. */
+  private static class ShuffleSchedulerForTest extends ShuffleScheduler {
+
+    private final AtomicInteger numFetchersCreated = new AtomicInteger(0);
+    // When true, each mocked fetcher's callInternal() sleeps for 100s before returning.
+    private final boolean fetcherShouldWait;
+
+    public ShuffleSchedulerForTest(InputContext inputContext, Configuration conf,
+        int numberOfInputs, Shuffle shuffle, MergeManager mergeManager,
+        FetchedInputAllocatorOrderedGrouped allocator, long startTime, CompressionCodec codec,
+        boolean ifileReadAhead, int ifileReadAheadLength, String srcNameTrimmed,
+        boolean fetcherShouldWait) throws IOException {
+      super(inputContext, conf, numberOfInputs, shuffle, mergeManager, allocator, startTime, codec,
+          ifileReadAhead, ifileReadAheadLength, srcNameTrimmed);
+      this.fetcherShouldWait = fetcherShouldWait;
+    }
+
+    @Override
+    FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
+      numFetchersCreated.incrementAndGet();
+      FetcherOrderedGrouped mockFetcher = mock(FetcherOrderedGrouped.class);
+      doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          if (fetcherShouldWait) {
+            Thread.sleep(100000L);
+          }
+          return null;
+        }
+      }).when(mockFetcher).callInternal();
+      return mockFetcher;
+    }
+  }
+}
[2/2] tez git commit: TEZ-2454. Change FetcherOrderedGroup to work as
Callables instead of blocking threads. (sseth)
Posted by ss...@apache.org.
TEZ-2454. Change FetcherOrderedGroup to work as Callables instead of
blocking threads. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/70a465df
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/70a465df
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/70a465df
Branch: refs/heads/master
Commit: 70a465dfb73ca3ce97caf1c1427fe6324c1e073f
Parents: a9048bb
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed May 20 10:48:43 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 20 10:48:43 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
tez-runtime-library/findbugs-exclude.xml | 5 +
.../library/common/shuffle/HttpConnection.java | 8 +-
.../common/shuffle/impl/ShuffleManager.java | 2 +
.../FetchedInputAllocatorOrderedGrouped.java | 36 ++
.../orderedgrouped/FetcherOrderedGrouped.java | 163 ++++---
.../shuffle/orderedgrouped/MapOutput.java | 26 +-
.../shuffle/orderedgrouped/MergeManager.java | 12 +-
.../common/shuffle/orderedgrouped/Shuffle.java | 176 ++------
.../orderedgrouped/ShuffleScheduler.java | 426 +++++++++++++++----
.../shuffle/orderedgrouped/TestFetcher.java | 110 ++++-
...tShuffleInputEventHandlerOrderedGrouped.java | 51 ++-
.../orderedgrouped/TestShuffleScheduler.java | 279 ++++++++++++
13 files changed, 911 insertions(+), 384 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7781a9c..32118e7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2454. Change FetcherOrderedGroup to work as Callables instead of blocking threads.
TEZ-2466. tez-history-parser breaks hadoop 2.2 compatability.
TEZ-2463. Update site for 0.7.0 release
TEZ-2461. tez-history-parser compile fails with hadoop-2.4.
http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml
index aa1c7a2..489e243 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -104,5 +104,10 @@
<Bug pattern="UCF_USELESS_CONTROL_FLOW"/>
</Match>
+ <Match>
+ <Class name="org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler"/>
+ <Method name="close" params="" returns="void"/>
+ <Bug pattern="NN_NAKED_NOTIFY"/>
+ </Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
index ad6ed19..7827f0a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
@@ -59,10 +59,10 @@ public class HttpConnection {
private static SSLFactory sslFactory;
@VisibleForTesting
- protected HttpURLConnection connection;
- private DataInputStream input;
+ protected volatile HttpURLConnection connection;
+ private volatile DataInputStream input;
- private boolean connectionSucceeed;
+ private volatile boolean connectionSucceeed;
private volatile boolean cleanup;
private final JobTokenSecretManager jobTokenSecretMgr;
@@ -276,6 +276,7 @@ public class HttpConnection {
if (input != null) {
LOG.info("Closing input on " + logIdentifier);
input.close();
+ input = null;
}
if (httpConnParams.keepAlive && connectionSucceeed) {
// Refer:
@@ -287,6 +288,7 @@ public class HttpConnection {
LOG.debug("Closing connection on " + logIdentifier);
}
connection.disconnect();
+ connection = null;
}
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index ac7caca..f354920 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -321,6 +321,7 @@ public class ShuffleManager implements FetcherCallback {
} catch (InterruptedException e) {
if (isShutdown.get()) {
LOG.info("Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
+ Thread.currentThread().interrupt();
break;
} else {
throw e;
@@ -335,6 +336,7 @@ public class ShuffleManager implements FetcherCallback {
runningFetchers.add(fetcher);
if (isShutdown.get()) {
LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
+ break;
}
ListenableFuture<FetchResult> future = fetcherExecutor
.submit(fetcher);
http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
new file mode 100644
index 0000000..ec1f8eb
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.FileChunk;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+/**
+ * Allocates destinations for map outputs fetched by the ordered-grouped shuffle.
+ * {@link MergeManager} is one implementation.
+ */
+public interface FetchedInputAllocatorOrderedGrouped {
+
+  // TODO TEZ-912 Consolidate this with FetchedInputAllocator.
+  /**
+   * Reserves a {@link MapOutput} into which the output of {@code srcAttemptIdentifier} can be
+   * fetched.
+   *
+   * @param srcAttemptIdentifier the source attempt whose output is being fetched
+   * @param requestedSize the size to reserve; FetcherOrderedGrouped passes the decompressed length
+   * @param compressedLength the compressed length of the output
+   * @param fetcherId id of the fetcher making the request
+   * @return the reserved map output (NOTE(review): whether null is ever returned is not visible
+   *         here — confirm with implementations)
+   * @throws IOException if the reservation fails
+   */
+  MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier,
+                    long requestedSize,
+                    long compressedLength,
+                    int fetcherId) throws IOException;
+
+  /**
+   * Closes an in-memory map output.
+   * NOTE(review): presumably invoked by MapOutput once an in-memory fetch completes — confirm.
+   */
+  void closeInMemoryFile(MapOutput mapOutput);
+
+  /**
+   * Closes an on-disk map output file.
+   * NOTE(review): presumably invoked by MapOutput once an on-disk fetch completes — confirm.
+   */
+  void closeOnDiskFile(FileChunk file);
+
+  /**
+   * Returns {@code bytes} of reserved capacity to the allocator.
+   * NOTE(review): assumed to undo (part of) a prior {@link #reserve} — confirm.
+   */
+  void unreserve(long bytes);
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 60f1c98..0248f13 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -19,25 +19,24 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
import java.io.DataInputStream;
import java.io.IOException;
-import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.tez.common.CallableWithNdc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
@@ -50,81 +49,87 @@ import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import com.google.common.annotations.VisibleForTesting;
-class FetcherOrderedGrouped extends Thread {
+class FetcherOrderedGrouped extends CallableWithNdc<Void> {
private static final Logger LOG = LoggerFactory.getLogger(FetcherOrderedGrouped.class);
+
+ private static final AtomicInteger nextId = new AtomicInteger(0);
+
private final Configuration conf;
private final boolean localDiskFetchEnabled;
- private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
- CONNECTION, WRONG_REDUCE}
-
- private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
private final TezCounter connectionErrs;
private final TezCounter ioErrs;
private final TezCounter wrongLengthErrs;
private final TezCounter badIdErrs;
private final TezCounter wrongMapErrs;
private final TezCounter wrongReduceErrs;
- private final MergeManager merger;
+ private final FetchedInputAllocatorOrderedGrouped allocator;
private final ShuffleScheduler scheduler;
private final ShuffleClientMetrics metrics;
private final Shuffle shuffle;
private final int id;
private final String logIdentifier;
private final String localShuffleHostPort;
- private static int nextId = 0;
- private int currentPartition = -1;
+ private final MapHost mapHost;
+
+ private final int currentPartition;
// Decompression of map-outputs
private final CompressionCodec codec;
private final JobTokenSecretManager jobTokenSecretManager;
+ final HttpConnectionParams httpConnectionParams;
+
@VisibleForTesting
volatile boolean stopped = false;
-
+
private final boolean ifileReadAhead;
private final int ifileReadAheadLength;
private LinkedList<InputAttemptIdentifier> remaining;
- volatile HttpURLConnection connection;
volatile DataInputStream input;
- HttpConnection httpConnection;
- HttpConnectionParams httpConnectionParams;
+ volatile HttpConnection httpConnection;
+
// Initiative value is 0, which means it hasn't retried yet.
private long retryStartTime = 0;
-
+
public FetcherOrderedGrouped(HttpConnectionParams httpConnectionParams,
- ShuffleScheduler scheduler, MergeManager merger,
+ ShuffleScheduler scheduler,
+ FetchedInputAllocatorOrderedGrouped allocator,
ShuffleClientMetrics metrics,
Shuffle shuffle, JobTokenSecretManager jobTokenSecretMgr,
boolean ifileReadAhead, int ifileReadAheadLength,
CompressionCodec codec,
- InputContext inputContext, Configuration conf,
+ Configuration conf,
boolean localDiskFetchEnabled,
String localHostname,
- int shufflePort) throws IOException {
- setDaemon(true);
+ int shufflePort,
+ String srcNameTrimmed,
+ MapHost mapHost,
+ TezCounter ioErrsCounter,
+ TezCounter wrongLengthErrsCounter,
+ TezCounter badIdErrsCounter,
+ TezCounter wrongMapErrsCounter,
+ TezCounter connectionErrsCounter,
+ TezCounter wrongReduceErrsCounter) {
this.scheduler = scheduler;
- this.merger = merger;
+ this.allocator = allocator;
this.metrics = metrics;
this.shuffle = shuffle;
- this.id = ++nextId;
+ this.mapHost = mapHost;
+ this.currentPartition = this.mapHost.getPartitionId();
+ this.id = nextId.incrementAndGet();
this.jobTokenSecretManager = jobTokenSecretMgr;
- ioErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
- ShuffleErrors.IO_ERROR.toString());
- wrongLengthErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
- ShuffleErrors.WRONG_LENGTH.toString());
- badIdErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
- ShuffleErrors.BAD_ID.toString());
- wrongMapErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
- ShuffleErrors.WRONG_MAP.toString());
- connectionErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
- ShuffleErrors.CONNECTION.toString());
- wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
- ShuffleErrors.WRONG_REDUCE.toString());
+
+ this.ioErrs = ioErrsCounter;
+ this.wrongLengthErrs = wrongLengthErrsCounter;
+ this.badIdErrs = badIdErrsCounter;
+ this.wrongMapErrs = wrongMapErrsCounter;
+ this.connectionErrs = connectionErrsCounter;
+ this.wrongReduceErrs = wrongReduceErrsCounter;
this.ifileReadAhead = ifileReadAhead;
this.ifileReadAheadLength = ifileReadAheadLength;
@@ -139,73 +144,54 @@ class FetcherOrderedGrouped extends Thread {
this.localDiskFetchEnabled = localDiskFetchEnabled;
- this.logIdentifier = "fetcher [" + TezUtilsInternal
- .cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id;
- setName(logIdentifier);
- setDaemon(true);
- }
+ this.logIdentifier = "fetcher [" + srcNameTrimmed + "] #" + id;
+ }
@VisibleForTesting
protected void fetchNext() throws InterruptedException, IOException {
- MapHost host = null;
try {
- // If merge is on, block
- merger.waitForInMemoryMerge();
-
- // In case usedMemory > memorylimit, wait until some memory is released
- merger.waitForShuffleToMergeMemory();
-
- // Get a host to shuffle from
- host = scheduler.getHost();
metrics.threadBusy();
- String hostPort = host.getHostIdentifier();
+ String hostPort = mapHost.getHostIdentifier();
if (localDiskFetchEnabled && hostPort.equals(localShuffleHostPort)) {
- setupLocalDiskFetch(host);
+ setupLocalDiskFetch(mapHost);
} else {
// Shuffle
- copyFromHost(host);
+ copyFromHost(mapHost);
}
} finally {
cleanupCurrentConnection(false);
- if (host != null) {
- scheduler.freeHost(host);
- metrics.threadFree();
- }
+ scheduler.freeHost(mapHost);
+ metrics.threadFree();
}
}
- public void run() {
+ @Override
+ public Void callInternal() {
try {
- while (!stopped && !Thread.currentThread().isInterrupted()) {
- remaining = null; // Safety.
- fetchNext();
- }
+ remaining = null; // Safety.
+ fetchNext();
} catch (InterruptedException ie) {
//TODO: might not be respected when fetcher is in progress / server is busy. TEZ-711
//Set the status back
Thread.currentThread().interrupt();
- return;
+ return null;
} catch (Throwable t) {
shuffle.reportException(t);
// Shuffle knows how to deal with failures post shutdown via the onFailure hook
}
+ return null;
}
- public void shutDown() throws InterruptedException {
- this.stopped = true;
- interrupt();
- cleanupCurrentConnection(true);
- try {
- join(5000);
- } catch (InterruptedException ie) {
- //Reset the status
- Thread.currentThread().interrupt();
- LOG.warn("Got interrupt while joining " + getName());
+ public void shutDown() {
+ if (!stopped) {
+ stopped = true;
+ // An interrupt will come in while shutting down the thread.
+ cleanupCurrentConnection(false);
}
}
- private Object cleanupLock = new Object();
+ private final Object cleanupLock = new Object();
private void cleanupCurrentConnection(boolean disconnect) {
// Synchronizing on cleanupLock to ensure we don't run into a parallel close
// Can't synchronize on the main class itself since that would cause the
@@ -214,6 +200,7 @@ class FetcherOrderedGrouped extends Thread {
try {
if (httpConnection != null) {
httpConnection.cleanup(disconnect);
+ httpConnection = null;
}
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
@@ -237,8 +224,7 @@ class FetcherOrderedGrouped extends Thread {
retryStartTime = 0;
// Get completed maps on 'host'
List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
- currentPartition = host.getPartitionId();
-
+
// Sanity check to catch hosts with only 'OBSOLETE' maps,
// especially at the tail of large jobs
if (srcAttempts.size() == 0) {
@@ -254,18 +240,16 @@ class FetcherOrderedGrouped extends Thread {
remaining = new LinkedList<InputAttemptIdentifier>(srcAttempts);
// Construct the url and connect
- if (!setupConnection(host, srcAttempts)) {
- if (stopped) {
- cleanupCurrentConnection(true);
- }
- // Add back all remaining maps - which at this point is ALL MAPS the
- // Fetcher was started with. The Scheduler takes care of retries,
- // reporting too many failures etc.
- putBackRemainingMapOutputs(host);
- return;
- }
try {
+ if (!setupConnection(host, srcAttempts)) {
+ if (stopped) {
+ cleanupCurrentConnection(true);
+ }
+ // Maps will be added back in the finally block in case of failure.
+ return;
+ }
+
// Loop through available map-outputs and fetch them
// On any error, faildTasks is not null and we exit
// after putting back the remaining maps to the
@@ -453,7 +437,7 @@ class FetcherOrderedGrouped extends Thread {
// Get the location for the map output - either in-memory or on-disk
try {
- mapOutput = merger.reserve(srcAttemptId, decompressedLength, compressedLength, id);
+ mapOutput = allocator.reserve(srcAttemptId, decompressedLength, compressedLength, id);
} catch (IOException e) {
if (!stopped) {
// Kill the reduce attempt
@@ -493,7 +477,7 @@ class FetcherOrderedGrouped extends Thread {
// Reset retryStartTime as map task make progress if retried before.
retryStartTime = 0;
- scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength,
+ scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength,
endTime - startTime, mapOutput);
// Note successful shuffle
remaining.remove(srcAttemptId);
@@ -584,7 +568,7 @@ class FetcherOrderedGrouped extends Thread {
int forReduce, List<InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) {
if (compressedLength < 0 || decompressedLength < 0) {
wrongLengthErrs.increment(1);
- LOG.warn(getName() + " invalid lengths in map output header: id: " +
+ LOG.warn(logIdentifier + " invalid lengths in map output header: id: " +
srcAttemptId + " len: " + compressedLength + ", decomp len: " +
decompressedLength);
return false;
@@ -594,7 +578,7 @@ class FetcherOrderedGrouped extends Thread {
// URI
if (forReduce != currentPartition) {
wrongReduceErrs.increment(1);
- LOG.warn(getName() + " data for the wrong partition map: " + srcAttemptId + " len: "
+ LOG.warn(logIdentifier + " data for the wrong partition map: " + srcAttemptId + " len: "
+ compressedLength + " decomp len: " + decompressedLength + " for partition " + forReduce
+ ", expected partition: " + currentPartition);
return false;
@@ -622,7 +606,6 @@ class FetcherOrderedGrouped extends Thread {
protected void setupLocalDiskFetch(MapHost host) throws InterruptedException {
// Get completed maps on 'host'
List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
- currentPartition = host.getPartitionId();
// Sanity check to catch hosts with only 'OBSOLETE' maps,
// especially at the tail of large jobs
@@ -708,7 +691,7 @@ class FetcherOrderedGrouped extends Thread {
protected MapOutput getMapOutputForDirectDiskFetch(InputAttemptIdentifier srcAttemptId,
Path filename, TezIndexRecord indexRecord)
throws IOException {
- return MapOutput.createLocalDiskMapOutput(srcAttemptId, merger, filename,
+ return MapOutput.createLocalDiskMapOutput(srcAttemptId, allocator, filename,
indexRecord.getStartOffset(), indexRecord.getPartLength(), true);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
index 55c80aa..29bf799 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
@@ -50,7 +50,7 @@ class MapOutput {
private final long size;
private final boolean primaryMapOutput;
- private final MergeManager merger;
+ private final FetchedInputAllocatorOrderedGrouped callback;
// MEMORY
private final byte[] memory;
@@ -62,13 +62,13 @@ class MapOutput {
private final FileChunk outputPath;
private OutputStream disk;
- private MapOutput(Type type, InputAttemptIdentifier attemptIdentifier, MergeManager merger,
+ private MapOutput(Type type, InputAttemptIdentifier attemptIdentifier, FetchedInputAllocatorOrderedGrouped callback,
long size, Path outputPath, long offset, boolean primaryMapOutput,
FileSystem fs, Path tmpOutputPath) {
this.id = ID.incrementAndGet();
this.type = type;
this.attemptIdentifier = attemptIdentifier;
- this.merger = merger;
+ this.callback = callback;
this.primaryMapOutput = primaryMapOutput;
this.localFS = fs;
@@ -101,7 +101,7 @@ class MapOutput {
}
public static MapOutput createDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
- MergeManager merger, long size, Configuration conf,
+ FetchedInputAllocatorOrderedGrouped callback, long size, Configuration conf,
int fetcher, boolean primaryMapOutput,
TezTaskOutputFiles mapOutputFile) throws
IOException {
@@ -113,7 +113,7 @@ class MapOutput {
Path tmpOuputPath = outputpath.suffix(String.valueOf(fetcher));
long offset = 0;
- MapOutput mapOutput = new MapOutput(Type.DISK, attemptIdentifier, merger, size, outputpath, offset,
+ MapOutput mapOutput = new MapOutput(Type.DISK, attemptIdentifier, callback, size, outputpath, offset,
primaryMapOutput, fs, tmpOuputPath);
mapOutput.disk = mapOutput.localFS.create(tmpOuputPath);
@@ -121,16 +121,16 @@ class MapOutput {
}
public static MapOutput createLocalDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
- MergeManager merger, Path path, long offset,
+ FetchedInputAllocatorOrderedGrouped callback, Path path, long offset,
long size, boolean primaryMapOutput) {
- return new MapOutput(Type.DISK_DIRECT, attemptIdentifier, merger, size, path, offset,
+ return new MapOutput(Type.DISK_DIRECT, attemptIdentifier, callback, size, path, offset,
primaryMapOutput, null, null);
}
public static MapOutput createMemoryMapOutput(InputAttemptIdentifier attemptIdentifier,
- MergeManager merger, int size,
+ FetchedInputAllocatorOrderedGrouped callback, int size,
boolean primaryMapOutput) {
- return new MapOutput(Type.MEMORY, attemptIdentifier, merger, size, null, -1, primaryMapOutput,
+ return new MapOutput(Type.MEMORY, attemptIdentifier, callback, size, null, -1, primaryMapOutput,
null, null);
}
@@ -185,12 +185,12 @@ class MapOutput {
public void commit() throws IOException {
if (type == Type.MEMORY) {
- merger.closeInMemoryFile(this);
+ callback.closeInMemoryFile(this);
} else if (type == Type.DISK) {
localFS.rename(tmpOutputPath, outputPath.getPath());
- merger.closeOnDiskFile(outputPath);
+ callback.closeOnDiskFile(outputPath);
} else if (type == Type.DISK_DIRECT) {
- merger.closeOnDiskFile(outputPath);
+ callback.closeOnDiskFile(outputPath);
} else {
throw new IOException("Cannot commit MapOutput of type WAIT!");
}
@@ -198,7 +198,7 @@ class MapOutput {
public void abort() {
if (type == Type.MEMORY) {
- merger.unreserve(memory.length);
+ callback.unreserve(memory.length);
} else if (type == Type.DISK) {
try {
localFS.delete(tmpOutputPath, false);
http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 5a35f2f..0536bc0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -74,7 +74,7 @@ import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@SuppressWarnings(value={"rawtypes"})
-public class MergeManager {
+public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
private static final Logger LOG = LoggerFactory.getLogger(MergeManager.class);
@@ -373,6 +373,7 @@ public class MergeManager {
final private MapOutput stallShuffle = MapOutput.createWaitMapOutput(null);
+ @Override
public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier,
long requestedSize,
long compressedLength,
@@ -429,8 +430,9 @@ public class MergeManager {
return MapOutput.createMemoryMapOutput(srcAttemptIdentifier, this, (int)requestedSize,
primaryMapOutput);
}
-
- synchronized void unreserve(long size) {
+
+ @Override
+ public synchronized void unreserve(long size) {
commitMemory -= size;
usedMemory -= size;
if (LOG.isDebugEnabled()) {
@@ -440,6 +442,7 @@ public class MergeManager {
notifyAll();
}
+ @Override
public synchronized void closeInMemoryFile(MapOutput mapOutput) {
inMemoryMapOutputs.add(mapOutput);
LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
@@ -483,7 +486,8 @@ public class MergeManager {
", inMemoryMergedMapOutputs.size() -> " +
inMemoryMergedMapOutputs.size());
}
-
+
+ @Override
public synchronized void closeOnDiskFile(FileChunk file) {
//including only path & offset for valdiations.
for (FileChunk fileChunk : onDiskMapOutputs) {
http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index cb12a63..20e7f5b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -19,7 +19,6 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -27,7 +26,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import javax.crypto.SecretKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,15 +37,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
@@ -57,11 +52,8 @@ import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
-import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -83,22 +75,14 @@ public class Shuffle implements ExceptionReporter {
private final Configuration conf;
private final InputContext inputContext;
- private final ShuffleClientMetrics metrics;
-
private final ShuffleInputEventHandlerOrderedGrouped eventHandler;
private final ShuffleScheduler scheduler;
private final MergeManager merger;
- private final SecretKey jobTokenSecret;
- private final JobTokenSecretManager jobTokenSecretMgr;
private final CompressionCodec codec;
private final boolean ifileReadAhead;
private final int ifileReadAheadLength;
- private final int numFetchers;
- private final boolean localDiskFetchEnabled;
- private final String localHostname;
- private final int shufflePort;
-
+
private AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
private String throwingThreadName = null;
@@ -108,9 +92,6 @@ public class Shuffle implements ExceptionReporter {
private final String srcNameTrimmed;
- private final List<FetcherOrderedGrouped> fetchers;
- private final HttpConnectionParams httpConnectionParams;
-
private AtomicBoolean isShutDown = new AtomicBoolean(false);
private AtomicBoolean fetchersClosed = new AtomicBoolean(false);
private AtomicBoolean schedulerClosed = new AtomicBoolean(false);
@@ -124,19 +105,10 @@ public class Shuffle implements ExceptionReporter {
long initialMemoryAvailable) throws IOException {
this.inputContext = inputContext;
this.conf = conf;
- this.httpConnectionParams =
- ShuffleUtils.constructHttpShuffleConnectionParams(conf);
- this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
- inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
- this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
-
+
this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
- this.jobTokenSecret = ShuffleUtils
- .getJobTokenSecretFromTokenBytes(inputContext
- .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
- this.jobTokenSecretMgr = new JobTokenSecretManager(jobTokenSecret);
-
+
if (ConfigUtils.isIntermediateInputCompressed(conf)) {
Class<? extends CompressionCodec> codecClass =
ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
@@ -161,33 +133,14 @@ public class Shuffle implements ExceptionReporter {
LocalDirAllocator localDirAllocator =
new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
- this.localHostname = inputContext.getExecutionContext().getHostName();
- final ByteBuffer shuffleMetadata =
- inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
- this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
-
// TODO TEZ Get rid of Map / Reduce references.
- TezCounter shuffledInputsCounter =
- inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
- TezCounter reduceShuffleBytes =
- inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
- TezCounter reduceDataSizeDecompressed = inputContext.getCounters().findCounter(
- TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
- TezCounter failedShuffleCounter =
- inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
- TezCounter spilledRecordsCounter =
+ TezCounter spilledRecordsCounter =
inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
TezCounter reduceCombineInputCounter =
inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
TezCounter mergedMapOutputsCounter =
inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
- TezCounter bytesShuffedToDisk = inputContext.getCounters().findCounter(
- TaskCounter.SHUFFLE_BYTES_TO_DISK);
- TezCounter bytesShuffedToDiskDirect = inputContext.getCounters().findCounter(
- TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
- TezCounter bytesShuffedToMem = inputContext.getCounters().findCounter(
- TaskCounter.SHUFFLE_BYTES_TO_MEM);
-
+
LOG.info("Shuffle assigned with " + numInputs + " inputs" + ", codec: "
+ (codec == null ? "None" : codec.getClass().getName()) +
"ifileReadAhead: " + ifileReadAhead);
@@ -195,36 +148,38 @@ public class Shuffle implements ExceptionReporter {
boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
startTime = System.currentTimeMillis();
+ merger = new MergeManager(
+ this.conf,
+ localFS,
+ localDirAllocator,
+ inputContext,
+ combiner,
+ spilledRecordsCounter,
+ reduceCombineInputCounter,
+ mergedMapOutputsCounter,
+ this,
+ initialMemoryAvailable,
+ codec,
+ ifileReadAhead,
+ ifileReadAheadLength);
+
scheduler = new ShuffleScheduler(
this.inputContext,
this.conf,
numInputs,
this,
- shuffledInputsCounter,
- reduceShuffleBytes,
- reduceDataSizeDecompressed,
- failedShuffleCounter,
- bytesShuffedToDisk,
- bytesShuffedToDiskDirect,
- bytesShuffedToMem,
- startTime);
+ merger,
+ merger,
+ startTime,
+ codec,
+ ifileReadAhead,
+ ifileReadAheadLength,
+ srcNameTrimmed);
+
this.mergePhaseTime = inputContext.getCounters().findCounter(TaskCounter.MERGE_PHASE_TIME);
this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
- merger = new MergeManager(
- this.conf,
- localFS,
- localDirAllocator,
- inputContext,
- combiner,
- spilledRecordsCounter,
- reduceCombineInputCounter,
- mergedMapOutputsCounter,
- this,
- initialMemoryAvailable,
- codec,
- ifileReadAhead,
- ifileReadAheadLength);
+
eventHandler= new ShuffleInputEventHandlerOrderedGrouped(
inputContext,
@@ -234,16 +189,6 @@ public class Shuffle implements ExceptionReporter {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("ShuffleAndMergeRunner [" + srcNameTrimmed + "]").build());
- int configuredNumFetchers =
- conf.getInt(
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
- numFetchers = Math.min(configuredNumFetchers, numInputs);
- LOG.info("Num fetchers being started: " + numFetchers);
- fetchers = Lists.newArrayListWithCapacity(numFetchers);
- localDiskFetchEnabled = conf.getBoolean(
- TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
- TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
executor = MoreExecutors.listeningDecorator(rawExecutor);
runShuffleCallable = new RunShuffleCallable();
@@ -338,36 +283,16 @@ public class Shuffle implements ExceptionReporter {
@Override
protected TezRawKeyValueIterator callInternal() throws IOException, InterruptedException {
- synchronized (this) {
- synchronized (fetchers) {
- for (int i = 0; i < numFetchers; ++i) {
- if (!isShutDown.get()) {
- FetcherOrderedGrouped
- fetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger,
- metrics, Shuffle.this, jobTokenSecretMgr, ifileReadAhead, ifileReadAheadLength,
- codec, inputContext, conf, localDiskFetchEnabled, localHostname, shufflePort);
- fetchers.add(fetcher);
- fetcher.start();
- }
- }
+ if (!isShutDown.get()) {
+ try {
+ scheduler.start();
+ } catch (Throwable e) {
+ throw new ShuffleError("Error during shuffle", e);
}
}
-
- while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
- synchronized (Shuffle.this) {
- if (throwable.get() != null) {
- throw new ShuffleError("error in shuffle in " + throwingThreadName,
- throwable.get());
- }
- }
- }
shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
- // Stop the map-output fetcher threads
- LOG.info("Cleaning up fetchers");
- cleanupFetchers(false);
-
// stop the scheduler
cleanupShuffleScheduler(false);
@@ -395,38 +320,6 @@ public class Shuffle implements ExceptionReporter {
return kvIter;
}
}
-
- private synchronized void cleanupFetchers(boolean ignoreErrors) throws InterruptedException {
- // Stop the fetcher threads
- InterruptedException ie = null;
- if (!fetchersClosed.getAndSet(true)) {
- synchronized (fetchers) {
- for (FetcherOrderedGrouped fetcher : fetchers) {
- try {
- fetcher.shutDown();
- LOG.info("Shutdown.." + fetcher.getName());
- } catch (InterruptedException e) {
- if (ignoreErrors) {
- LOG.info("Interrupted while shutting down fetchers. Ignoring.");
- } else {
- if (ie != null) {
- ie = e;
- } else {
- LOG.warn(
- "Ignoring exception while shutting down fetcher since a previous one was seen and will be thrown "
- + e);
- }
- }
- }
- }
- fetchers.clear();
- }
- // throw only the first exception while attempting to shutdown.
- if (ie != null) {
- throw ie;
- }
- }
- }
private void cleanupShuffleScheduler(boolean ignoreErrors) throws InterruptedException {
@@ -469,7 +362,6 @@ public class Shuffle implements ExceptionReporter {
private void cleanupIgnoreErrors() {
try {
- cleanupFetchers(true);
cleanupShuffleScheduler(true);
cleanupMerger(true);
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index c54b005..85c3a30 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -17,10 +17,13 @@
*/
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+import javax.crypto.SecretKey;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.BitSet;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -32,13 +35,29 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -59,32 +78,42 @@ import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Ty
import com.google.common.collect.Lists;
class ShuffleScheduler {
- static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() {
- protected Long initialValue() {
- return 0L;
- }
- };
+
+ @VisibleForTesting
+ enum ShuffleErrors {
+ IO_ERROR,
+ WRONG_LENGTH,
+ BAD_ID,
+ WRONG_MAP,
+ CONNECTION,
+ WRONG_REDUCE
+ }
+ @VisibleForTesting
+ final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+
+ private final AtomicLong shuffleStart = new AtomicLong(0);
private static final Logger LOG = LoggerFactory.getLogger(ShuffleScheduler.class);
private static final long INITIAL_PENALTY = 2000l; // 2 seconds
private static final float PENALTY_GROWTH_RATE = 1.3f;
- private boolean[] finishedMaps;
+ private final BitSet finishedMaps;
private final int numInputs;
- private int remainingMaps;
private int numFetchedSpills;
- private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
+ @VisibleForTesting
+ final Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
//TODO Clean this and other maps at some point
- private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>();
+ private final ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>();
//To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is
// enabled in source.
@VisibleForTesting
final Map<InputIdentifier, ShuffleEventInfo> shuffleInfoEventsMap;
- private Set<MapHost> pendingHosts = new HashSet<MapHost>();
- private Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>();
-
+ private final Set<MapHost> pendingHosts = new HashSet<MapHost>();
+ private final Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>();
+
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final Random random = new Random(System.currentTimeMillis());
private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
private final Referee referee;
@@ -93,7 +122,6 @@ class ShuffleScheduler {
private final Map<String,IntWritable> hostFailures =
new HashMap<String,IntWritable>();
private final InputContext inputContext;
- private final Shuffle shuffle;
private final TezCounter shuffledInputsCounter;
private final TezCounter skippedInputCounter;
private final TezCounter reduceShuffleBytes;
@@ -105,13 +133,42 @@ class ShuffleScheduler {
private final TezCounter firstEventReceived;
private final TezCounter lastEventReceived;
+ private final String srcNameTrimmed;
+ private final AtomicInteger remainingMaps;
private final long startTime;
private long lastProgressTime;
- private int maxTaskOutputAtOnce;
- private int maxFetchFailuresBeforeReporting;
- private boolean reportReadErrorImmediately = true;
- private int maxFailedUniqueFetches = 5;
+ private final int numFetchers;
+ private final Set<FetcherOrderedGrouped> runningFetchers =
+ Collections.newSetFromMap(new ConcurrentHashMap<FetcherOrderedGrouped, Boolean>());
+
+ private final ListeningExecutorService fetcherExecutor;
+
+ private final HttpConnection.HttpConnectionParams httpConnectionParams;
+ private final FetchedInputAllocatorOrderedGrouped allocator;
+ private final ShuffleClientMetrics shuffleMetrics;
+ private final Shuffle shuffle;
+ private final MergeManager mergeManager;
+ private final JobTokenSecretManager jobTokenSecretManager;
+ private final boolean ifileReadAhead;
+ private final int ifileReadAheadLength;
+ private final CompressionCodec codec;
+ private final Configuration conf;
+ private final boolean localDiskFetchEnabled;
+ private final String localHostname;
+ private final int shufflePort;
+
+ private final TezCounter ioErrsCounter;
+ private final TezCounter wrongLengthErrsCounter;
+ private final TezCounter badIdErrsCounter;
+ private final TezCounter wrongMapErrsCounter;
+ private final TezCounter connectionErrsCounter;
+ private final TezCounter wrongReduceErrsCounter;
+
+ private final int maxTaskOutputAtOnce;
+ private final int maxFetchFailuresBeforeReporting;
+ private final boolean reportReadErrorImmediately;
+ private final int maxFailedUniqueFetches;
private final int abortFailureLimit;
private int maxMapRuntime = 0;
@@ -122,32 +179,88 @@ class ShuffleScheduler {
Configuration conf,
int numberOfInputs,
Shuffle shuffle,
- TezCounter shuffledInputsCounter,
- TezCounter reduceShuffleBytes,
- TezCounter reduceBytesDecompressed,
- TezCounter failedShuffleCounter,
- TezCounter bytesShuffledToDisk,
- TezCounter bytesShuffledToDiskDirect,
- TezCounter bytesShuffledToMem, long startTime) {
+ MergeManager mergeManager,
+ FetchedInputAllocatorOrderedGrouped allocator,
+ long startTime,
+ CompressionCodec codec,
+ boolean ifileReadAhead,
+ int ifileReadAheadLength,
+ String srcNameTrimmed) throws IOException {
this.inputContext = inputContext;
+ this.conf = conf;
+ this.shuffle = shuffle;
+ this.allocator = allocator;
+ this.mergeManager = mergeManager;
this.numInputs = numberOfInputs;
abortFailureLimit = Math.max(30, numberOfInputs / 10);
- remainingMaps = numberOfInputs;
- finishedMaps = new boolean[remainingMaps]; // default init to false
+ remainingMaps = new AtomicInteger(numberOfInputs);
+ finishedMaps = new BitSet(numberOfInputs);
+ this.ifileReadAhead = ifileReadAhead;
+ this.ifileReadAheadLength = ifileReadAheadLength;
+ this.srcNameTrimmed = srcNameTrimmed;
+ this.codec = codec;
+ int configuredNumFetchers =
+ conf.getInt(
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
+ numFetchers = Math.min(configuredNumFetchers, numInputs);
+ LOG.info("Num fetchers determined to be: " + numFetchers);
+
+ localDiskFetchEnabled = conf.getBoolean(
+ TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
+ TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
+ this.localHostname = inputContext.getExecutionContext().getHostName();
+ final ByteBuffer shuffleMetadata =
+ inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
+
this.referee = new Referee();
- this.shuffle = shuffle;
- this.shuffledInputsCounter = shuffledInputsCounter;
- this.reduceShuffleBytes = reduceShuffleBytes;
- this.reduceBytesDecompressed = reduceBytesDecompressed;
- this.failedShuffleCounter = failedShuffleCounter;
- this.bytesShuffledToDisk = bytesShuffledToDisk;
- this.bytesShuffledToDiskDirect = bytesShuffledToDiskDirect;
- this.bytesShuffledToMem = bytesShuffledToMem;
+ // Counters used by the ShuffleScheduler
+ this.shuffledInputsCounter = inputContext.getCounters().findCounter(
+ TaskCounter.NUM_SHUFFLED_INPUTS);
+ this.reduceShuffleBytes = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
+ this.reduceBytesDecompressed = inputContext.getCounters().findCounter(
+ TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
+ this.failedShuffleCounter = inputContext.getCounters().findCounter(
+ TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
+ this.bytesShuffledToDisk = inputContext.getCounters().findCounter(
+ TaskCounter.SHUFFLE_BYTES_TO_DISK);
+ this.bytesShuffledToDiskDirect = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
+ this.bytesShuffledToMem = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
+
+ // Counters used by Fetchers
+ ioErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+ ShuffleErrors.IO_ERROR.toString());
+ wrongLengthErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+ ShuffleErrors.WRONG_LENGTH.toString());
+ badIdErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+ ShuffleErrors.BAD_ID.toString());
+ wrongMapErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+ ShuffleErrors.WRONG_MAP.toString());
+ connectionErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+ ShuffleErrors.CONNECTION.toString());
+ wrongReduceErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+ ShuffleErrors.WRONG_REDUCE.toString());
+
this.startTime = startTime;
this.lastProgressTime = startTime;
- this.maxFailedUniqueFetches = Math.min(numberOfInputs,
- this.maxFailedUniqueFetches);
+ this.httpConnectionParams =
+ ShuffleUtils.constructHttpShuffleConnectionParams(conf);
+ this.shuffleMetrics = new ShuffleClientMetrics(inputContext.getDAGName(),
+ inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
+ this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
+ SecretKey jobTokenSecret = ShuffleUtils
+ .getJobTokenSecretFromTokenBytes(inputContext
+ .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
+ this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret);
+
+ ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Fetcher [" + srcNameTrimmed + "] #%d").build());
+ this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
+
+ this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5);
referee.start();
this.maxFetchFailuresBeforeReporting =
conf.getInt(
@@ -175,6 +288,30 @@ class ShuffleScheduler {
+ ", maxMapRuntime=" + maxMapRuntime);
}
+ public void start() throws Exception {
+ ShuffleSchedulerCallable schedulerCallable = new ShuffleSchedulerCallable();
+ schedulerCallable.call();
+ }
+
+ public void close() throws InterruptedException {
+ if (!isShutdown.getAndSet(true)) {
+
+ // Interrupt the waiting Scheduler thread.
+ synchronized (this) {
+ notifyAll();
+ }
+
+ // Interrupt the fetchers.
+ for (FetcherOrderedGrouped fetcher : runningFetchers) {
+ fetcher.shutDown();
+ }
+
+ // Kill the Referee thread.
+ referee.interrupt();
+ referee.join();
+ }
+ }
+
protected synchronized void updateEventReceivedTime() {
long relativeTime = System.currentTimeMillis() - startTime;
if (firstEventReceived.getValue() == 0) {
@@ -264,7 +401,7 @@ class ShuffleScheduler {
* we retrieve all spill details to claim success.
*/
if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
- remainingMaps = remainingMaps - 1;
+ remainingMaps.decrementAndGet();
setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
numFetchedSpills++;
} else {
@@ -292,7 +429,7 @@ class ShuffleScheduler {
//check if we downloaded all spills pertaining to this InputAttemptIdentifier
if (eventInfo.isDone()) {
- remainingMaps = remainingMaps - 1;
+ remainingMaps.decrementAndGet();
setInputFinished(inputIdentifier.getInputIndex());
shuffleInfoEventsMap.remove(inputIdentifier);
if (LOG.isTraceEnabled()) {
@@ -306,9 +443,9 @@ class ShuffleScheduler {
}
}
- if (remainingMaps == 0) {
+ if (remainingMaps.get() == 0) {
+ notifyAll(); // Notify the getHost() method.
LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
- notifyAll();
}
// update the status
@@ -365,11 +502,11 @@ class ShuffleScheduler {
private void logProgress() {
double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
- int inputsDone = numInputs - remainingMaps;
+ int inputsDone = numInputs - remainingMaps.get();
long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
double transferRate = mbs / secsSinceStart;
- LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills + ") of " + numInputs +
+ LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills + ") of " + numInputs +
". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
+ mbpsFormat.format(transferRate) + " MB/s)");
}
@@ -422,7 +559,7 @@ class ShuffleScheduler {
long delay = (long) (INITIAL_PENALTY *
Math.pow(PENALTY_GROWTH_RATE, failures));
- penalties.add(new Penalty(host, delay));
+ penalties.add(new Penalty(host, delay));
}
public void reportLocalError(IOException ioe) {
@@ -461,7 +598,7 @@ class ShuffleScheduler {
final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
long totalFailures = failedShuffleCounter.getValue();
- int doneMaps = numInputs - remainingMaps;
+ int doneMaps = numInputs - remainingMaps.get();
boolean reducerHealthy =
(((float)totalFailures / (totalFailures + doneMaps))
@@ -559,28 +696,33 @@ class ShuffleScheduler {
}
public synchronized MapHost getHost() throws InterruptedException {
- while(pendingHosts.isEmpty()) {
- wait();
- }
-
+ while (pendingHosts.isEmpty() && remainingMaps.get() > 0) {
+ LOG.info("PendingHosts=" + pendingHosts);
+ wait();
+ }
+
+ if (!pendingHosts.isEmpty()) {
+
MapHost host = null;
Iterator<MapHost> iter = pendingHosts.iterator();
int numToPick = random.nextInt(pendingHosts.size());
- for (int i=0; i <= numToPick; ++i) {
+ for (int i = 0; i <= numToPick; ++i) {
host = iter.next();
}
-
- pendingHosts.remove(host);
+
+ pendingHosts.remove(host);
host.markBusy();
if (LOG.isDebugEnabled()) {
LOG.debug("Assigning " + host + " with " + host.getNumKnownMapOutputs() +
" to " + Thread.currentThread().getName());
}
shuffleStart.set(System.currentTimeMillis());
-
return host;
+ } else {
+ return null;
+ }
}
-
+
public InputAttemptIdentifier getIdentifierForFetchedOutput(
String path, int reduceId) {
return pathToIdentifierMap.get(getIdentifierFromPathAndReduceId(path, reduceId));
@@ -680,8 +822,8 @@ class ShuffleScheduler {
notifyAll();
}
}
- LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " +
- (System.currentTimeMillis()-shuffleStart.get()) + "ms");
+ LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " +
+ (System.currentTimeMillis() - shuffleStart.get()) + "ms");
}
public synchronized void resetKnownMaps() {
@@ -693,28 +835,13 @@ class ShuffleScheduler {
/**
* Utility method to check if the Shuffle data fetch is complete.
- * @return
+ * @return true once no inputs remain to be fetched (the remaining-input count is zero)
*/
public synchronized boolean isDone() {
- return remainingMaps == 0;
+ return remainingMaps.get() == 0;
}
/**
- * Wait until the shuffle finishes or until the timeout.
- * @param millis maximum wait time
- * @return true if the shuffle is done
- * @throws InterruptedException
- */
- public synchronized boolean waitUntilDone(int millis
- ) throws InterruptedException {
- if (remainingMaps > 0) {
- wait(millis);
- return remainingMaps == 0;
- }
- return true;
- }
-
- /**
* A structure that records the penalty for a host.
*/
private static class Penalty implements Delayed {
@@ -754,7 +881,7 @@ class ShuffleScheduler {
public void run() {
try {
- while (true) {
+ while (!isShutdown.get()) {
// take the first host that has an expired penalty
MapHost host = penalties.take().host;
synchronized (ShuffleScheduler.this) {
@@ -767,7 +894,6 @@ class ShuffleScheduler {
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
// This handles shutdown of the entire fetch / merge process.
- return;
} catch (Throwable t) {
// Shuffle knows how to deal with failures post shutdown via the onFailure hook
shuffle.reportException(t);
@@ -775,11 +901,6 @@ class ShuffleScheduler {
}
}
- public void close() throws InterruptedException {
- referee.interrupt();
- referee.join();
- }
-
public synchronized void informMaxMapRunTime(int duration) {
if (duration > maxMapRuntime) {
maxMapRuntime = duration;
@@ -788,13 +909,154 @@ class ShuffleScheduler {
+ /** Marks the input at {@code inputIndex} as finished; guarded by the finishedMaps monitor. */
void setInputFinished(int inputIndex) {
synchronized(finishedMaps) {
- finishedMaps[inputIndex] = true;
+ finishedMaps.set(inputIndex, true);
}
}
+ /** Returns whether the input at {@code inputIndex} was marked finished via setInputFinished. */
boolean isInputFinished(int inputIndex) {
synchronized (finishedMaps) {
- return finishedMaps[inputIndex];
+ return finishedMaps.get(inputIndex);
+ }
+ }
+
+ private class ShuffleSchedulerCallable extends CallableWithNdc<Void> {
+
+
+ @Override
+ protected Void callInternal() throws InterruptedException {
+ outer:
+ while (!isShutdown.get() && remainingMaps.get() > 0) {
+ synchronized (ShuffleScheduler.this) {
+ if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) {
+ if (remainingMaps.get() > 0) {
+ try {
+ ShuffleScheduler.this.wait();
+ } catch (InterruptedException e) {
+ if (isShutdown.get()) {
+ LOG.info(
+ "Interrupted while waiting for fetchers to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
+ Thread.currentThread().interrupt();
+ break;
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NumCompletedInputs: {}" + (numInputs - remainingMaps.get()));
+ }
+
+ // Ensure there's memory available before scheduling the next Fetcher.
+ try {
+ // If merge is on, block
+ mergeManager.waitForInMemoryMerge();
+ // In case usedMemory > memorylimit, wait until some memory is released
+ mergeManager.waitForShuffleToMergeMemory();
+ } catch (InterruptedException e) {
+ if (isShutdown.get()) {
+ LOG.info(
+ "Interrupted while waiting for merge to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
+ Thread.currentThread().interrupt();
+ break;
+ } else {
+ throw e;
+ }
+ }
+
+ if (!isShutdown.get() && remainingMaps.get() > 0) {
+ synchronized (ShuffleScheduler.this) {
+ int numFetchersToRun = numFetchers - runningFetchers.size();
+ int count = 0;
+ while (count < numFetchersToRun && !isShutdown.get() && remainingMaps.get() > 0) {
+ MapHost mapHost;
+ try {
+ mapHost = getHost(); // Leads to a wait.
+ } catch (InterruptedException e) {
+ if (isShutdown.get()) {
+ LOG.info(
+ "Interrupted while waiting for host and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
+ Thread.currentThread().interrupt();
+ break;
+ } else {
+ throw e;
+ }
+ }
+ if (mapHost == null) {
+ break; // Check for the exit condition.
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing pending host: " + mapHost.toString());
+ }
+ if (!isShutdown.get()) {
+ count++;
+ LOG.info("Scheduling fetch for inputHost: {}", mapHost.getIdentifier());
+ FetcherOrderedGrouped fetcherOrderedGrouped = constructFetcherForHost(mapHost);
+ runningFetchers.add(fetcherOrderedGrouped);
+ ListenableFuture<Void> future = fetcherExecutor.submit(fetcherOrderedGrouped);
+ Futures.addCallback(future, new FetchFutureCallback(fetcherOrderedGrouped));
+ }
+ }
+ }
+ }
+ }
+ LOG.info("Shutting down FetchScheduler for input: {}, wasInterrupted={}", srcNameTrimmed, Thread.currentThread().isInterrupted());
+ if (!fetcherExecutor.isShutdown()) {
+ fetcherExecutor.shutdownNow();
+ }
+ return null;
+ }
+ }
+
+  /**
+   * Creates the fetcher that will pull the pending outputs of {@code mapHost}, wired to this
+   * scheduler and its shared counters. Package-private and marked visible for testing.
+   */
+  @VisibleForTesting
+  FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
+    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(
+        httpConnectionParams, this, allocator, shuffleMetrics, shuffle,
+        jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength, codec, conf,
+        localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost,
+        ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
+        connectionErrsCounter, wrongReduceErrsCounter);
+    return fetcher;
+  }
+
+ private class FetchFutureCallback implements FutureCallback<Void> {
+
+ private final FetcherOrderedGrouped fetcherOrderedGrouped;
+
+ public FetchFutureCallback(
+ FetcherOrderedGrouped fetcherOrderedGrouped) {
+ this.fetcherOrderedGrouped = fetcherOrderedGrouped;
+ }
+
+ private void doBookKeepingForFetcherComplete() {
+ synchronized (ShuffleScheduler.this) {
+ runningFetchers.remove(fetcherOrderedGrouped);
+ ShuffleScheduler.this.notifyAll();
+ }
+ }
+
+
+
+ @Override
+ public void onSuccess(Void result) {
+ fetcherOrderedGrouped.shutDown();
+ if (isShutdown.get()) {
+ LOG.info("Already shutdown. Ignoring fetch complete");
+ } else {
+ doBookKeepingForFetcherComplete();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ fetcherOrderedGrouped.shutDown();
+ if (isShutdown.get()) {
+ LOG.info("Already shutdown. Ignoring fetch complete");
+ } else {
+ LOG.error("Fetcher failed with error", t);
+ shuffle.reportException(t);
+ doBookKeepingForFetcherComplete();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index c33905f..f77e9a6 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -40,11 +40,12 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
-import org.apache.hadoop.io.DataOutputBuffer;
+import com.google.common.collect.Lists;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.library.common.InputIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -73,8 +74,59 @@ public class TestFetcher {
public static final String HOST = "localhost";
public static final int PORT = 65;
+ // Shuffle error counters (group ShuffleScheduler.SHUFFLE_ERR_GRP_NAME), passed to every
+ // FetcherOrderedGrouped constructed in these tests.
+ private TezCounters tezCounters = new TezCounters();
+ private TezCounter ioErrsCounter = tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
+ ShuffleScheduler.ShuffleErrors.IO_ERROR.toString());
+ private TezCounter wrongLengthErrsCounter =
+ tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
+ ShuffleScheduler.ShuffleErrors.WRONG_LENGTH.toString());
+ private TezCounter badIdErrsCounter =
+ tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
+ ShuffleScheduler.ShuffleErrors.BAD_ID.toString());
+ private TezCounter wrongMapErrsCounter =
+ tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
+ ShuffleScheduler.ShuffleErrors.WRONG_MAP.toString());
+ private TezCounter connectionErrsCounter =
+ tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
+ ShuffleScheduler.ShuffleErrors.CONNECTION.toString());
+ private TezCounter wrongReduceErrsCounter =
+ tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
+ ShuffleScheduler.ShuffleErrors.WRONG_REDUCE.toString());
+
static final Logger LOG = LoggerFactory.getLogger(TestFetcher.class);
+  /**
+   * Runs a fetcher against a host holding a single known input. Per the test name, the fetch is
+   * expected to hit a connection failure (NOTE(review): relies on null connection params and
+   * nothing listening on HOST:PORT — confirm). Verifies that the fetcher asks the scheduler for
+   * the host's inputs, puts the unfetched input back and frees the host.
+   */
+  @Test (timeout = 5000)
+  public void testInputsReturnedOnConnectionException() throws Exception {
+    Configuration conf = new TezConfiguration();
+    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
+    MergeManager merger = mock(MergeManager.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    Shuffle shuffle = mock(Shuffle.class);
+
+    // Host with one known input, which the scheduler reports as pending for that host.
+    MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+    InputAttemptIdentifier inputAttemptIdentifier =
+        new InputAttemptIdentifier(new InputIdentifier(0), 0, "attempt");
+    mapHost.addKnownMap(inputAttemptIdentifier);
+    List<InputAttemptIdentifier> mapsForHost = Lists.newArrayList(inputAttemptIdentifier);
+    doReturn(mapsForHost).when(scheduler).getMapsForHost(mapHost);
+
+    FetcherOrderedGrouped fetcher =
+        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+            null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
+            wrongLengthErrsCounter, badIdErrsCounter,
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+
+    fetcher.call();
+
+    // The unfetched input must be returned to the scheduler and the host released.
+    verify(scheduler).getMapsForHost(mapHost);
+    verify(scheduler).freeHost(mapHost);
+    verify(scheduler).putBackKnownMapOutput(mapHost, inputAttemptIdentifier);
+  }
+
+
@Test(timeout = 5000)
public void testLocalFetchModeSetting1() throws Exception {
Configuration conf = new TezConfiguration();
@@ -90,14 +142,15 @@ public class TestFetcher {
final boolean ENABLE_LOCAL_FETCH = true;
final boolean DISABLE_LOCAL_FETCH = false;
MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
- FetcherOrderedGrouped
- fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
- false, 0, null, inputContext, conf, ENABLE_LOCAL_FETCH, HOST, PORT);
+ FetcherOrderedGrouped fetcher =
+ new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+ null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
+ wrongLengthErrsCounter, badIdErrsCounter,
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
// when local mode is enabled and host and port matches use local fetch
FetcherOrderedGrouped spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
- doReturn(mapHost).when(scheduler).getHost();
spyFetcher.fetchNext();
@@ -105,10 +158,14 @@ public class TestFetcher {
verify(spyFetcher, never()).copyFromHost(any(MapHost.class));
// if hostname does not match use http
- spyFetcher = spy(fetcher);
mapHost = new MapHost(0, HOST + "_OTHER" + ":" + PORT, "baseurl");
+ fetcher =
+ new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+ null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
+ wrongLengthErrsCounter, badIdErrsCounter,
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+ spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
- doReturn(mapHost).when(scheduler).getHost();
spyFetcher.fetchNext();
@@ -116,10 +173,14 @@ public class TestFetcher {
verify(spyFetcher, times(1)).copyFromHost(mapHost);
// if port does not match use http
- spyFetcher = spy(fetcher);
mapHost = new MapHost(0, HOST + ":" + (PORT + 1), "baseurl");
+ fetcher =
+ new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+ null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
+ wrongLengthErrsCounter, badIdErrsCounter,
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+ spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
- doReturn(mapHost).when(scheduler).getHost();
spyFetcher.fetchNext();
@@ -128,11 +189,12 @@ public class TestFetcher {
//if local fetch is not enabled
mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
- fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
- false, 0, null, inputContext, conf, DISABLE_LOCAL_FETCH, HOST, PORT);
+ fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+ null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
+ wrongLengthErrsCounter, badIdErrsCounter,
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
- doReturn(mapHost).when(scheduler).getHost();
spyFetcher.fetchNext();
@@ -151,13 +213,14 @@ public class TestFetcher {
when(inputContext.getCounters()).thenReturn(new TezCounters());
when(inputContext.getSourceVertexName()).thenReturn("");
- FetcherOrderedGrouped
- fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
- false, 0, null, inputContext, conf, true, HOST, PORT);
- FetcherOrderedGrouped spyFetcher = spy(fetcher);
-
MapHost host = new MapHost(1, HOST + ":" + PORT,
"http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
+ FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+ null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+ FetcherOrderedGrouped spyFetcher = spy(fetcher);
+
+
List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
@@ -294,13 +357,14 @@ public class TestFetcher {
HttpConnection.HttpConnectionParams httpConnectionParams =
ShuffleUtils.constructHttpShuffleConnectionParams(conf);
- FetcherOrderedGrouped mockFetcher =
- new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, null,
- false, 0, null, inputContext, conf, false, HOST, PORT);
- final FetcherOrderedGrouped fetcher = spy(mockFetcher);
-
final MapHost host = new MapHost(1, HOST + ":" + PORT,
"http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
+ FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+ null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+ final FetcherOrderedGrouped fetcher = spy(mockFetcher);
+
+
final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
http://git-wip-us.apache.org/repos/asf/tez/blob/70a465df/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index eed9fd8..78d214c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -1,18 +1,21 @@
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
-import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
@@ -20,6 +23,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Collections;
import java.util.LinkedList;
@@ -27,7 +31,7 @@ import java.util.List;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -62,12 +66,20 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
private ShuffleScheduler realScheduler;
private MergeManager mergeManager;
+ /**
+ * Builds a mocked InputContext for the scheduler/handler under test. It provides an application
+ * id, the source vertex name "sourceVertex", fresh counters, and an execution context for
+ * "localhost". It also supplies service-provider metadata holding the int 4 and a serialized job
+ * token as service-consumer metadata.
+ */
- private InputContext createTezInputContext() {
+ private InputContext createTezInputContext() throws IOException {
ApplicationId applicationId = ApplicationId.newInstance(1, 1);
InputContext inputContext = mock(InputContext.class);
doReturn(applicationId).when(inputContext).getApplicationId();
doReturn("sourceVertex").when(inputContext).getSourceVertexName();
when(inputContext.getCounters()).thenReturn(new TezCounters());
+ ExecutionContext executionContext = new ExecutionContextImpl("localhost");
+ doReturn(executionContext).when(inputContext).getExecutionContext();
+ // NOTE(review): presumably read by the ShuffleScheduler as the shuffle port — confirm.
+ ByteBuffer shuffleBuffer = ByteBuffer.allocate(4).putInt(0, 4);
+ doReturn(shuffleBuffer).when(inputContext).getServiceProviderMetaData(anyString());
+ Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(new JobTokenIdentifier(new Text("text")),
+ new JobTokenSecretManager());
+ ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData(sessionToken);
+ doReturn(tokenBuffer).when(inputContext).getServiceConsumerMetaData(anyString());
return inputContext;
}
@@ -129,33 +141,18 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
private void setupScheduler(int numInputs) throws Exception {
InputContext inputContext = createTezInputContext();
Configuration config = new Configuration();
- TezCounter shuffledInputsCounter =
- inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
- TezCounter reduceShuffleBytes =
- inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
- TezCounter reduceDataSizeDecompressed = inputContext.getCounters().findCounter(
- TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
- TezCounter failedShuffleCounter =
- inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
- TezCounter bytesShuffedToDisk = inputContext.getCounters().findCounter(
- TaskCounter.SHUFFLE_BYTES_TO_DISK);
- TezCounter bytesShuffedToDiskDirect = inputContext.getCounters().findCounter(
- TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
- TezCounter bytesShuffedToMem = inputContext.getCounters().findCounter(
- TaskCounter.SHUFFLE_BYTES_TO_MEM);
realScheduler = new ShuffleScheduler(
inputContext,
config,
numInputs,
mock(Shuffle.class),
- shuffledInputsCounter,
- reduceShuffleBytes,
- reduceDataSizeDecompressed,
- failedShuffleCounter,
- bytesShuffedToDisk,
- bytesShuffedToDiskDirect,
- bytesShuffedToMem,
- System.currentTimeMillis());
+ mock(MergeManager.class),
+ mock(MergeManager.class),
+ System.currentTimeMillis(),
+ null,
+ false,
+ 0,
+ "src vertex");
scheduler = spy(realScheduler);
handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, false);
mergeManager = mock(MergeManager.class);