You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/08/12 18:12:03 UTC
tez git commit: TEZ-3238. TEZ-3238. Shuffle service name should be configureable and should not be hardcoded to ‘mapreduce_shuffle’ (jeagles)
Repository: tez
Updated Branches:
refs/heads/TEZ-3334 397d6af39 -> 53ea6f5b3
TEZ-3238. TEZ-3238. Shuffle service name should be configureable and should not be hardcoded to \u2018mapreduce_shuffle\u2019 (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/53ea6f5b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/53ea6f5b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/53ea6f5b
Branch: refs/heads/TEZ-3334
Commit: 53ea6f5b3adfa71f3c235467a50757c72528d653
Parents: 397d6af
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Fri Aug 12 13:11:40 2016 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Fri Aug 12 13:11:40 2016 -0500
----------------------------------------------------------------------
TEZ-3334-CHANGES.txt | 1 +
.../org/apache/tez/client/TezClientUtils.java | 4 +++-
.../apache/tez/dag/api/TezConfiguration.java | 10 ++++++++++
.../org/apache/tez/dag/api/TezConstants.java | 6 ------
.../app/launcher/LocalContainerLauncher.java | 16 ++++++++-------
.../app/rm/container/AMContainerHelpers.java | 9 ++++++---
.../tez/service/impl/ContainerRunnerImpl.java | 8 +++++---
.../apache/tez/service/impl/TezTestService.java | 5 ++++-
.../tez/mapreduce/processor/MapUtils.java | 7 +++++--
.../processor/reduce/TestReduceProcessor.java | 7 +++++--
.../org/apache/tez/runtime/task/TezChild.java | 4 +++-
.../library/common/shuffle/ShuffleUtils.java | 21 ++++++++++++--------
.../common/shuffle/impl/ShuffleManager.java | 9 ++++++---
.../orderedgrouped/ShuffleScheduler.java | 7 +++++--
.../common/sort/impl/PipelinedSorter.java | 4 ++--
.../common/sort/impl/dflt/DefaultSorter.java | 2 +-
.../writers/UnorderedPartitionedKVWriter.java | 5 ++++-
.../output/OrderedPartitionedKVOutput.java | 2 +-
.../common/shuffle/TestShuffleUtils.java | 12 ++++++-----
.../impl/TestShuffleInputEventHandlerImpl.java | 7 +++++--
.../common/shuffle/impl/TestShuffleManager.java | 7 +++++--
.../common/sort/impl/TestPipelinedSorter.java | 8 +++++---
.../sort/impl/dflt/TestDefaultSorter.java | 4 +++-
.../TestUnorderedPartitionedKVWriter.java | 15 ++++++++------
.../library/output/TestOnFileSortedOutput.java | 4 +++-
.../output/TestOnFileUnorderedKVOutput.java | 4 +++-
26 files changed, 123 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index ef0a1cb..2122cca 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
INCOMPATIBLE CHANGES:
ALL CHANGES:
+ TEZ-3238. Shuffle service name should be configureable and should not be hardcoded to \u2018mapreduce_shuffle\u2019
TEZ-3390. Package Shuffle Handler as a shaded uber-jar
TEZ-3378. Move Shuffle Handler configuration into the Tez namespace
TEZ-3377. Remove ShuffleHandler dependency on mapred.FadvisedChunkedFile and mapred.FadvisedFileRegion
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index eb1a95e..fbe36f1 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -692,7 +692,9 @@ public class TezClientUtils {
// provide this to AuxServices running on the AM node - in case tasks run within the AM,
// and no other task runs on this node.
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
- serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+ String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+ serviceData.put(auxiliaryService,
TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(amLaunchCredentials)));
// Setup ContainerLaunchContext for AM container
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 11c50cf..732fee9 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -138,6 +138,16 @@ public class TezConfiguration extends Configuration {
public static final boolean TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT = true;
/**
+ * String value. Specifies the name of the shuffle auxiliary service.
+ */
+ @ConfigurationScope(Scope.AM)
+ @ConfigurationProperty
+ public static final String TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID = TEZ_AM_PREFIX +
+ "shuffle.auxiliary-service.id";
+ public static final String TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT =
+ "mapreduce_shuffle";
+
+ /**
* String value. Specifies a directory where Tez can create temporary job artifacts.
*/
@ConfigurationScope(Scope.AM)
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
index 43f09a2..31c1e66 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -69,12 +69,6 @@ public class TezConstants {
TezConfiguration.TEZ_SESSION_PREFIX + "local-resources.pb";
public static final String TEZ_APPLICATION_TYPE = "TEZ";
-
- /**
- * The service id for the NodeManager plugin used to share intermediate data
- * between vertices.
- */
- public static final String TEZ_SHUFFLE_HANDLER_SERVICE_ID = "tez_shuffle";
public static final String TEZ_PREWARM_DAG_NAME_PREFIX = "TezPreWarmDAG";
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 1e9d1e6..153b2e0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -118,13 +118,6 @@ public class LocalContainerLauncher extends ContainerLauncher {
this.tal = taskCommunicatorManagerInterface;
this.workingDirectory = workingDirectory;
this.isLocalMode = isLocalMode;
- if (isLocalMode) {
- localEnv = Maps.newHashMap();
- AuxiliaryServiceHelper.setServiceDataIntoEnv(
- ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv);
- } else {
- localEnv = System.getenv();
- }
// Check if the hostname is set in the environment before overriding it.
String host = isLocalMode ? InetAddress.getLocalHost().getHostName() :
@@ -138,6 +131,15 @@ public class LocalContainerLauncher extends ContainerLauncher {
throw new TezUncheckedException(
"Failed to parse user payload for " + LocalContainerLauncher.class.getSimpleName(), e);
}
+ if (isLocalMode) {
+ String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+ localEnv = Maps.newHashMap();
+ AuxiliaryServiceHelper.setServiceDataIntoEnv(
+ auxiliaryService, ByteBuffer.allocate(4).putInt(0), localEnv);
+ } else {
+ localEnv = System.getenv();
+ }
numExecutors = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
Preconditions.checkState(numExecutors >=1, "Must have at least 1 executor");
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 11b5006..51e954d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -91,10 +91,11 @@ public class AMContainerHelpers {
* Create the common {@link ContainerLaunchContext} for all attempts.
*
* @param applicationACLs
+ * @param conf
*/
private static ContainerLaunchContext createCommonContainerLaunchContext(
Map<ApplicationAccessType, String> applicationACLs,
- Credentials credentials, Map<String, LocalResource> localResources) {
+ Credentials credentials, Map<String, LocalResource> localResources, Configuration conf) {
// Application environment
Map<String, String> environment = new HashMap<String, String>();
@@ -128,7 +129,9 @@ public class AMContainerHelpers {
if (LOG.isDebugEnabled()) {
LOG.debug("Putting shuffle token in serviceData in common CLC");
}
- serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+ String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+ serviceData.put(auxiliaryService,
TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
} catch (IOException e) {
throw new TezUncheckedException(e);
@@ -159,7 +162,7 @@ public class AMContainerHelpers {
synchronized (commonContainerSpecLock) {
if (!commonContainerSpecs.containsKey(tezDAGID)) {
commonContainerSpec =
- createCommonContainerLaunchContext(acls, credentials, commonDAGLRs);
+ createCommonContainerLaunchContext(acls, credentials, commonDAGLRs, conf);
commonContainerSpecs.put(tezDAGID, commonContainerSpec);
} else {
commonContainerSpec = commonContainerSpecs.get(tezDAGID);
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index f9de995..b3d6d54 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -132,9 +132,9 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
public void serviceStart() {
}
- public void setShufflePort(int shufflePort) {
+ public void setShufflePort(String auxiliaryService, int shufflePort) {
AuxiliaryServiceHelper.setServiceDataIntoEnv(
- TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+ auxiliaryService,
ByteBuffer.allocate(4).putInt(shufflePort), localEnv);
}
@@ -417,7 +417,9 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
- serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+ String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+ serviceConsumerMetadata.put(auxiliaryService,
TezCommonUtils.convertJobTokenToBytes(jobToken));
Multimap<String, String> startedInputsMap = HashMultimap.create();
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
index 322be00..8d9436e 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.service.ContainerRunner;
import org.apache.tez.shufflehandler.ShuffleHandler;
@@ -86,7 +87,9 @@ public class TezTestService extends AbstractService implements ContainerRunner {
@Override
public void serviceStart() throws Exception {
ShuffleHandler.initializeAndStart(shuffleHandlerConf);
- containerRunner.setShufflePort(ShuffleHandler.get().getPort());
+ String auxiliaryService = getConfig().get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+ containerRunner.setShufflePort(auxiliaryService, ShuffleHandler.get().getPort());
server.start();
containerRunner.start();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 8309966..47ce377 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -216,12 +217,14 @@ public class MapUtils {
outputSpecs, null, null);
Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
- serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
+ String auxiliaryService = jobConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+ serviceConsumerMetadata.put(auxiliaryService,
ShuffleUtils.convertJobTokenToBytes(shuffleToken));
Map<String, String> envMap = new HashMap<String, String>();
ByteBuffer shufflePortBb = ByteBuffer.allocate(4).putInt(0, 8000);
AuxiliaryServiceHelper
- .setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, shufflePortBb,
+ .setServiceDataIntoEnv(auxiliaryService, shufflePortBb,
envMap);
LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 1922c53..043bd94 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -208,12 +209,14 @@ public class TestReduceProcessor {
Collections.singletonList(reduceOutputSpec), null, null);
Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
- serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
+ String auxiliaryService = jobConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+ serviceConsumerMetadata.put(auxiliaryService,
ShuffleUtils.convertJobTokenToBytes(shuffleToken));
Map<String, String> serviceProviderEnvMap = new HashMap<String, String>();
ByteBuffer shufflePortBb = ByteBuffer.allocate(4).putInt(0, 8000);
AuxiliaryServiceHelper
- .setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, shufflePortBb,
+ .setServiceDataIntoEnv(auxiliaryService, shufflePortBb,
serviceProviderEnvMap);
LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 07810d9..3ee89cf 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -175,7 +175,9 @@ public class TezChild {
UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier);
Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
- serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+ String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+ serviceConsumerMetadata.put(auxiliaryService,
TezCommonUtils.convertJobTokenToBytes(jobToken));
if (umbilical == null) {
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index e194298..fa8533c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -39,6 +39,7 @@ import com.google.protobuf.ByteString;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnection;
import org.apache.tez.http.HttpConnectionParams;
@@ -73,7 +74,6 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DetailedP
public class ShuffleUtils {
private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class);
- public static final String SHUFFLE_HANDLER_SERVICE_ID = "tez_shuffle";
private static final long MB = 1024l * 1024l;
//Shared by multiple threads
@@ -278,12 +278,13 @@ public class ShuffleUtils {
* @param finalMergeEnabled
* @param isLastEvent
* @param pathComponent
+ * @param conf
* @return ByteBuffer
* @throws IOException
*/
static ByteBuffer generateDMEPayload(boolean sendEmptyPartitionDetails,
- int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext context,
- int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent)
+ int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext context,
+ int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent, Configuration conf)
throws IOException {
DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
.newBuilder();
@@ -312,8 +313,11 @@ public class ShuffleUtils {
if (!sendEmptyPartitionDetails || outputGenerated) {
String host = context.getExecutionContext().getHostName();
+ String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+
ByteBuffer shuffleMetadata = context
- .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ .getServiceProviderMetaData(auxiliaryService);
int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
payloadBuilder.setHost(host);
payloadBuilder.setPort(shufflePort);
@@ -398,12 +402,13 @@ public class ShuffleUtils {
* @param numPhysicalOutputs
* @param pathComponent
* @param partitionStats
+ * @param conf
* @throws IOException
*/
public static void generateEventOnSpill(List<Event> eventList, boolean finalMergeEnabled,
- boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord,
- int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent,
- @Nullable long[] partitionStats, boolean reportDetailedPartitionStats)
+ boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord,
+ int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent,
+ @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, Configuration conf)
throws IOException {
Preconditions.checkArgument(eventList != null, "EventList can't be null");
@@ -421,7 +426,7 @@ public class ShuffleUtils {
ByteBuffer payload = generateDMEPayload(sendEmptyPartitionDetails, numPhysicalOutputs,
spillRecord, context, spillId,
- finalMergeEnabled, isLastEvent, pathComponent);
+ finalMergeEnabled, isLastEvent, pathComponent, conf);
if (finalMergeEnabled || isLastEvent) {
VertexManagerEvent vmEvent = generateVMEvent(context, partitionStats,
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index c80713b..a8fea9e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -45,6 +45,7 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.crypto.SecretKey;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.runtime.api.TaskFailureType;
import org.slf4j.Logger;
@@ -244,10 +245,12 @@ public class ShuffleManager implements FetcherCallback {
this.startTime = System.currentTimeMillis();
this.lastProgressTime = startTime;
-
+
+ String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
SecretKey shuffleSecret = ShuffleUtils
.getJobTokenSecretFromTokenBytes(inputContext
- .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
+ .getServiceConsumerMetaData(auxiliaryService));
this.jobTokenSecretMgr = new JobTokenSecretManager(shuffleSecret);
this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
@@ -261,7 +264,7 @@ public class ShuffleManager implements FetcherCallback {
localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class);
this.localhostName = inputContext.getExecutionContext().getHostName();
final ByteBuffer shuffleMetaData =
- inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ inputContext.getServiceProviderMetaData(auxiliaryService);
this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetaData);
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index afd280b..a83c301 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -55,6 +55,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.security.JobTokenSecretManager;
@@ -327,8 +328,10 @@ class ShuffleScheduler {
this.applicationId = inputContext.getApplicationId().toString();
this.dagId = inputContext.getDagIdentifier();
this.localHostname = inputContext.getExecutionContext().getHostName();
+ String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
final ByteBuffer shuffleMetadata =
- inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ inputContext.getServiceProviderMetaData(auxiliaryService);
this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
this.referee = new Referee();
@@ -371,7 +374,7 @@ class ShuffleScheduler {
this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
SecretKey jobTokenSecret = ShuffleUtils
.getJobTokenSecretFromTokenBytes(inputContext
- .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
+ .getServiceConsumerMetaData(auxiliaryService));
this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret);
ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 897d7d7..e468a55 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -350,7 +350,7 @@ public class PipelinedSorter extends ExternalSorter {
ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false,
outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1),
partitions, sendEmptyPartitionDetails, pathComponent, partitionStats,
- reportDetailedPartitionStats());
+ reportDetailedPartitionStats(), this.conf);
outputContext.sendEvents(events);
LOG.info(outputContext.getDestinationVertexName() +
": Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
@@ -673,7 +673,7 @@ public class PipelinedSorter extends ExternalSorter {
ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
outputContext, i, indexCacheList.get(i), partitions,
sendEmptyPartitionDetails, pathComponent, partitionStats,
- reportDetailedPartitionStats());
+ reportDetailedPartitionStats(), this.conf);
LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
}
outputContext.sendEvents(events);
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 69bfdb8..21c40e9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -1133,7 +1133,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index);
ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent,
- partitionStats, reportDetailedPartitionStats());
+ partitionStats, reportDetailedPartitionStats(), this.conf);
LOG.info(outputContext.getDestinationVertexName() + ": " +
"Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 152096c..760daf5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -52,6 +52,7 @@ import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.OutputContext;
@@ -1084,8 +1085,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
@VisibleForTesting
int getShufflePort() throws IOException {
+ String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
ByteBuffer shuffleMetadata = outputContext
- .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ .getServiceProviderMetaData(auxiliaryService);
int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
return shufflePort;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 9a3d778..13e27eb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -200,7 +200,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, isLastEvent,
getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), conf),
getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(),
- sorter.getPartitionStats(), sorter.reportDetailedPartitionStats());
+ sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), this.conf);
}
return eventList;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index 4233f5d..03ddfa5 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -17,6 +17,7 @@ import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.OutputContext;
@@ -100,7 +101,8 @@ public class TestShuffleUtils {
serviceProviderMetaData.writeInt(80);
doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(outputContext)
.getServiceProviderMetaData
- (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
doReturn(1).when(outputContext).getTaskVertexIndex();
@@ -115,8 +117,8 @@ public class TestShuffleUtils {
@Before
public void setup() throws Exception {
- outputContext = createTezOutputContext();
conf = new Configuration();
+ outputContext = createTezOutputContext();
conf.set("fs.defaultFS", "file:///");
localFs = FileSystem.getLocal(conf);
@@ -163,7 +165,7 @@ public class TestShuffleUtils {
String pathComponent = "/attempt_x_y_0/file.out";
ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
outputContext, spillId, new TezSpillRecord(indexFile, conf),
- physicalOutputs, true, pathComponent, null, false);
+ physicalOutputs, true, pathComponent, null, false, this.conf);
Assert.assertTrue(events.size() == 1);
Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
@@ -202,7 +204,7 @@ public class TestShuffleUtils {
//normal code path where we do final merge all the time
ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
outputContext, spillId, new TezSpillRecord(indexFile, conf),
- physicalOutputs, true, pathComponent, null, false);
+ physicalOutputs, true, pathComponent, null, false, this.conf);
Assert.assertTrue(events.size() == 2); //one for VM
Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
@@ -243,7 +245,7 @@ public class TestShuffleUtils {
//normal code path where we do final merge all the time
ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent,
outputContext, spillId, new TezSpillRecord(indexFile, conf),
- physicalOutputs, true, pathComponent, null, false);
+ physicalOutputs, true, pathComponent, null, false, this.conf);
Assert.assertTrue(events.size() == 2); //one for VM
Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
index 6bcbeb6..e085d1a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
@@ -44,6 +44,7 @@ import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
@@ -167,7 +168,8 @@ public class TestShuffleInputEventHandlerImpl {
doReturn(new TezCounters()).when(inputContext).getCounters();
doReturn("sourceVertex").when(inputContext).getSourceVertexName();
doReturn(shuffleMetaData).when(inputContext)
- .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ .getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
doReturn(executionContext).when(inputContext).getExecutionContext();
return inputContext;
}
@@ -183,7 +185,8 @@ public class TestShuffleInputEventHandlerImpl {
Token<JobTokenIdentifier> token = new Token(new JobTokenIdentifier(), new JobTokenSecretManager(null));
token.write(out);
doReturn(ByteBuffer.wrap(out.getData())).when(inputContext).getServiceConsumerMetaData(
- TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID);
+ conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
ShuffleManager realShuffleManager = new ShuffleManager(inputContext, conf, 2,
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
index a5608ef..34ca13f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
@@ -44,6 +44,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
@@ -144,7 +145,8 @@ public class TestShuffleManager {
doReturn(new TezCounters()).when(inputContext).getCounters();
doReturn("sourceVertex").when(inputContext).getSourceVertexName();
doReturn(shuffleMetaData).when(inputContext)
- .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ .getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
doReturn(executionContext).when(inputContext).getExecutionContext();
return inputContext;
}
@@ -165,7 +167,8 @@ public class TestShuffleManager {
token.write(out);
doReturn(ByteBuffer.wrap(out.getData())).when(inputContext).
getServiceConsumerMetaData(
- TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID);
+ conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
return new ShuffleManagerForTest(inputContext, conf,
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 80e7b14..c3f8dda 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -34,6 +34,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.OutputContext;
@@ -99,7 +100,7 @@ public class TestPipelinedSorter {
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TezCounters counters = new TezCounters();
String uniqueId = UUID.randomUUID().toString();
- this.outputContext = createMockOutputContext(counters, appId, uniqueId);
+ this.outputContext = createMockOutputContext(counters, appId, uniqueId, getConf());
}
public static Configuration getConf() {
@@ -753,7 +754,7 @@ public class TestPipelinedSorter {
}
private static OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
- String uniqueId) throws IOException {
+ String uniqueId, Configuration conf) throws IOException {
OutputContext outputContext = mock(OutputContext.class);
ExecutionContext execContext = new ExecutionContextImpl("localhost");
@@ -762,7 +763,8 @@ public class TestPipelinedSorter {
serviceProviderMetaData.writeInt(80);
doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(outputContext)
.getServiceProviderMetaData
- (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
doReturn(execContext).when(outputContext).getExecutionContext();
doReturn(mock(OutputStatisticsReporter.class)).when(outputContext).getStatisticsReporter();
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index e0374a3..73d249c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -49,6 +49,7 @@ import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
@@ -463,7 +464,8 @@ public class TestDefaultSorter {
doReturn("v1").when(context).getDestinationVertexName();
doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(context)
.getServiceProviderMetaData
- (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock invocation) throws Throwable {
long requestedSize = (Long) invocation.getArguments()[0];
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 41b2b97..4a0d1d5 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -48,6 +48,7 @@ import java.util.Set;
import java.util.UUID;
import com.google.protobuf.ByteString;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
@@ -117,6 +118,7 @@ public class TestUnorderedPartitionedKVWriter {
private boolean shouldCompress;
private ReportPartitionStats reportPartitionStats;
+ private Configuration defaultConf = new Configuration();
public TestUnorderedPartitionedKVWriter(boolean shouldCompress,
ReportPartitionStats reportPartitionStats) {
@@ -160,7 +162,7 @@ public class TestUnorderedPartitionedKVWriter {
ApplicationId appId = ApplicationId.newInstance(10000000, 1);
TezCounters counters = new TezCounters();
String uniqueId = UUID.randomUUID().toString();
- OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
+ OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
int maxSingleBufferSizeBytes = 2047;
Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
@@ -256,7 +258,7 @@ public class TestUnorderedPartitionedKVWriter {
ApplicationId appId = ApplicationId.newInstance(10000000, 1);
TezCounters counters = new TezCounters();
String uniqueId = UUID.randomUUID().toString();
- OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
+ OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
Random random = new Random();
Configuration conf = createConfiguration(outputContext, Text.class, Text.class, shouldCompress,
@@ -524,7 +526,7 @@ public class TestUnorderedPartitionedKVWriter {
ApplicationId appId = ApplicationId.newInstance(10000000, 1);
TezCounters counters = new TezCounters();
String uniqueId = UUID.randomUUID().toString();
- OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
+ OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
shouldCompress, -1);
@@ -708,7 +710,7 @@ public class TestUnorderedPartitionedKVWriter {
ApplicationId appId = ApplicationId.newInstance(10000000, 1);
TezCounters counters = new TezCounters();
String uniqueId = UUID.randomUUID().toString();
- OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
+ OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
shouldCompress, -1);
@@ -899,7 +901,7 @@ public class TestUnorderedPartitionedKVWriter {
}
private OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
- String uniqueId) {
+ String uniqueId, Configuration conf) {
OutputContext outputContext = mock(OutputContext.class);
doReturn(counters).when(outputContext).getCounters();
doReturn(appId).when(outputContext).getApplicationId();
@@ -922,7 +924,8 @@ public class TestUnorderedPartitionedKVWriter {
portBuffer.reset();
return portBuffer;
}
- }).when(outputContext).getServiceProviderMetaData(eq(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID));
+ }).when(outputContext).getServiceProviderMetaData(eq(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)));
Path outDirBase = new Path(TEST_ROOT_DIR, "outDir_" + uniqueId);
String[] outDirs = new String[] { outDirBase.toString() };
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 93c4f92..7762025 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.Text;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.Event;
@@ -407,7 +408,8 @@ public class TestOnFileSortedOutput {
doReturn("v1").when(context).getDestinationVertexName();
doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(context)
.getServiceProviderMetaData
- (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock invocation) throws Throwable {
long requestedSize = (Long) invocation.getArguments()[0];
http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 38a60a2..bf55cb3 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -41,6 +41,7 @@ import java.util.Map;
import com.google.protobuf.ByteString;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -230,7 +231,8 @@ public class TestOnFileUnorderedKVOutput {
ByteBuffer bb = ByteBuffer.allocate(4);
bb.putInt(shufflePort);
bb.position(0);
- AuxiliaryServiceHelper.setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, bb, auxEnv);
+ AuxiliaryServiceHelper.setServiceDataIntoEnv(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT), bb, auxEnv);
OutputDescriptor outputDescriptor = mock(OutputDescriptor.class);