You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:43 UTC
[41/50] [abbrv] tez git commit: TEZ-2388. Send dag identifier as part of the fetcher request string. (sseth)
TEZ-2388. Send dag identifier as part of the fetcher request string. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e8791286
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e8791286
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e8791286
Branch: refs/heads/TEZ-2003
Commit: e8791286ab87cf797f79d2d90b49f206febf151a
Parents: 48d9842
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Apr 29 08:20:05 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 6 00:18:08 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../tez/runtime/library/common/shuffle/Fetcher.java | 14 ++++++++------
.../runtime/library/common/shuffle/ShuffleUtils.java | 8 +++++---
.../library/common/shuffle/impl/ShuffleManager.java | 2 +-
.../ShuffleInputEventHandlerOrderedGrouped.java | 2 +-
.../runtime/library/common/shuffle/TestFetcher.java | 6 +++---
6 files changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/e8791286/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index d42aaf8..9fc9ed3 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -19,5 +19,6 @@ ALL CHANGES:
TEZ-2347. Expose additional information in TaskCommunicatorContext.
TEZ-2361. Propagate dag completion to TaskCommunicator.
TEZ-2381. Fixes after rebase 04/28.
+ TEZ-2388. Send dag identifier as part of the fetcher request string.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/e8791286/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 48fe0f2..a553210 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -87,6 +87,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private final FetcherCallback fetcherCallback;
private final FetchedInputAllocator inputManager;
private final ApplicationId appId;
+ private final int dagIdentifier;
private final String logIdentifier;
@@ -125,7 +126,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private final boolean isDebugEnabled = LOG.isDebugEnabled();
private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
- FetchedInputAllocator inputManager, ApplicationId appId,
+ FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier,
JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf,
RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator,
@@ -137,6 +138,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
this.inputManager = inputManager;
this.jobTokenSecretMgr = jobTokenSecretManager;
this.appId = appId;
+ this.dagIdentifier = dagIdentifier;
this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
this.httpConnectionParams = params;
this.conf = conf;
@@ -401,7 +403,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private HostFetchResult setupConnection(List<InputAttemptIdentifier> attempts) {
try {
StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
- port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled());
+ port, partition, appId.toString(), dagIdentifier, httpConnectionParams.isSSLShuffleEnabled());
this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts,
httpConnectionParams.getKeepAlive());
@@ -901,21 +903,21 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
- ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
+ ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
Configuration conf, boolean localDiskFetchEnabled, String localHostname) {
- this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
+ this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
false, localHostname);
}
public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
- ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
+ ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
Configuration conf, RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator, Path lockPath,
boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
String localHostname) {
- this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
+ this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e8791286/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 46489ed..d7cb7c1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -174,19 +174,21 @@ public class ShuffleUtils {
// TODO NEWTEZ handle ssl shuffle
public static StringBuilder constructBaseURIForShuffleHandler(String host,
- int port, int partition, String appId, boolean sslShuffle) {
+ int port, int partition, String appId, int dagIdentifier, boolean sslShuffle) {
return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port),
- partition, appId, sslShuffle);
+ partition, appId, dagIdentifier, sslShuffle);
}
public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier,
- int partition, String appId, boolean sslShuffle) {
+ int partition, String appId, int dagIdentifier, boolean sslShuffle) {
final String http_protocol = (sslShuffle) ? "https://" : "http://";
StringBuilder sb = new StringBuilder(http_protocol);
sb.append(hostIdentifier);
sb.append("/");
sb.append("mapOutput?job=");
sb.append(appId.replace("application", "job"));
+ sb.append("&dag=");
+ sb.append(String.valueOf(dagIdentifier));
sb.append("&reduce=");
sb.append(String.valueOf(partition));
sb.append("&map=");
http://git-wip-us.apache.org/repos/asf/tez/blob/e8791286/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index d47e652..a8d3553 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -387,7 +387,7 @@ public class ShuffleManager implements FetcherCallback {
}
FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this,
- httpConnectionParams, inputManager, inputContext.getApplicationId(),
+ httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(),
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
lockDisk, localDiskFetchEnabled, sharedFetchEnabled,
inputContext.getExecutionContext().getHostName());
http://git-wip-us.apache.org/repos/asf/tez/blob/e8791286/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index 32ac766..9481e65 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -127,7 +127,7 @@ public class ShuffleInputEventHandlerOrderedGrouped {
@VisibleForTesting
URI getBaseURI(String host, int port, int partitionId) {
StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port,
- partitionId, inputContext.getApplicationId().toString(), sslShuffle);
+ partitionId, inputContext.getApplicationId().toString(), inputContext.getDagIdentifier(), sslShuffle);
URI u = URI.create(sb.toString());
return u;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e8791286/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index e6f0c4a..081efb2 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -64,7 +64,7 @@ public class TestFetcher {
FetcherCallback fetcherCallback = mock(FetcherCallback.class);
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST);
+ ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST);
builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());
@@ -82,7 +82,7 @@ public class TestFetcher {
// When disabled use http fetch
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "false");
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, false, HOST);
+ ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, false, HOST);
builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());
@@ -115,7 +115,7 @@ public class TestFetcher {
int partition = 42;
FetcherCallback callback = mock(FetcherCallback.class);
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST);
+ ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST);
builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());