You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/27 00:45:34 UTC
[1/2] tez git commit: TEZ-2748. Fix master build against hadoop-2.2. (sseth)
Repository: tez
Updated Branches:
refs/heads/master 9a3d8898b -> 6098f1bb9
TEZ-2748. Fix master build against hadoop-2.2. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/de751ec3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/de751ec3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/de751ec3
Branch: refs/heads/master
Commit: de751ec3da9d29fafefa78039450cfffc4fd4d80
Parents: 9a3d889
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Aug 26 15:44:10 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Aug 26 15:44:10 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/shufflehandler/ShuffleHandler.java | 107 ++++---------------
tez-tools/analyzers/pom.xml | 26 ++++-
3 files changed, 42 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/de751ec3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 42e7290..f91df1b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@ INCOMPATIBLE CHANGES
TEZ-2468. Change the minimum Java version to Java 7.
ALL CHANGES:
+ TEZ-2748. Fix master build against hadoop-2.2.
TEZ-2743. Fix TezContainerLauncher logging tokens.
TEZ-2708. Rename classes and variables post TEZ-2003 changes.
TEZ-2740. Create a reconfigureVertex alias for deprecated
http://git-wip-us.apache.org/repos/asf/tez/blob/de751ec3/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
index 046ce18..ebaf9fe 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
@@ -54,17 +54,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.SecureIOUtils;
-import org.apache.hadoop.mapred.FadvisedChunkedFile;
-import org.apache.hadoop.mapred.FadvisedFileRegion;
-import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -79,6 +75,7 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.DefaultFileRegion;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
@@ -95,7 +92,6 @@ import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.CharsetUtil;
import org.slf4j.Logger;
@@ -107,12 +103,6 @@ public class ShuffleHandler {
public static final String SHUFFLE_HANDLER_LOCAL_DIRS = "tez.shuffle.handler.local-dirs";
- public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache";
- public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
-
- public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
- public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
-
// pattern to identify errors related to the client closing the socket early
// idea borrowed from Netty SslHandler
private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
@@ -123,7 +113,6 @@ public class ShuffleHandler {
private final ChannelFactory selector;
private final ChannelGroup accepted = new DefaultChannelGroup();
protected HttpPipelineFactory pipelineFact;
- private final int sslFileBufferSize;
private final Configuration conf;
private final ConcurrentMap<String, Boolean> registeredApps = new ConcurrentHashMap<String, Boolean>();
@@ -132,20 +121,11 @@ public class ShuffleHandler {
* Should the shuffle use posix_fadvise calls to manage the OS cache during
* sendfile
*/
- private final boolean manageOsCache;
- private final int readaheadLength;
private final int maxShuffleConnections;
- private final int shuffleBufferSize;
- private final boolean shuffleTransferToAllowed;
- private final ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
private Map<String,String> userRsrc;
private JobTokenSecretManager secretManager;
- // TODO Fix this for tez.
- public static final String MAPREDUCE_SHUFFLE_SERVICEID =
- "mapreduce_shuffle";
-
public static final String SHUFFLE_PORT_CONFIG_KEY = "tez.shuffle.port";
public static final int DEFAULT_SHUFFLE_PORT = 15551;
@@ -165,11 +145,6 @@ public class ShuffleHandler {
public static final String CONNECTION_CLOSE = "close";
- public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
- "mapreduce.shuffle.ssl.file.buffer.size";
-
- public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
-
public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections";
public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit
@@ -177,16 +152,6 @@ public class ShuffleHandler {
// 0 implies Netty default of 2 * number of available processors
public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
- public static final String SHUFFLE_BUFFER_SIZE =
- "mapreduce.shuffle.transfer.buffer.size";
- public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
-
- public static final String SHUFFLE_TRANSFERTO_ALLOWED =
- "mapreduce.shuffle.transferTo.allowed";
- public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
- public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED =
- false;
-
final boolean connectionKeepAliveEnabled;
final int connectionKeepAliveTimeOut;
final int mapOutputMetaInfoCacheSize;
@@ -197,11 +162,6 @@ public class ShuffleHandler {
public ShuffleHandler(Configuration conf) {
this.conf = conf;
- manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
- DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
-
- readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
- DEFAULT_SHUFFLE_READAHEAD_BYTES);
maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS,
DEFAULT_MAX_SHUFFLE_CONNECTIONS);
@@ -211,13 +171,6 @@ public class ShuffleHandler {
maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
}
- shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE,
- DEFAULT_SHUFFLE_BUFFER_SIZE);
-
- shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED,
- (Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
- DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
-
ThreadFactory bossFactory = new ThreadFactoryBuilder()
.setNameFormat("ShuffleHandler Netty Boss #%d")
.build();
@@ -230,8 +183,6 @@ public class ShuffleHandler {
Executors.newCachedThreadPool(workerFactory),
maxShuffleThreads);
- sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
- DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
connectionKeepAliveEnabled =
conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED);
@@ -332,31 +283,23 @@ public class ShuffleHandler {
class HttpPipelineFactory implements ChannelPipelineFactory {
final Shuffle SHUFFLE;
- private SSLFactory sslFactory;
public HttpPipelineFactory(Configuration conf) throws Exception {
SHUFFLE = getShuffle(conf);
// TODO Setup SSL Shuffle
-// if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
-// MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
-// LOG.info("Encrypted shuffle is enabled.");
-// sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
-// sslFactory.init();
-// }
+ if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+ MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
+ throw new UnsupportedOperationException(
+ "SSL Shuffle is not currently supported for the test shuffle handler");
+ }
}
public void destroy() {
- if (sslFactory != null) {
- sslFactory.destroy();
- }
}
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
- if (sslFactory != null) {
- pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
- }
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
pipeline.addLast("encoder", new HttpResponseEncoder());
@@ -689,31 +632,17 @@ public class ShuffleHandler {
return null;
}
ChannelFuture writeFuture;
- if (ch.getPipeline().get(SslHandler.class) == null) {
- final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
- info.getStartOffset(), info.getPartLength(), manageOsCache, readaheadLength,
- readaheadPool, spillfile.getAbsolutePath(),
- shuffleBufferSize, shuffleTransferToAllowed);
- writeFuture = ch.write(partition);
- writeFuture.addListener(new ChannelFutureListener() {
- // TODO error handling; distinguish IO/connection failures,
- // attribute to appropriate spill output
- @Override
- public void operationComplete(ChannelFuture future) {
- if (future.isSuccess()) {
- partition.transferSuccessful();
- }
- partition.releaseExternalResources();
- }
- });
- } else {
- // HTTPS cannot be done with zero copy.
- final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
- info.getStartOffset(), info.getPartLength(), sslFileBufferSize,
- manageOsCache, readaheadLength, readaheadPool,
- spillfile.getAbsolutePath());
- writeFuture = ch.write(chunk);
- }
+ final DefaultFileRegion partition =
+ new DefaultFileRegion(spill.getChannel(), info.getStartOffset(), info.getPartLength());
+ writeFuture = ch.write(partition);
+ writeFuture.addListener(new ChannelFutureListener() {
+ // TODO error handling; distinguish IO/connection failures,
+ // attribute to appropriate spill output
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ partition.releaseExternalResources();
+ }
+ });
return writeFuture;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/de751ec3/tez-tools/analyzers/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/pom.xml b/tez-tools/analyzers/pom.xml
index aba8e74..bb5d896 100644
--- a/tez-tools/analyzers/pom.xml
+++ b/tez-tools/analyzers/pom.xml
@@ -25,7 +25,27 @@
<artifactId>tez-perf-analyzer</artifactId>
<packaging>pom</packaging>
- <modules>
- <module>job-analyzer</module>
- </modules>
+ <profiles>
+ <profile>
+ <id>hadoop24</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <modules>
+ <module>job-analyzer</module>
+ </modules>
+ </profile>
+ <profile>
+ <id>hadoop26</id>
+ <activation>
+ <property>
+ <name>!skipATS</name>
+ </property>
+ </activation>
+ <modules>
+ <module>job-analyzer</module>
+ </modules>
+ </profile>
+ </profiles>
+
</project>
[2/2] tez git commit: TEZ-2749. TaskInfo in history parser should not depend on the apache directory project. Fix master build against hadoop-2.4. (sseth)
Posted by ss...@apache.org.
TEZ-2749. TaskInfo in history parser should not depend on the apache
directory project. Fix master build against hadoop-2.4. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6098f1bb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6098f1bb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6098f1bb
Branch: refs/heads/master
Commit: 6098f1bb9cb8b03a0340ec6edc244a1f1e022440
Parents: de751ec
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Aug 26 15:45:28 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Aug 26 15:45:28 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/history/parser/datamodel/TaskInfo.java | 10 +++++++---
2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/6098f1bb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f91df1b..0c15c1f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@ INCOMPATIBLE CHANGES
TEZ-2468. Change the minimum Java version to Java 7.
ALL CHANGES:
+ TEZ-2749. TaskInfo in history parser should not depend on the apache directory project. Fix master build against hadoop-2.4
TEZ-2748. Fix master build against hadoop-2.2.
TEZ-2743. Fix TezContainerLauncher logging tokens.
TEZ-2708. Rename classes and variables post TEZ-2003 changes.
http://git-wip-us.apache.org/repos/asf/tez/blob/6098f1bb/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
index a30d311..7a89166 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Ordering;
-import org.apache.directory.api.util.Strings;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.codehaus.jettison.json.JSONException;
@@ -177,7 +176,8 @@ public class TaskInfo extends BaseInfo {
public final List<TaskAttemptInfo> getTaskAttempts(final TaskAttemptState state) {
return Collections.unmodifiableList(Lists.newLinkedList(Iterables.filter(Lists.newLinkedList
(attemptInfoMap.values()), new Predicate<TaskAttemptInfo>() {
- @Override public boolean apply(TaskAttemptInfo input) {
+ @Override
+ public boolean apply(TaskAttemptInfo input) {
return input.getStatus() != null && input.getStatus().equals(state.toString());
}
}
@@ -205,7 +205,7 @@ public class TaskInfo extends BaseInfo {
* @return TaskAttemptInfo
*/
public final TaskAttemptInfo getSuccessfulTaskAttempt() {
- if (Strings.isNotEmpty(getSuccessfulAttemptId())) {
+ if (isNotNullOrEmpty(getSuccessfulAttemptId())) {
for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
if (attemptInfo.getTaskAttemptId().equals(getSuccessfulAttemptId())) {
return attemptInfo;
@@ -350,4 +350,8 @@ public class TaskInfo extends BaseInfo {
sb.append("]");
return sb.toString();
}
+
+ private static boolean isNotNullOrEmpty(String str) {
+ return str != null && !str.isEmpty();
+ }
}