You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:43:19 UTC
[41/43] tez git commit: TEZ-2388. Send dag identifier as part of the
fetcher request string. (sseth)
TEZ-2388. Send dag identifier as part of the fetcher request string. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fdb91771
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fdb91771
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fdb91771
Branch: refs/heads/TEZ-2003
Commit: fdb91771f0235295ef078871501c1d7d81b63ba8
Parents: 9e098f7
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Apr 29 08:20:05 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 8 14:44:34 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../tez/runtime/library/common/shuffle/Fetcher.java | 14 ++++++++------
.../runtime/library/common/shuffle/ShuffleUtils.java | 8 +++++---
.../library/common/shuffle/impl/ShuffleManager.java | 2 +-
.../ShuffleInputEventHandlerOrderedGrouped.java | 2 +-
.../runtime/library/common/shuffle/TestFetcher.java | 6 +++---
6 files changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/fdb91771/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index d42aaf8..9fc9ed3 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -19,5 +19,6 @@ ALL CHANGES:
TEZ-2347. Expose additional information in TaskCommunicatorContext.
TEZ-2361. Propagate dag completion to TaskCommunicator.
TEZ-2381. Fixes after rebase 04/28.
+ TEZ-2388. Send dag identifier as part of the fetcher request string.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/fdb91771/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 61e0151..9fd46a4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -86,6 +86,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private final FetcherCallback fetcherCallback;
private final FetchedInputAllocator inputManager;
private final ApplicationId appId;
+ private final int dagIdentifier;
private final String logIdentifier;
@@ -124,7 +125,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private final boolean isDebugEnabled = LOG.isDebugEnabled();
private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
- FetchedInputAllocator inputManager, ApplicationId appId,
+ FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier,
JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf,
RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator,
@@ -137,6 +138,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
this.inputManager = inputManager;
this.jobTokenSecretMgr = jobTokenSecretManager;
this.appId = appId;
+ this.dagIdentifier = dagIdentifier;
this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
this.httpConnectionParams = params;
this.conf = conf;
@@ -402,7 +404,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private HostFetchResult setupConnection(List<InputAttemptIdentifier> attempts) {
try {
StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
- port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled());
+ port, partition, appId.toString(), dagIdentifier, httpConnectionParams.isSSLShuffleEnabled());
this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts,
httpConnectionParams.getKeepAlive());
@@ -902,21 +904,21 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
- ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
+ ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort) {
- this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
+ this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
false, localHostname, shufflePort);
}
public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
- ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
+ ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
Configuration conf, RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator, Path lockPath,
boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
String localHostname, int shufflePort) {
- this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
+ this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fdb91771/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 46489ed..d7cb7c1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -174,19 +174,21 @@ public class ShuffleUtils {
// TODO NEWTEZ handle ssl shuffle
public static StringBuilder constructBaseURIForShuffleHandler(String host,
- int port, int partition, String appId, boolean sslShuffle) {
+ int port, int partition, String appId, int dagIdentifier, boolean sslShuffle) {
return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port),
- partition, appId, sslShuffle);
+ partition, appId, dagIdentifier, sslShuffle);
}
public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier,
- int partition, String appId, boolean sslShuffle) {
+ int partition, String appId, int dagIdentifier, boolean sslShuffle) {
final String http_protocol = (sslShuffle) ? "https://" : "http://";
StringBuilder sb = new StringBuilder(http_protocol);
sb.append(hostIdentifier);
sb.append("/");
sb.append("mapOutput?job=");
sb.append(appId.replace("application", "job"));
+ sb.append("&dag=");
+ sb.append(String.valueOf(dagIdentifier));
sb.append("&reduce=");
sb.append(String.valueOf(partition));
sb.append("&map=");
http://git-wip-us.apache.org/repos/asf/tez/blob/fdb91771/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index ac7caca..437de76 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -393,7 +393,7 @@ public class ShuffleManager implements FetcherCallback {
}
FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this,
- httpConnectionParams, inputManager, inputContext.getApplicationId(),
+ httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(),
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
lockDisk, localDiskFetchEnabled, sharedFetchEnabled,
localhostName, shufflePort);
http://git-wip-us.apache.org/repos/asf/tez/blob/fdb91771/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index 32ac766..9481e65 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -127,7 +127,7 @@ public class ShuffleInputEventHandlerOrderedGrouped {
@VisibleForTesting
URI getBaseURI(String host, int port, int partitionId) {
StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port,
- partitionId, inputContext.getApplicationId().toString(), sslShuffle);
+ partitionId, inputContext.getApplicationId().toString(), inputContext.getDagIdentifier(), sslShuffle);
URI u = URI.create(sb.toString());
return u;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fdb91771/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index 4ef187d..d2b0bde 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -70,7 +70,7 @@ public class TestFetcher {
final boolean DISABLE_LOCAL_FETCH = false;
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT);
+ ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT);
builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());
@@ -119,7 +119,7 @@ public class TestFetcher {
// When disabled use http fetch
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST, PORT);
+ ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST, PORT);
builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());
@@ -152,7 +152,7 @@ public class TestFetcher {
int partition = 42;
FetcherCallback callback = mock(FetcherCallback.class);
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST, PORT);
+ ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT);
builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());