You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 03:19:03 UTC
[13/50] [abbrv] tez git commit: TEZ-2388. Send dag identifier as part
of the fetcher request string. (sseth)
TEZ-2388. Send dag identifier as part of the fetcher request string. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8ba7e44c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8ba7e44c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8ba7e44c
Branch: refs/heads/TEZ-2003
Commit: 8ba7e44c2db4fc6a8fa9c58e314d055fc8a63b2e
Parents: 96e48c5
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Apr 29 08:20:05 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 21 18:13:54 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../tez/runtime/library/common/shuffle/Fetcher.java | 14 ++++++++------
.../runtime/library/common/shuffle/ShuffleUtils.java | 8 +++++---
.../library/common/shuffle/impl/ShuffleManager.java | 2 +-
.../ShuffleInputEventHandlerOrderedGrouped.java | 2 +-
.../runtime/library/common/shuffle/TestFetcher.java | 6 +++---
6 files changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/8ba7e44c/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index d42aaf8..9fc9ed3 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -19,5 +19,6 @@ ALL CHANGES:
TEZ-2347. Expose additional information in TaskCommunicatorContext.
TEZ-2361. Propagate dag completion to TaskCommunicator.
TEZ-2381. Fixes after rebase 04/28.
+ TEZ-2388. Send dag identifier as part of the fetcher request string.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/8ba7e44c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index e8389a6..8057be8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -89,6 +89,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private final FetcherCallback fetcherCallback;
private final FetchedInputAllocator inputManager;
private final ApplicationId appId;
+ private final int dagIdentifier;
private final String logIdentifier;
@@ -130,7 +131,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private final boolean isDebugEnabled = LOG.isDebugEnabled();
private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
- FetchedInputAllocator inputManager, ApplicationId appId,
+ FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier,
JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf,
RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator,
@@ -144,6 +145,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
this.inputManager = inputManager;
this.jobTokenSecretMgr = jobTokenSecretManager;
this.appId = appId;
+ this.dagIdentifier = dagIdentifier;
this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
this.httpConnectionParams = params;
this.conf = conf;
@@ -413,7 +415,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private HostFetchResult setupConnection(Collection<InputAttemptIdentifier> attempts) {
try {
StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
- port, partition, appId.toString(), httpConnectionParams.isSslShuffle());
+ port, partition, appId.toString(), dagIdentifier, httpConnectionParams.isSslShuffle());
this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts,
httpConnectionParams.isKeepAlive());
@@ -926,22 +928,22 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
- ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
+ ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort,
boolean asyncHttp) {
- this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
+ this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
false, localHostname, shufflePort, asyncHttp);
}
public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
- ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
+ ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
Configuration conf, RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator, Path lockPath,
boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
String localHostname, int shufflePort, boolean asyncHttp) {
- this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
+ this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8ba7e44c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 1081587..c7cc907 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -184,19 +184,21 @@ public class ShuffleUtils {
// TODO NEWTEZ handle ssl shuffle
public static StringBuilder constructBaseURIForShuffleHandler(String host,
- int port, int partition, String appId, boolean sslShuffle) {
+ int port, int partition, String appId, int dagIdentifier, boolean sslShuffle) {
return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port),
- partition, appId, sslShuffle);
+ partition, appId, dagIdentifier, sslShuffle);
}
public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier,
- int partition, String appId, boolean sslShuffle) {
+ int partition, String appId, int dagIdentifier, boolean sslShuffle) {
final String http_protocol = (sslShuffle) ? "https://" : "http://";
StringBuilder sb = new StringBuilder(http_protocol);
sb.append(hostIdentifier);
sb.append("/");
sb.append("mapOutput?job=");
sb.append(appId.replace("application", "job"));
+ sb.append("&dag=");
+ sb.append(String.valueOf(dagIdentifier));
sb.append("&reduce=");
sb.append(String.valueOf(partition));
sb.append("&map=");
http://git-wip-us.apache.org/repos/asf/tez/blob/8ba7e44c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 10a0050..1977d5f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -402,7 +402,7 @@ public class ShuffleManager implements FetcherCallback {
}
FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this,
- httpConnectionParams, inputManager, inputContext.getApplicationId(),
+ httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(),
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
lockDisk, localDiskFetchEnabled, sharedFetchEnabled,
localhostName, shufflePort, asyncHttp);
http://git-wip-us.apache.org/repos/asf/tez/blob/8ba7e44c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index 32ac766..9481e65 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -127,7 +127,7 @@ public class ShuffleInputEventHandlerOrderedGrouped {
@VisibleForTesting
URI getBaseURI(String host, int port, int partitionId) {
StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port,
- partitionId, inputContext.getApplicationId().toString(), sslShuffle);
+ partitionId, inputContext.getApplicationId().toString(), inputContext.getDagIdentifier(), sslShuffle);
URI u = URI.create(sb.toString());
return u;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8ba7e44c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index 7678b18..85e3540 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -75,7 +75,7 @@ public class TestFetcher {
final boolean DISABLE_LOCAL_FETCH = false;
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
+ ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
PORT, false);
builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());
@@ -125,7 +125,7 @@ public class TestFetcher {
// When disabled use http fetch
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST,
+ ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST,
PORT, false);
builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());
@@ -159,7 +159,7 @@ public class TestFetcher {
int partition = 42;
FetcherCallback callback = mock(FetcherCallback.class);
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST, PORT, false);
+ ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, false);
builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());