You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/26 01:58:15 UTC

git commit: TEZ-502. Break reverse dependency between tez-runtime-internals and tez-runtime-library. (sseth)

Updated Branches:
  refs/heads/master d05a4968a -> 906be8f4b


TEZ-502. Break reverse dependency between tez-runtime-internals and
tez-runtime-library. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/906be8f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/906be8f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/906be8f4

Branch: refs/heads/master
Commit: 906be8f4b7c029f5b6880151540f36c1dabe53ee
Parents: d05a496
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Sep 25 16:57:56 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Sep 25 16:57:56 2013 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/mapred/YarnTezDagChild.java  | 17 ++++++++++++++---
 .../apache/tez/mapreduce/processor/MapUtils.java   | 10 +++++++++-
 .../processor/reduce/TestReduceProcessor.java      | 10 +++++++++-
 tez-runtime-internals/pom.xml                      |  4 ----
 .../tez/runtime/LogicalIOProcessorRuntimeTask.java | 10 ++--------
 5 files changed, 34 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/906be8f4/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 4938d9e..3092837 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -25,12 +25,15 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -87,6 +90,7 @@ import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule;
 import org.apache.tez.runtime.library.common.security.JobTokenIdentifier;
 import org.apache.tez.runtime.library.common.security.TokenCache;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -294,6 +298,11 @@ public class YarnTezDagChild {
     Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(credentials);
     SecurityUtil.setTokenService(jobToken, address);
     taskOwner.addToken(jobToken);
+    // Will jobToken change across DAGs ?
+    Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+    serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
+        ShuffleUtils.convertJobTokenToBytes(jobToken));
+
     umbilical =
       taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
       @Override
@@ -395,8 +404,9 @@ public class YarnTezDagChild {
           }
           lastVertexId = newVertexId;
           updateLoggers(currentTaskAttemptID);
+          
           currentTask = createLogicalTask(attemptNumber, taskSpec,
-              defaultConf, tezUmbilical, jobToken);
+              defaultConf, tezUmbilical, serviceConsumerMetadata);
         } finally {
           taskLock.writeLock().unlock();
         }
@@ -489,7 +499,7 @@ public class YarnTezDagChild {
 
   private static LogicalIOProcessorRuntimeTask createLogicalTask(int attemptNum,
       TaskSpec taskSpec, Configuration conf, TezUmbilical tezUmbilical,
-      Token<JobTokenIdentifier> jobToken) throws IOException {
+      Map<String, ByteBuffer> serviceConsumerMetadata) throws IOException {
 
     // FIXME TODONEWTEZ
     conf.setBoolean("ipc.client.tcpnodelay", true);
@@ -515,8 +525,9 @@ public class YarnTezDagChild {
     String [] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));
     conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
     LOG.info("LocalDirs for child: " + Arrays.toString(localDirs));
+    
     return new LogicalIOProcessorRuntimeTask(taskSpec, attemptNum, conf,
-        tezUmbilical, jobToken);
+        tezUmbilical, serviceConsumerMetadata);
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/906be8f4/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 9590e72..128e6b1 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -19,7 +19,10 @@
 package org.apache.tez.mapreduce.processor;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -57,6 +60,7 @@ import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.apache.tez.runtime.library.common.security.JobTokenIdentifier;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 
 public class MapUtils {
 
@@ -204,12 +208,16 @@ public class MapUtils {
         inputSpecs,
         outputSpecs);
 
+    Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+    serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
+        ShuffleUtils.convertJobTokenToBytes(shuffleToken));
+    
     LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
         taskSpec,
         0,
         jobConf,
         umbilical,
-        shuffleToken);
+        serviceConsumerMetadata);
     return task;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/906be8f4/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index d2c7952..3910b02 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -18,7 +18,10 @@
 package org.apache.tez.mapreduce.processor.reduce;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -58,6 +61,7 @@ import org.apache.tez.runtime.library.common.task.local.output.TezLocalTaskOutpu
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
 import org.apache.tez.runtime.library.input.LocalMergedInput;
 import org.apache.tez.runtime.library.output.LocalOnFileSorterOutput;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -163,12 +167,16 @@ public class TestReduceProcessor {
         Collections.singletonList(reduceInputSpec),
         Collections.singletonList(reduceOutputSpec));
 
+    Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+    serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
+        ShuffleUtils.convertJobTokenToBytes(shuffleToken));
+    
     LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
         taskSpec,
         0,
         reduceConf,
         new TestUmbilical(),
-        shuffleToken);
+        serviceConsumerMetadata);
     
     task.initialize();
     task.run();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/906be8f4/tez-runtime-internals/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/pom.xml b/tez-runtime-internals/pom.xml
index 4f64701..1a31562 100644
--- a/tez-runtime-internals/pom.xml
+++ b/tez-runtime-internals/pom.xml
@@ -38,10 +38,6 @@
       <artifactId>tez-api</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.tez</groupId>
-      <artifactId>tez-runtime-library</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-common</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/906be8f4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 8aff6d1..5c8ec2b 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,7 +32,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -58,8 +56,6 @@ import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
 import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.runtime.library.common.security.JobTokenIdentifier;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -95,7 +91,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
   public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
       Configuration tezConf, TezUmbilical tezUmbilical,
-      Token<JobTokenIdentifier> jobToken) throws IOException {
+      Map<String, ByteBuffer> serviceConsumerMetadata) throws IOException {
     // TODO Remove jobToken from here post TEZ-421
     super(taskSpec, tezConf, tezUmbilical);
     LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
@@ -108,9 +104,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.outputs = createOutputs(outputSpecs);
     this.processorDescriptor = taskSpec.getProcessorDescriptor();
     this.processor = createProcessor(processorDescriptor);
-    this.serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
-    this.serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
-        ShuffleUtils.convertJobTokenToBytes(jobToken));
+    this.serviceConsumerMetadata = serviceConsumerMetadata;
     this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();
     this.state = State.NEW;
     this.appAttemptNumber = appAttemptNumber;