You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/27 00:45:34 UTC

[1/2] tez git commit: TEZ-2748. Fix master build against hadoop-2.2. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master 9a3d8898b -> 6098f1bb9


TEZ-2748. Fix master build against hadoop-2.2. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/de751ec3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/de751ec3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/de751ec3

Branch: refs/heads/master
Commit: de751ec3da9d29fafefa78039450cfffc4fd4d80
Parents: 9a3d889
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Aug 26 15:44:10 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Aug 26 15:44:10 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/shufflehandler/ShuffleHandler.java      | 107 ++++---------------
 tez-tools/analyzers/pom.xml                     |  26 ++++-
 3 files changed, 42 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/de751ec3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 42e7290..f91df1b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2748. Fix master build against hadoop-2.2. 
   TEZ-2743. Fix TezContainerLauncher logging tokens.
   TEZ-2708. Rename classes and variables post TEZ-2003 changes.
   TEZ-2740. Create a reconfigureVertex alias for deprecated

http://git-wip-us.apache.org/repos/asf/tez/blob/de751ec3/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
index 046ce18..ebaf9fe 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
@@ -54,17 +54,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.SecureIOUtils;
-import org.apache.hadoop.mapred.FadvisedChunkedFile;
-import org.apache.hadoop.mapred.FadvisedFileRegion;
-import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
 import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -79,6 +75,7 @@ import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.DefaultFileRegion;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
@@ -95,7 +92,6 @@ import org.jboss.netty.handler.codec.http.HttpResponse;
 import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import org.jboss.netty.handler.ssl.SslHandler;
 import org.jboss.netty.handler.stream.ChunkedWriteHandler;
 import org.jboss.netty.util.CharsetUtil;
 import org.slf4j.Logger;
@@ -107,12 +103,6 @@ public class ShuffleHandler {
 
   public static final String SHUFFLE_HANDLER_LOCAL_DIRS = "tez.shuffle.handler.local-dirs";
 
-  public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache";
-  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
-
-  public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
-  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
-  
   // pattern to identify errors related to the client closing the socket early
   // idea borrowed from Netty SslHandler
   private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
@@ -123,7 +113,6 @@ public class ShuffleHandler {
   private final ChannelFactory selector;
   private final ChannelGroup accepted = new DefaultChannelGroup();
   protected HttpPipelineFactory pipelineFact;
-  private final int sslFileBufferSize;
   private final Configuration conf;
 
   private final ConcurrentMap<String, Boolean> registeredApps = new ConcurrentHashMap<String, Boolean>();
@@ -132,20 +121,11 @@ public class ShuffleHandler {
    * Should the shuffle use posix_fadvise calls to manage the OS cache during
    * sendfile
    */
-  private final boolean manageOsCache;
-  private final int readaheadLength;
   private final int maxShuffleConnections;
-  private final int shuffleBufferSize;
-  private final boolean shuffleTransferToAllowed;
-  private final ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
 
   private Map<String,String> userRsrc;
   private JobTokenSecretManager secretManager;
 
-  // TODO Fix this for tez.
-  public static final String MAPREDUCE_SHUFFLE_SERVICEID =
-      "mapreduce_shuffle";
-
   public static final String SHUFFLE_PORT_CONFIG_KEY = "tez.shuffle.port";
   public static final int DEFAULT_SHUFFLE_PORT = 15551;
 
@@ -165,11 +145,6 @@ public class ShuffleHandler {
 
   public static final String CONNECTION_CLOSE = "close";
 
-  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
-    "mapreduce.shuffle.ssl.file.buffer.size";
-
-  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
-
   public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections";
   public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit
   
@@ -177,16 +152,6 @@ public class ShuffleHandler {
   // 0 implies Netty default of 2 * number of available processors
   public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
   
-  public static final String SHUFFLE_BUFFER_SIZE = 
-      "mapreduce.shuffle.transfer.buffer.size";
-  public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
-  
-  public static final String  SHUFFLE_TRANSFERTO_ALLOWED = 
-      "mapreduce.shuffle.transferTo.allowed";
-  public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
-  public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = 
-      false;
-
   final boolean connectionKeepAliveEnabled;
   final int connectionKeepAliveTimeOut;
   final int mapOutputMetaInfoCacheSize;
@@ -197,11 +162,6 @@ public class ShuffleHandler {
 
   public ShuffleHandler(Configuration conf) {
     this.conf = conf;
-    manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
-        DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
-
-    readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
-        DEFAULT_SHUFFLE_READAHEAD_BYTES);
 
     maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS,
         DEFAULT_MAX_SHUFFLE_CONNECTIONS);
@@ -211,13 +171,6 @@ public class ShuffleHandler {
       maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
     }
 
-    shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE,
-        DEFAULT_SHUFFLE_BUFFER_SIZE);
-
-    shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED,
-        (Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
-            DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
-
     ThreadFactory bossFactory = new ThreadFactoryBuilder()
         .setNameFormat("ShuffleHandler Netty Boss #%d")
         .build();
@@ -230,8 +183,6 @@ public class ShuffleHandler {
         Executors.newCachedThreadPool(workerFactory),
         maxShuffleThreads);
 
-    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
-        DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
     connectionKeepAliveEnabled =
         conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
             DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED);
@@ -332,31 +283,23 @@ public class ShuffleHandler {
   class HttpPipelineFactory implements ChannelPipelineFactory {
 
     final Shuffle SHUFFLE;
-    private SSLFactory sslFactory;
 
     public HttpPipelineFactory(Configuration conf) throws Exception {
       SHUFFLE = getShuffle(conf);
       // TODO Setup SSL Shuffle
-//      if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
-//                          MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
-//        LOG.info("Encrypted shuffle is enabled.");
-//        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
-//        sslFactory.init();
-//      }
+      if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+          MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
+        throw new UnsupportedOperationException(
+            "SSL Shuffle is not currently supported for the test shuffle handler");
+      }
     }
 
     public void destroy() {
-      if (sslFactory != null) {
-        sslFactory.destroy();
-      }
     }
 
     @Override
     public ChannelPipeline getPipeline() throws Exception {
       ChannelPipeline pipeline = Channels.pipeline();
-      if (sslFactory != null) {
-        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
-      }
       pipeline.addLast("decoder", new HttpRequestDecoder());
       pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
       pipeline.addLast("encoder", new HttpResponseEncoder());
@@ -689,31 +632,17 @@ public class ShuffleHandler {
         return null;
       }
       ChannelFuture writeFuture;
-      if (ch.getPipeline().get(SslHandler.class) == null) {
-        final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
-            info.getStartOffset(), info.getPartLength(), manageOsCache, readaheadLength,
-            readaheadPool, spillfile.getAbsolutePath(), 
-            shuffleBufferSize, shuffleTransferToAllowed);
-        writeFuture = ch.write(partition);
-        writeFuture.addListener(new ChannelFutureListener() {
-            // TODO error handling; distinguish IO/connection failures,
-            //      attribute to appropriate spill output
-          @Override
-          public void operationComplete(ChannelFuture future) {
-            if (future.isSuccess()) {
-              partition.transferSuccessful();
-            }
-            partition.releaseExternalResources();
-          }
-        });
-      } else {
-        // HTTPS cannot be done with zero copy.
-        final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
-            info.getStartOffset(), info.getPartLength(), sslFileBufferSize,
-            manageOsCache, readaheadLength, readaheadPool,
-            spillfile.getAbsolutePath());
-        writeFuture = ch.write(chunk);
-      }
+      final DefaultFileRegion partition =
+          new DefaultFileRegion(spill.getChannel(), info.getStartOffset(), info.getPartLength());
+      writeFuture = ch.write(partition);
+      writeFuture.addListener(new ChannelFutureListener() {
+        // TODO error handling; distinguish IO/connection failures,
+        //      attribute to appropriate spill output
+        @Override
+        public void operationComplete(ChannelFuture future) {
+          partition.releaseExternalResources();
+        }
+      });
       return writeFuture;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/de751ec3/tez-tools/analyzers/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/pom.xml b/tez-tools/analyzers/pom.xml
index aba8e74..bb5d896 100644
--- a/tez-tools/analyzers/pom.xml
+++ b/tez-tools/analyzers/pom.xml
@@ -25,7 +25,27 @@
   <artifactId>tez-perf-analyzer</artifactId>
   <packaging>pom</packaging>
 
-  <modules>
-    <module>job-analyzer</module>
-  </modules>
+  <profiles>
+    <profile>
+      <id>hadoop24</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <modules>
+        <module>job-analyzer</module>
+      </modules>
+    </profile>
+    <profile>
+      <id>hadoop26</id>
+      <activation>
+        <property>
+          <name>!skipATS</name>
+        </property>
+      </activation>
+      <modules>
+        <module>job-analyzer</module>
+      </modules>
+    </profile>
+  </profiles>
+
 </project>


[2/2] tez git commit: TEZ-2749. TaskInfo in history parser should not depend on the apache directory project. Fix master build against hadoop-2.4. (sseth)

Posted by ss...@apache.org.
TEZ-2749. TaskInfo in history parser should not depend on the apache
directory project. Fix master build against hadoop-2.4. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6098f1bb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6098f1bb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6098f1bb

Branch: refs/heads/master
Commit: 6098f1bb9cb8b03a0340ec6edc244a1f1e022440
Parents: de751ec
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Aug 26 15:45:28 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Aug 26 15:45:28 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                               |  1 +
 .../org/apache/tez/history/parser/datamodel/TaskInfo.java | 10 +++++++---
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6098f1bb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f91df1b..0c15c1f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2749. TaskInfo in history parser should not depend on the apache directory project. Fix master build against hadoop-2.4
   TEZ-2748. Fix master build against hadoop-2.2. 
   TEZ-2743. Fix TezContainerLauncher logging tokens.
   TEZ-2708. Rename classes and variables post TEZ-2003 changes.

http://git-wip-us.apache.org/repos/asf/tez/blob/6098f1bb/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
index a30d311..7a89166 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.Ordering;
 
-import org.apache.directory.api.util.Strings;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.codehaus.jettison.json.JSONException;
@@ -177,7 +176,8 @@ public class TaskInfo extends BaseInfo {
   public final List<TaskAttemptInfo> getTaskAttempts(final TaskAttemptState state) {
     return Collections.unmodifiableList(Lists.newLinkedList(Iterables.filter(Lists.newLinkedList
                     (attemptInfoMap.values()), new Predicate<TaskAttemptInfo>() {
-                  @Override public boolean apply(TaskAttemptInfo input) {
+                  @Override
+                  public boolean apply(TaskAttemptInfo input) {
                     return input.getStatus() != null && input.getStatus().equals(state.toString());
                   }
                 }
@@ -205,7 +205,7 @@ public class TaskInfo extends BaseInfo {
    * @return TaskAttemptInfo
    */
   public final TaskAttemptInfo getSuccessfulTaskAttempt() {
-    if (Strings.isNotEmpty(getSuccessfulAttemptId())) {
+    if (isNotNullOrEmpty(getSuccessfulAttemptId())) {
       for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
         if (attemptInfo.getTaskAttemptId().equals(getSuccessfulAttemptId())) {
           return attemptInfo;
@@ -350,4 +350,8 @@ public class TaskInfo extends BaseInfo {
     sb.append("]");
     return sb.toString();
   }
+
+  private static boolean isNotNullOrEmpty(String str) {
+    return str != null && !str.isEmpty();
+  }
 }