You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:42:40 UTC
[02/43] tez git commit: TEZ-2366. Pig tez MiniTezCluster unit tests fail intermittently after TEZ-2333 (pramachandran)
TEZ-2366. Pig tez MiniTezCluster unit tests fail intermittently after TEZ-2333 (pramachandran)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6a04fa48
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6a04fa48
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6a04fa48
Branch: refs/heads/TEZ-2003
Commit: 6a04fa48cb1113faf640115dcbba9b2270e756f3
Parents: 12ef073
Author: Prakash Ramachandran <pr...@hortonworks.com>
Authored: Wed May 6 19:11:06 2015 +0530
Committer: Prakash Ramachandran <pr...@hortonworks.com>
Committed: Wed May 6 19:11:06 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../runtime/library/common/shuffle/Fetcher.java | 17 ++---
.../common/shuffle/impl/ShuffleManager.java | 12 +++-
.../orderedgrouped/FetcherOrderedGrouped.java | 67 ++++++++++--------
.../common/shuffle/orderedgrouped/Shuffle.java | 11 ++-
.../library/common/shuffle/TestFetcher.java | 49 +++++++++++--
.../impl/TestShuffleInputEventHandlerImpl.java | 14 +++-
.../shuffle/orderedgrouped/TestFetcher.java | 74 +++++++++++++++++++-
8 files changed, 191 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f060a8c..91dd9c4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,7 @@ INCOMPATIBLE CHANGES
TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
ALL CHANGES:
+ TEZ-2366. Pig tez MiniTezCluster unit tests fail intermittently after TEZ-2333
TEZ-2406. Tez UI: Display per-io counter columns in task and attempt pages under vertex
TEZ-2384. Add warning message in the case of prewarn under non-session mode.
TEZ-2415. PMC RDF needs to use asfext:pmc, not asfext:PMC.
http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 48fe0f2..61e0151 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -33,7 +33,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -48,7 +47,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.security.JobTokenSecretManager;
@@ -75,6 +73,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private static final AtomicInteger fetcherIdGen = new AtomicInteger(0);
private final Configuration conf;
+ private final int shufflePort;
// Configurable fields.
private CompressionCodec codec;
@@ -132,7 +131,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
Path lockPath,
boolean localDiskFetchEnabled,
boolean sharedFetchEnabled,
- String localHostname) {
+ String localHostname,
+ int shufflePort) {
this.fetcherCallback = fetcherCallback;
this.inputManager = inputManager;
this.jobTokenSecretMgr = jobTokenSecretManager;
@@ -151,6 +151,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
this.localDirAllocator = localDirAllocator;
this.lockPath = lockPath;
this.localHostname = localHostname;
+ this.shufflePort = shufflePort;
try {
if (this.sharedFetchEnabled) {
@@ -186,7 +187,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
HostFetchResult hostFetchResult;
- if (localDiskFetchEnabled && host.equals(localHostname)) {
+ if (localDiskFetchEnabled && host.equals(localHostname) && port == shufflePort) {
hostFetchResult = setupLocalDiskFetch();
} else if (multiplex) {
hostFetchResult = doSharedFetch();
@@ -902,10 +903,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
- Configuration conf, boolean localDiskFetchEnabled, String localHostname) {
+ Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
- false, localHostname);
+ false, localHostname, shufflePort);
}
public FetcherBuilder(FetcherCallback fetcherCallback,
@@ -914,10 +915,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
Configuration conf, RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator, Path lockPath,
boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
- String localHostname) {
+ String localHostname, int shufflePort) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
- lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname);
+ lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort);
}
public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {
http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index d47e652..ac7caca 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.library.common.shuffle.impl;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.BitSet;
@@ -161,7 +162,8 @@ public class ShuffleManager implements FetcherCallback {
private final LocalDirAllocator localDirAllocator;
private final RawLocalFileSystem localFs;
private final Path[] localDisks;
- private final static String localhostName = NetUtils.getHostname();
+ private final String localhostName;
+ private final int shufflePort;
private final TezCounter shufflePhaseTime;
private final TezCounter firstEventReceived;
@@ -216,7 +218,7 @@ public class ShuffleManager implements FetcherCallback {
int maxConfiguredFetchers =
conf.getInt(
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
@@ -249,6 +251,10 @@ public class ShuffleManager implements FetcherCallback {
this.localDisks = Iterables.toArray(
localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class);
+ this.localhostName = inputContext.getExecutionContext().getHostName();
+ final ByteBuffer shuffleMetaData =
+ inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetaData);
Arrays.sort(this.localDisks);
@@ -390,7 +396,7 @@ public class ShuffleManager implements FetcherCallback {
httpConnectionParams, inputManager, inputContext.getApplicationId(),
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
lockDisk, localDiskFetchEnabled, sharedFetchEnabled,
- inputContext.getExecutionContext().getHostName());
+ localhostName, shufflePort);
if (codec != null) {
fetcherBuilder.setCompressionParameters(codec);
http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index fbaabff..60f1c98 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -56,7 +56,7 @@ class FetcherOrderedGrouped extends Thread {
private final Configuration conf;
private final boolean localDiskFetchEnabled;
- private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
+ private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
CONNECTION, WRONG_REDUCE}
private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
@@ -72,7 +72,7 @@ class FetcherOrderedGrouped extends Thread {
private final Shuffle shuffle;
private final int id;
private final String logIdentifier;
- private final String localHostname;
+ private final String localShuffleHostPort;
private static int nextId = 0;
private int currentPartition = -1;
@@ -104,7 +104,8 @@ class FetcherOrderedGrouped extends Thread {
CompressionCodec codec,
InputContext inputContext, Configuration conf,
boolean localDiskFetchEnabled,
- String localHostname) throws IOException {
+ String localHostname,
+ int shufflePort) throws IOException {
setDaemon(true);
this.scheduler = scheduler;
this.merger = merger;
@@ -134,7 +135,7 @@ class FetcherOrderedGrouped extends Thread {
this.codec = null;
}
this.conf = conf;
- this.localHostname = localHostname;
+ this.localShuffleHostPort = localHostname + ":" + String.valueOf(shufflePort);
this.localDiskFetchEnabled = localDiskFetchEnabled;
@@ -144,37 +145,41 @@ class FetcherOrderedGrouped extends Thread {
setDaemon(true);
}
- public void run() {
+ @VisibleForTesting
+ protected void fetchNext() throws InterruptedException, IOException {
+ MapHost host = null;
try {
- while (!stopped && !Thread.currentThread().isInterrupted()) {
- remaining = null; // Safety.
- MapHost host = null;
- try {
- // If merge is on, block
- merger.waitForInMemoryMerge();
+ // If merge is on, block
+ merger.waitForInMemoryMerge();
- // In case usedMemory > memorylimit, wait until some memory is released
- merger.waitForShuffleToMergeMemory();
+ // In case usedMemory > memorylimit, wait until some memory is released
+ merger.waitForShuffleToMergeMemory();
- // Get a host to shuffle from
- host = scheduler.getHost();
- metrics.threadBusy();
+ // Get a host to shuffle from
+ host = scheduler.getHost();
+ metrics.threadBusy();
- String hostPort = host.getHostIdentifier();
- String hostname = hostPort.substring(0, hostPort.indexOf(":"));
- if (localDiskFetchEnabled && hostname.equals(localHostname)) {
- setupLocalDiskFetch(host);
- } else {
- // Shuffle
- copyFromHost(host);
- }
- } finally {
- cleanupCurrentConnection(false);
- if (host != null) {
- scheduler.freeHost(host);
- metrics.threadFree();
- }
- }
+ String hostPort = host.getHostIdentifier();
+ if (localDiskFetchEnabled && hostPort.equals(localShuffleHostPort)) {
+ setupLocalDiskFetch(host);
+ } else {
+ // Shuffle
+ copyFromHost(host);
+ }
+ } finally {
+ cleanupCurrentConnection(false);
+ if (host != null) {
+ scheduler.freeHost(host);
+ metrics.threadFree();
+ }
+ }
+ }
+
+ public void run() {
+ try {
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
+ remaining = null; // Safety.
+ fetchNext();
}
} catch (InterruptedException ie) {
//TODO: might not be respected when fetcher is in progress / server is busy. TEZ-711
http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 442f032..ee05378 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -95,6 +96,8 @@ public class Shuffle implements ExceptionReporter {
private final int ifileReadAheadLength;
private final int numFetchers;
private final boolean localDiskFetchEnabled;
+ private final String localHostname;
+ private final int shufflePort;
private AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
private String throwingThreadName = null;
@@ -158,6 +161,11 @@ public class Shuffle implements ExceptionReporter {
LocalDirAllocator localDirAllocator =
new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+ this.localHostname = inputContext.getExecutionContext().getHostName();
+ final ByteBuffer shuffleMetadata =
+ inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
+
// TODO TEZ Get rid of Map / Reduce references.
TezCounter shuffledInputsCounter =
inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
@@ -336,8 +344,7 @@ public class Shuffle implements ExceptionReporter {
FetcherOrderedGrouped
fetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger,
metrics, Shuffle.this, jobTokenSecretMgr, ifileReadAhead, ifileReadAheadLength,
- codec, inputContext, conf, localDiskFetchEnabled,
- inputContext.getExecutionContext().getHostName());
+ codec, inputContext, conf, localDiskFetchEnabled, localHostname, shufflePort);
fetchers.add(fetcher);
fetcher.start();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index e6f0c4a..4ef187d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
@@ -38,8 +39,11 @@ import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -52,19 +56,21 @@ import org.mockito.stubbing.Answer;
public class TestFetcher {
private static final String SHUFFLE_INPUT_FILE_PREFIX = "shuffle_input_file_";
private static String HOST = "localhost";
- private static int PORT = 0;
+ private static int PORT = 41;
@Test(timeout = 3000)
public void testLocalFetchModeSetting() throws Exception {
TezConfiguration conf = new TezConfiguration();
- conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "true");
+ conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
InputAttemptIdentifier[] srcAttempts = {
new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1")
};
FetcherCallback fetcherCallback = mock(FetcherCallback.class);
+ final boolean ENABLE_LOCAL_FETCH = true;
+ final boolean DISABLE_LOCAL_FETCH = false;
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST);
+ ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT);
builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());
@@ -79,10 +85,41 @@ public class TestFetcher {
verify(fetcher).setupLocalDiskFetch();
verify(fetcher, never()).doHttpFetch();
+ // when enabled and hostname does not match use http fetch.
+ builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
+ ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
+ PORT);
+ builder.assignWork(HOST + "_OTHER", PORT, 0, Arrays.asList(srcAttempts));
+ fetcher = spy(builder.build());
+
+ doReturn(null).when(fetcher).setupLocalDiskFetch();
+ doReturn(hfr).when(fetcher).doHttpFetch();
+ doNothing().when(fetcher).shutdown();
+
+ fetcher.call();
+
+ verify(fetcher, never()).setupLocalDiskFetch();
+ verify(fetcher).doHttpFetch();
+
+ // when enabled and port does not match use http fetch.
+ builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
+ ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT);
+ builder.assignWork(HOST, PORT + 1, 0, Arrays.asList(srcAttempts));
+ fetcher = spy(builder.build());
+
+ doReturn(null).when(fetcher).setupLocalDiskFetch();
+ doReturn(hfr).when(fetcher).doHttpFetch();
+ doNothing().when(fetcher).shutdown();
+
+ fetcher.call();
+
+ verify(fetcher, never()).setupLocalDiskFetch();
+ verify(fetcher).doHttpFetch();
+
// When disabled use http fetch
- conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "false");
+ conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, false, HOST);
+ ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST, PORT);
builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());
@@ -115,7 +152,7 @@ public class TestFetcher {
int partition = 42;
FetcherCallback callback = mock(FetcherCallback.class);
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST);
+ ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST, PORT);
builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());
http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
index 44122a2..c452898 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
@@ -49,11 +49,13 @@ import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.junit.Test;
@@ -156,10 +158,20 @@ public class TestShuffleInputEventHandlerImpl {
}
- private InputContext createInputContext() {
+ private InputContext createInputContext() throws IOException {
+ DataOutputBuffer port_dob = new DataOutputBuffer();
+ port_dob.writeInt(PORT);
+ final ByteBuffer shuffleMetaData = ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+
+ ExecutionContext executionContext = mock(ExecutionContext.class);
+ doReturn(HOST).when(executionContext).getHostName();
+
InputContext inputContext = mock(InputContext.class);
doReturn(new TezCounters()).when(inputContext).getCounters();
doReturn("sourceVertex").when(inputContext).getSourceVertexName();
+ doReturn(shuffleMetaData).when(inputContext)
+ .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ doReturn(executionContext).when(inputContext).getExecutionContext();
return inputContext;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 2e826d8..c33905f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -39,9 +40,11 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -68,11 +71,76 @@ public class TestFetcher {
public static final String SHUFFLE_INPUT_FILE_PREFIX = "shuffle_input_file_";
public static final String HOST = "localhost";
- public static final int PORT = 0;
+ public static final int PORT = 65;
static final Logger LOG = LoggerFactory.getLogger(TestFetcher.class);
@Test(timeout = 5000)
+ public void testLocalFetchModeSetting1() throws Exception {
+ Configuration conf = new TezConfiguration();
+ ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
+ MergeManager merger = mock(MergeManager.class);
+ ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+ Shuffle shuffle = mock(Shuffle.class);
+
+ InputContext inputContext = mock(InputContext.class);
+ doReturn(new TezCounters()).when(inputContext).getCounters();
+ doReturn("src vertex").when(inputContext).getSourceVertexName();
+
+ final boolean ENABLE_LOCAL_FETCH = true;
+ final boolean DISABLE_LOCAL_FETCH = false;
+ MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+ FetcherOrderedGrouped
+ fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
+ false, 0, null, inputContext, conf, ENABLE_LOCAL_FETCH, HOST, PORT);
+
+ // when local mode is enabled and host and port matches use local fetch
+ FetcherOrderedGrouped spyFetcher = spy(fetcher);
+ doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
+ doReturn(mapHost).when(scheduler).getHost();
+
+ spyFetcher.fetchNext();
+
+ verify(spyFetcher, times(1)).setupLocalDiskFetch(mapHost);
+ verify(spyFetcher, never()).copyFromHost(any(MapHost.class));
+
+ // if hostname does not match use http
+ spyFetcher = spy(fetcher);
+ mapHost = new MapHost(0, HOST + "_OTHER" + ":" + PORT, "baseurl");
+ doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
+ doReturn(mapHost).when(scheduler).getHost();
+
+ spyFetcher.fetchNext();
+
+ verify(spyFetcher, never()).setupLocalDiskFetch(any(MapHost.class));
+ verify(spyFetcher, times(1)).copyFromHost(mapHost);
+
+ // if port does not match use http
+ spyFetcher = spy(fetcher);
+ mapHost = new MapHost(0, HOST + ":" + (PORT + 1), "baseurl");
+ doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
+ doReturn(mapHost).when(scheduler).getHost();
+
+ spyFetcher.fetchNext();
+
+ verify(spyFetcher, never()).setupLocalDiskFetch(any(MapHost.class));
+ verify(spyFetcher, times(1)).copyFromHost(mapHost);
+
+ //if local fetch is not enabled
+ mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+ fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
+ false, 0, null, inputContext, conf, DISABLE_LOCAL_FETCH, HOST, PORT);
+ spyFetcher = spy(fetcher);
+ doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
+ doReturn(mapHost).when(scheduler).getHost();
+
+ spyFetcher.fetchNext();
+
+ verify(spyFetcher, never()).setupLocalDiskFetch(any(MapHost.class));
+ verify(spyFetcher, times(1)).copyFromHost(mapHost);
+ }
+
+ @Test(timeout = 5000)
public void testSetupLocalDiskFetch() throws Exception {
Configuration conf = new TezConfiguration();
ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
@@ -85,7 +153,7 @@ public class TestFetcher {
FetcherOrderedGrouped
fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
- false, 0, null, inputContext, conf, true, HOST);
+ false, 0, null, inputContext, conf, true, HOST, PORT);
FetcherOrderedGrouped spyFetcher = spy(fetcher);
MapHost host = new MapHost(1, HOST + ":" + PORT,
@@ -228,7 +296,7 @@ public class TestFetcher {
ShuffleUtils.constructHttpShuffleConnectionParams(conf);
FetcherOrderedGrouped mockFetcher =
new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, null,
- false, 0, null, inputContext, conf, false, HOST);
+ false, 0, null, inputContext, conf, false, HOST, PORT);
final FetcherOrderedGrouped fetcher = spy(mockFetcher);
final MapHost host = new MapHost(1, HOST + ":" + PORT,