You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:43 UTC

[41/50] [abbrv] tez git commit: TEZ-2388. Send dag identifier as part of the fetcher request string. (sseth)

TEZ-2388. Send dag identifier as part of the fetcher request string. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e8791286
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e8791286
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e8791286

Branch: refs/heads/TEZ-2003
Commit: e8791286ab87cf797f79d2d90b49f206febf151a
Parents: 48d9842
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Apr 29 08:20:05 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 6 00:18:08 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                                  |  1 +
 .../tez/runtime/library/common/shuffle/Fetcher.java   | 14 ++++++++------
 .../runtime/library/common/shuffle/ShuffleUtils.java  |  8 +++++---
 .../library/common/shuffle/impl/ShuffleManager.java   |  2 +-
 .../ShuffleInputEventHandlerOrderedGrouped.java       |  2 +-
 .../runtime/library/common/shuffle/TestFetcher.java   |  6 +++---
 6 files changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e8791286/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index d42aaf8..9fc9ed3 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -19,5 +19,6 @@ ALL CHANGES:
   TEZ-2347. Expose additional information in TaskCommunicatorContext.
   TEZ-2361. Propagate dag completion to TaskCommunicator.
   TEZ-2381. Fixes after rebase 04/28.
+  TEZ-2388. Send dag identifier as part of the fetcher request string.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/e8791286/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 48fe0f2..a553210 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -87,6 +87,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   private final FetcherCallback fetcherCallback;
   private final FetchedInputAllocator inputManager;
   private final ApplicationId appId;
+  private final int dagIdentifier;
   
   private final String logIdentifier;
 
@@ -125,7 +126,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   private final boolean isDebugEnabled = LOG.isDebugEnabled();
 
   private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
-      FetchedInputAllocator inputManager, ApplicationId appId,
+      FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier,
       JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf,
       RawLocalFileSystem localFs,
       LocalDirAllocator localDirAllocator,
@@ -137,6 +138,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     this.inputManager = inputManager;
     this.jobTokenSecretMgr = jobTokenSecretManager;
     this.appId = appId;
+    this.dagIdentifier = dagIdentifier;
     this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
     this.httpConnectionParams = params;
     this.conf = conf;
@@ -401,7 +403,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   private HostFetchResult setupConnection(List<InputAttemptIdentifier> attempts) {
     try {
       StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
-          port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled());
+          port, partition, appId.toString(), dagIdentifier, httpConnectionParams.isSSLShuffleEnabled());
       this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts,
           httpConnectionParams.getKeepAlive());
 
@@ -901,21 +903,21 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
 
     public FetcherBuilder(FetcherCallback fetcherCallback,
         HttpConnectionParams params, FetchedInputAllocator inputManager,
-        ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
+        ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
         Configuration conf, boolean localDiskFetchEnabled, String localHostname) {
-      this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
+      this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
           jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
           false, localHostname);
     }
 
     public FetcherBuilder(FetcherCallback fetcherCallback,
         HttpConnectionParams params, FetchedInputAllocator inputManager,
-        ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
+        ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
         Configuration conf, RawLocalFileSystem localFs,
         LocalDirAllocator localDirAllocator, Path lockPath,
         boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
         String localHostname) {
-      this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
+      this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
           jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
           lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/e8791286/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 46489ed..d7cb7c1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -174,19 +174,21 @@ public class ShuffleUtils {
 
   // TODO NEWTEZ handle ssl shuffle
   public static StringBuilder constructBaseURIForShuffleHandler(String host,
-      int port, int partition, String appId, boolean sslShuffle) {
+      int port, int partition, String appId, int dagIdentifier, boolean sslShuffle) {
     return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port),
-      partition, appId, sslShuffle);
+      partition, appId, dagIdentifier, sslShuffle);
   }
   
   public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier,
-      int partition, String appId, boolean sslShuffle) {
+      int partition, String appId, int dagIdentifier, boolean sslShuffle) {
     final String http_protocol = (sslShuffle) ? "https://" : "http://";
     StringBuilder sb = new StringBuilder(http_protocol);
     sb.append(hostIdentifier);
     sb.append("/");
     sb.append("mapOutput?job=");
     sb.append(appId.replace("application", "job"));
+    sb.append("&dag=");
+    sb.append(String.valueOf(dagIdentifier));
     sb.append("&reduce=");
     sb.append(String.valueOf(partition));
     sb.append("&map=");

http://git-wip-us.apache.org/repos/asf/tez/blob/e8791286/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index d47e652..a8d3553 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -387,7 +387,7 @@ public class ShuffleManager implements FetcherCallback {
     }
 
     FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this,
-      httpConnectionParams, inputManager, inputContext.getApplicationId(),
+      httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(),
         jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
         lockDisk, localDiskFetchEnabled, sharedFetchEnabled,
         inputContext.getExecutionContext().getHostName());

http://git-wip-us.apache.org/repos/asf/tez/blob/e8791286/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index 32ac766..9481e65 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -127,7 +127,7 @@ public class ShuffleInputEventHandlerOrderedGrouped {
   @VisibleForTesting
   URI getBaseURI(String host, int port, int partitionId) {
     StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port,
-      partitionId, inputContext.getApplicationId().toString(), sslShuffle);
+      partitionId, inputContext.getApplicationId().toString(), inputContext.getDagIdentifier(), sslShuffle);
     URI u = URI.create(sb.toString());
     return u;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/e8791286/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index e6f0c4a..081efb2 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -64,7 +64,7 @@ public class TestFetcher {
     FetcherCallback fetcherCallback = mock(FetcherCallback.class);
 
     Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
-        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST);
+        ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST);
     builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
     Fetcher fetcher = spy(builder.build());
 
@@ -82,7 +82,7 @@ public class TestFetcher {
     // When disabled use http fetch
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "false");
     builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
-        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, false, HOST);
+        ApplicationId.newInstance(0, 1), 1,  null, "fetcherTest", conf, false, HOST);
     builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
     fetcher = spy(builder.build());
 
@@ -115,7 +115,7 @@ public class TestFetcher {
     int partition = 42;
     FetcherCallback callback = mock(FetcherCallback.class);
     Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
-        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST);
+        ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST);
     builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
     Fetcher fetcher = spy(builder.build());