You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ha...@apache.org on 2020/05/27 05:40:34 UTC
[tez] branch master updated: TEZ-4137 : Input/Output/Processor
should merge payload to local conf (Mustafa Iman via Laszlo Bodor,
Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 0411a2e TEZ-4137 : Input/Output/Processor should merge payload to local conf (Mustafa Iman via Laszlo Bodor, Ashutosh Chauhan)
0411a2e is described below
commit 0411a2e0244dc4e222e63b1e6f56ca8524e2d3aa
Author: Mustafa Iman <mu...@gmail.com>
AuthorDate: Tue May 26 22:39:21 2020 -0700
TEZ-4137 : Input/Output/Processor should merge payload to local conf (Mustafa Iman via Laszlo Bodor, Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../main/java/org/apache/tez/common/TezUtils.java | 22 ++++++++++++
.../tez/runtime/api/InputInitializerContext.java | 9 ++++-
.../org/apache/tez/runtime/api/TaskContext.java | 7 ++++
.../impl/TezRootInputInitializerContextImpl.java | 8 ++++-
.../mapreduce/common/MRInputAMSplitGenerator.java | 5 ++-
.../tez/mapreduce/input/base/MRInputBase.java | 5 +--
.../org/apache/tez/mapreduce/output/MROutput.java | 6 ++--
.../org/apache/tez/mapreduce/TezTestUtils.java | 10 +++++-
.../common/TestMRInputAMSplitGenerator.java | 2 +-
.../common/TestMRInputSplitDistributor.java | 6 ++--
.../apache/tez/mapreduce/input/MRInputForTest.java | 35 ++++++++++++++++++
.../tez/mapreduce/input/MultiMRInputForTest.java | 35 ++++++++++++++++++
.../apache/tez/mapreduce/input/TestMRInput.java | 39 ++++++++++++++++++++
.../tez/mapreduce/input/TestMultiMRInput.java | 34 ++++++++++++++----
.../apache/tez/mapreduce/output/TestMROutput.java | 42 ++++++++++++++++++----
.../tez/mapreduce/output/TestMROutputLegacy.java | 2 ++
.../tez/mapreduce/output/TestMultiMROutput.java | 30 ++++++++++++++++
.../tez/runtime/api/impl/TezTaskContextImpl.java | 7 ++++
.../library/input/OrderedGroupedKVInput.java | 4 +--
.../runtime/library/input/UnorderedKVInput.java | 4 +--
.../library/output/OrderedPartitionedKVOutput.java | 4 +--
.../runtime/library/output/UnorderedKVOutput.java | 9 ++---
.../output/UnorderedPartitionedKVOutput.java | 8 +++--
.../library/input/TestOrderedGroupedKVInput.java | 30 ++++++++++++++++
.../runtime/library/output/OutputTestHelpers.java | 7 ++--
.../library/output/TestOnFileSortedOutput.java | 3 +-
.../output/TestOnFileUnorderedKVOutput.java | 36 ++++++++++++++-----
.../output/TestOrderedPartitionedKVOutput2.java | 19 ++++++++--
.../library/output/TestUnorderedKVOutput2.java | 17 ++++++++-
.../output/TestUnorderedPartitionedKVOutput2.java | 19 ++++++++++
30 files changed, 410 insertions(+), 54 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
index 51311ff..23811aa 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
@@ -30,6 +30,7 @@ import java.util.Objects;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
+import org.apache.tez.runtime.api.TaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -120,6 +121,27 @@ public class TezUtils {
}
}
+ public static Configuration createConfFromBaseConfAndPayload(TaskContext context)
+ throws IOException {
+ Configuration baseConf = context.getContainerConfiguration();
+ Configuration configuration = new Configuration(baseConf);
+ UserPayload payload = context.getUserPayload();
+ ByteString byteString = ByteString.copyFrom(payload.getPayload());
+ try(SnappyInputStream uncompressIs = new SnappyInputStream(byteString.newInput())) {
+ DAGProtos.ConfigurationProto confProto = DAGProtos.ConfigurationProto.parseFrom(uncompressIs);
+ readConfFromPB(confProto, configuration);
+ return configuration;
+ }
+ }
+
+ public static void addToConfFromByteString(Configuration configuration, ByteString byteString)
+ throws IOException {
+ try(SnappyInputStream uncompressIs = new SnappyInputStream(byteString.newInput())) {
+ DAGProtos.ConfigurationProto confProto = DAGProtos.ConfigurationProto.parseFrom(uncompressIs);
+ readConfFromPB(confProto, configuration);
+ }
+ }
+
/**
* Convert an instance of {@link org.apache.tez.dag.api.UserPayload} to {@link
* org.apache.hadoop.conf.Configuration}
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
index ccfac46..7c9562e 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.counters.TezCounters;
@@ -49,7 +50,13 @@ public interface InputInitializerContext {
* @return DAG name
*/
String getDAGName();
-
+
+ /**
+ * Get vertex configuration
+ * @return Vertex configuration
+ */
+ Configuration getVertexConfiguration();
+
/**
* Get the name of the input
* @return Input name
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
index dd2951a..1ba1a90 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.UserPayload;
@@ -63,6 +64,12 @@ public interface TaskContext {
public int getTaskAttemptNumber();
/**
+ * Get container configuration
+ * @return Container configuration
+ */
+ public Configuration getContainerConfiguration();
+
+ /**
* Get the name of the DAG
* @return the DAG name
*/
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
index 4376487..a994359 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
@@ -23,6 +23,7 @@ package org.apache.tez.dag.app.dag.impl;
import java.util.Set;
import java.util.Objects;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.counters.TezCounters;
@@ -85,7 +86,12 @@ public class TezRootInputInitializerContextImpl implements
public UserPayload getUserPayload() {
return this.input.getControllerDescriptor().getUserPayload();
}
-
+
+ @Override
+ public Configuration getVertexConfiguration() {
+ return vertex.getConf();
+ }
+
@Override
public int getNumTasks() {
return vertex.getTotalTasks();
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index dbfdcb3..d06a5f4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.VertexLocationHint;
@@ -80,8 +79,8 @@ public class MRInputAMSplitGenerator extends InputInitializer {
+ sw.now(TimeUnit.MILLISECONDS));
}
sw.reset().start();
- Configuration conf = TezUtils.createConfFromByteString(userPayloadProto
- .getConfigurationBytes());
+ Configuration conf = new JobConf(getContext().getVertexConfiguration());
+ TezUtils.addToConfFromByteString(conf, userPayloadProto.getConfigurationBytes());
sendSerializedEvents = conf.getBoolean(
MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD,
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
index d8c531e..ccae0b1 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
@@ -72,8 +72,9 @@ public abstract class MRInputBase extends AbstractLogicalInput {
boolean isGrouped = mrUserPayload.getGroupingEnabled();
Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
"Split information not expected in " + this.getClass().getName());
- Configuration conf = TezUtils
- .createConfFromByteString(mrUserPayload.getConfigurationBytes());
+
+ Configuration conf = new JobConf(getContext().getContainerConfiguration());
+ TezUtils.addToConfFromByteString(conf, mrUserPayload.getConfigurationBytes());
this.jobConf = new JobConf(conf);
useNewApi = this.jobConf.getUseNewMapper();
if (isGrouped) {
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 18047eb..950e629 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
import org.apache.tez.common.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
@@ -398,8 +399,9 @@ public class MROutput extends AbstractLogicalOutput {
taskNumberFormat.setGroupingUsed(false);
nonTaskNumberFormat.setMinimumIntegerDigits(3);
nonTaskNumberFormat.setGroupingUsed(false);
- Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
- this.jobConf = new JobConf(conf);
+ UserPayload userPayload = getContext().getUserPayload();
+ this.jobConf = new JobConf(getContext().getContainerConfiguration());
+ TezUtils.addToConfFromByteString(this.jobConf, ByteString.copyFrom(userPayload.getPayload()));
// Add tokens to the jobConf - in case they are accessed within the RW / OF
jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
index 369afbe..83c28dd 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.tez.mapreduce;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.counters.TezCounters;
@@ -59,10 +60,12 @@ public class TezTestUtils {
private final ApplicationId appId;
private final UserPayload payload;
+ private final Configuration vertexConfig;
- public TezRootInputInitializerContextForTest(UserPayload payload) throws IOException {
+ public TezRootInputInitializerContextForTest(UserPayload payload, Configuration vertexConfig) throws IOException {
appId = ApplicationId.newInstance(1000, 200);
this.payload = payload == null ? UserPayload.create(null) : payload;
+ this.vertexConfig = vertexConfig;
}
@Override
@@ -76,6 +79,11 @@ public class TezTestUtils {
}
@Override
+ public Configuration getVertexConfiguration() {
+ return vertexConfig;
+ }
+
+ @Override
public String getInputName() {
return "MRInput";
}
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java
index 6cf2700..9f6ac3b 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java
@@ -96,7 +96,7 @@ public class TestMRInputAMSplitGenerator {
UserPayload userPayload = dataSource.getInputDescriptor().getUserPayload();
InputInitializerContext context =
- new TezTestUtils.TezRootInputInitializerContextForTest(userPayload);
+ new TezTestUtils.TezRootInputInitializerContextForTest(userPayload, new Configuration(false));
MRInputAMSplitGenerator splitGenerator =
new MRInputAMSplitGenerator(context);
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
index 3772cde..4aaa7e2 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
@@ -70,7 +70,8 @@ public class TestMRInputSplitDistributor {
UserPayload userPayload =
UserPayload.create(payloadProto.build().toByteString().asReadOnlyByteBuffer());
- InputInitializerContext context = new TezTestUtils.TezRootInputInitializerContextForTest(userPayload);
+ InputInitializerContext context = new TezTestUtils.TezRootInputInitializerContextForTest(userPayload,
+ new Configuration(false));
MRInputSplitDistributor splitDist = new MRInputSplitDistributor(context);
List<Event> events = splitDist.initialize();
@@ -119,7 +120,8 @@ public class TestMRInputSplitDistributor {
UserPayload userPayload =
UserPayload.create(payloadProto.build().toByteString().asReadOnlyByteBuffer());
- InputInitializerContext context = new TezTestUtils.TezRootInputInitializerContextForTest(userPayload);
+ InputInitializerContext context = new TezTestUtils.TezRootInputInitializerContextForTest(userPayload,
+ new Configuration(false));
MRInputSplitDistributor splitDist = new MRInputSplitDistributor(context);
List<Event> events = splitDist.initialize();
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/MRInputForTest.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/MRInputForTest.java
new file mode 100644
index 0000000..0d1d24f
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/MRInputForTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.input;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.runtime.api.InputContext;
+
+/**
+ * This is used for inspecting jobConf in test.
+ */
+public class MRInputForTest extends MRInput {
+ public MRInputForTest(InputContext inputContext, int numPhysicalInputs) {
+ super(inputContext, numPhysicalInputs);
+ }
+
+ public Configuration getConfiguration() {
+ return jobConf;
+ }
+}
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/MultiMRInputForTest.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/MultiMRInputForTest.java
new file mode 100644
index 0000000..f0f0a77
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/MultiMRInputForTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.input;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.runtime.api.InputContext;
+
+/**
+ * This is used for inspecting jobConf in test.
+ */
+public class MultiMRInputForTest extends MultiMRInput {
+ public MultiMRInputForTest(InputContext inputContext, int numPhysicalInputs) {
+ super(inputContext, numPhysicalInputs);
+ }
+
+ public Configuration getConfiguration() {
+ return jobConf;
+ }
+}
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
index 9109cd9..5ca5c26 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
@@ -69,6 +69,7 @@ public class TestMRInput {
doReturn(1).when(inputContext).getTaskIndex();
doReturn(1).when(inputContext).getTaskAttemptNumber();
doReturn(new TezCounters()).when(inputContext).getCounters();
+ doReturn(new JobConf(false)).when(inputContext).getContainerConfiguration();
MRInput mrInput = new MRInput(inputContext, 0);
@@ -120,6 +121,7 @@ public class TestMRInput {
doReturn(TEST_ATTRIBUTES_INPUT_NAME).when(inputContext).getSourceVertexName();
doReturn(TEST_ATTRIBUTES_APPLICATION_ID).when(inputContext).getApplicationId();
doReturn(TEST_ATTRIBUTES_UNIQUE_IDENTIFIER).when(inputContext).getUniqueIdentifier();
+ doReturn(new Configuration(false)).when(inputContext).getContainerConfiguration();
DataSourceDescriptor dsd = MRInput.createConfigBuilder(new Configuration(false),
@@ -147,6 +149,43 @@ public class TestMRInput {
assertTrue(TestInputFormat.invoked.get());
}
+ @Test(timeout = 5000)
+ public void testConfigMerge() throws Exception {
+ JobConf jobConf = new JobConf(false);
+ jobConf.set("payload-key", "payload-value");
+
+ Configuration localConfig = new Configuration(false);
+ localConfig.set("local-key", "local-value");
+
+ InputContext inputContext = mock(InputContext.class);
+
+ DataSourceDescriptor dsd = MRInput.createConfigBuilder(jobConf,
+ TestInputFormat.class).groupSplits(false).build();
+
+ doReturn(dsd.getInputDescriptor().getUserPayload()).when(inputContext).getUserPayload();
+ doReturn(TEST_ATTRIBUTES_DAG_INDEX).when(inputContext).getDagIdentifier();
+ doReturn(TEST_ATTRIBUTES_VERTEX_INDEX).when(inputContext).getTaskVertexIndex();
+ doReturn(TEST_ATTRIBUTES_TASK_INDEX).when(inputContext).getTaskIndex();
+ doReturn(TEST_ATTRIBUTES_TASK_ATTEMPT_INDEX).when(inputContext).getTaskAttemptNumber();
+ doReturn(TEST_ATTRIBUTES_INPUT_INDEX).when(inputContext).getInputIndex();
+ doReturn(TEST_ATTRIBUTES_DAG_ATTEMPT_NUMBER).when(inputContext).getDAGAttemptNumber();
+ doReturn(TEST_ATTRIBUTES_DAG_NAME).when(inputContext).getDAGName();
+ doReturn(TEST_ATTRIBUTES_VERTEX_NAME).when(inputContext).getTaskVertexName();
+ doReturn(TEST_ATTRIBUTES_INPUT_NAME).when(inputContext).getSourceVertexName();
+ doReturn(TEST_ATTRIBUTES_APPLICATION_ID).when(inputContext).getApplicationId();
+ doReturn(TEST_ATTRIBUTES_UNIQUE_IDENTIFIER).when(inputContext).getUniqueIdentifier();
+ doReturn(localConfig).when(inputContext).getContainerConfiguration();
+ doReturn(new TezCounters()).when(inputContext).getCounters();
+
+ MRInputForTest input = new MRInputForTest(inputContext, 1);
+ input.initialize();
+
+ Configuration mergedConfig = input.getConfiguration();
+
+ assertEquals("local-value", mergedConfig.get("local-key"));
+ assertEquals("payload-value", mergedConfig.get("payload-key"));
+ }
+
/**
* Test class to verify
*/
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index 8d77a05..bd6e891 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -102,7 +102,7 @@ public class TestMultiMRInput {
jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(jobConf, workDir);
- InputContext inputContext = createTezInputContext(jobConf);
+ InputContext inputContext = createTezInputContext(jobConf, new Configuration(false));
MultiMRInput mMrInput = new MultiMRInput(inputContext, 0);
@@ -122,6 +122,25 @@ public class TestMultiMRInput {
}
@Test(timeout = 5000)
+ public void testConfigMerge() throws Exception {
+ JobConf jobConf = new JobConf(false);
+ jobConf.set("payload-key", "payload-value");
+
+ Configuration localConfig = new Configuration(false);
+ localConfig.set("local-key", "local-value");
+
+ InputContext inputContext = createTezInputContext(jobConf, localConfig);
+
+ MultiMRInputForTest input = new MultiMRInputForTest(inputContext, 1);
+ input.initialize();
+
+ Configuration mergedConfig = input.getConfiguration();
+
+ assertEquals("local-value", mergedConfig.get("local-key"));
+ assertEquals("payload-value", mergedConfig.get("payload-key"));
+ }
+
+ @Test(timeout = 5000)
public void testSingleSplit() throws Exception {
Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit");
@@ -129,7 +148,7 @@ public class TestMultiMRInput {
jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(jobConf, workDir);
- InputContext inputContext = createTezInputContext(jobConf);
+ InputContext inputContext = createTezInputContext(jobConf, new Configuration(false));
MultiMRInput input = new MultiMRInput(inputContext, 1);
input.initialize();
@@ -180,7 +199,7 @@ public class TestMultiMRInput {
splitProto.toByteString().asReadOnlyByteBuffer());
// Create input context.
- InputContext inputContext = createTezInputContext(conf);
+ InputContext inputContext = createTezInputContext(conf, new Configuration(false));
// Create the MR input object and process the event
MultiMRInput input = new MultiMRInput(inputContext, 1);
@@ -198,7 +217,7 @@ public class TestMultiMRInput {
jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(jobConf, workDir);
- InputContext inputContext = createTezInputContext(jobConf);
+ InputContext inputContext = createTezInputContext(jobConf, new Configuration(false));
MultiMRInput input = new MultiMRInput(inputContext, 2);
input.initialize();
@@ -265,7 +284,7 @@ public class TestMultiMRInput {
jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(jobConf, workDir);
- InputContext inputContext = createTezInputContext(jobConf);
+ InputContext inputContext = createTezInputContext(jobConf, new Configuration(false));
MultiMRInput input = new MultiMRInput(inputContext, 1);
input.initialize();
@@ -308,10 +327,10 @@ public class TestMultiMRInput {
return data;
}
- private InputContext createTezInputContext(Configuration conf) throws Exception {
+ private InputContext createTezInputContext(Configuration payloadConf, Configuration baseConf) throws Exception {
MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
builder.setGroupingEnabled(false);
- builder.setConfigurationBytes(TezUtils.createByteStringFromConf(conf));
+ builder.setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf));
byte[] payload = builder.build().toByteArray();
ApplicationId applicationId = ApplicationId.newInstance(10000, 1);
@@ -330,6 +349,7 @@ public class TestMultiMRInput {
doReturn(UUID.randomUUID().toString()).when(inputContext).getUniqueIdentifier();
doReturn("taskVertexName").when(inputContext).getTaskVertexName();
doReturn(UserPayload.create(ByteBuffer.wrap(payload))).when(inputContext).getUserPayload();
+ doReturn(baseConf).when(inputContext).getContainerConfiguration();
return inputContext;
}
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
index c60ca22..bfc09dc 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -94,7 +94,8 @@ public class TestMROutput {
tmpDir.getPath())
.build();
- OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+ OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
+ new Configuration(false));
MROutput output = new MROutput(outputContext, 2);
output.initialize();
@@ -109,6 +110,27 @@ public class TestMROutput {
assertEquals(FileOutputCommitter.class, output.committer.getClass());
}
+ @Test
+ public void testMergeConfig() throws Exception {
+ String outputPath = "/tmp/output";
+ Configuration localConf = new Configuration(false);
+ localConf.set("local-key", "local-value");
+ DataSinkDescriptor dataSink = MROutput
+ .createConfigBuilder(localConf, org.apache.hadoop.mapred.TextOutputFormat.class, outputPath)
+ .build();
+
+ Configuration baseConf = new Configuration(false);
+ baseConf.set("base-key", "base-value");
+
+ OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(), baseConf);
+ MROutput output = new MROutput(outputContext, 2);
+ output.initialize();
+
+ Configuration mergedConf = output.jobConf;
+ assertEquals("local-value", mergedConf.get("local-key"));
+ assertEquals("base-value", mergedConf.get("base-key"));
+ }
+
@Test(timeout = 5000)
public void testOldAPI_TextOutputFormat() throws Exception {
Configuration conf = new Configuration();
@@ -119,7 +141,8 @@ public class TestMROutput {
tmpDir.getPath())
.build();
- OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+ OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
+ new Configuration(false));
MROutput output = new MROutput(outputContext, 2);
output.initialize();
@@ -144,7 +167,8 @@ public class TestMROutput {
tmpDir.getPath())
.build();
- OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+ OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
+ new Configuration(false));
MROutput output = new MROutput(outputContext, 2);
output.initialize();
assertEquals(true, output.useNewApi);
@@ -169,7 +193,8 @@ public class TestMROutput {
tmpDir.getPath())
.build();
- OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+ OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
+ new Configuration(false));
MROutput output = new MROutput(outputContext, 2);
output.initialize();
assertEquals(false, output.useNewApi);
@@ -194,7 +219,8 @@ public class TestMROutput {
tmpDir.getPath())
.build();
- OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+ OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
+ new Configuration(false));
MROutput output = new MROutput(outputContext, 2);
output.initialize();
@@ -220,7 +246,8 @@ public class TestMROutput {
tmpDir.getPath())
.build();
- OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+ OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
+ new Configuration(false));
MROutput output = new MROutput(outputContext, 2);
output.initialize();
@@ -235,7 +262,7 @@ public class TestMROutput {
assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
}
- private OutputContext createMockOutputContext(UserPayload payload) {
+ private OutputContext createMockOutputContext(UserPayload payload, Configuration baseConf) {
OutputContext outputContext = mock(OutputContext.class);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
when(outputContext.getUserPayload()).thenReturn(payload);
@@ -243,6 +270,7 @@ public class TestMROutput {
when(outputContext.getTaskVertexIndex()).thenReturn(1);
when(outputContext.getTaskAttemptNumber()).thenReturn(1);
when(outputContext.getCounters()).thenReturn(new TezCounters());
+ when(outputContext.getContainerConfiguration()).thenReturn(baseConf);
return outputContext;
}
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputLegacy.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputLegacy.java
index 01b5c84..60596be 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputLegacy.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputLegacy.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -182,6 +183,7 @@ public class TestMROutputLegacy {
when(outputContext.getTaskVertexIndex()).thenReturn(1);
when(outputContext.getTaskAttemptNumber()).thenReturn(1);
when(outputContext.getCounters()).thenReturn(new TezCounters());
+ when(outputContext.getContainerConfiguration()).thenReturn(new Configuration(false));
return outputContext;
}
}
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java
index c8eca16..2662827 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
@@ -107,6 +108,34 @@ public class TestMultiMROutput {
}
}
+ @Test
+ public void testMergeConf() throws Exception {
+ JobConf payloadConf = new JobConf();
+ payloadConf.set("local-key", "local-value");
+ DataSinkDescriptor dataSink = MultiMROutput.createConfigBuilder(
+ payloadConf, SequenceFileOutputFormat.class, "/output", false).build();
+
+ Configuration baseConf = new Configuration(false);
+ baseConf.set("base-key", "base-value");
+
+ OutputContext outputContext = mock(OutputContext.class);
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ when(outputContext.getUserPayload()).thenReturn(dataSink.getOutputDescriptor().getUserPayload());
+ when(outputContext.getApplicationId()).thenReturn(appId);
+ when(outputContext.getTaskVertexIndex()).thenReturn(1);
+ when(outputContext.getTaskAttemptNumber()).thenReturn(1);
+ when(outputContext.getCounters()).thenReturn(new TezCounters());
+ when(outputContext.getStatisticsReporter()).thenReturn(mock(OutputStatisticsReporter.class));
+ when(outputContext.getContainerConfiguration()).thenReturn(baseConf);
+
+ MultiMROutput output = new MultiMROutput(outputContext, 2);
+ output.initialize();
+
+ Configuration mergedConf = output.jobConf;
+ assertEquals("base-value", mergedConf.get("base-key"));
+ assertEquals("local-value", mergedConf.get("local-key"));
+ }
+
private OutputContext createMockOutputContext(UserPayload payload) {
OutputContext outputContext = mock(OutputContext.class);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
@@ -117,6 +146,7 @@ public class TestMultiMROutput {
when(outputContext.getCounters()).thenReturn(new TezCounters());
when(outputContext.getStatisticsReporter()).thenReturn(
mock(OutputStatisticsReporter.class));
+ when(outputContext.getContainerConfiguration()).thenReturn(new Configuration(false));
return outputContext;
}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index dccde82..a47dac1 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -56,6 +56,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
protected final String taskVertexName;
protected final TezTaskAttemptID taskAttemptID;
private final TezCounters counters;
+ private Configuration configuration;
private String[] workDirs;
private String uniqueIdentifier;
protected final LogicalIOProcessorRuntimeTask runtimeTask;
@@ -91,6 +92,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
Objects.requireNonNull(descriptor, "descriptor is null");
Objects.requireNonNull(sharedExecutor, "sharedExecutor is null");
this.dagName = dagName;
+ this.configuration = conf;
this.taskVertexName = taskVertexName;
this.taskAttemptID = taskAttemptID;
this.counters = counters;
@@ -136,6 +138,11 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
}
@Override
+ public Configuration getContainerConfiguration() {
+ return configuration;
+ }
+
+ @Override
public String getDAGName() {
return dagName;
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index c1879bc..2b405bb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -28,6 +28,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.runtime.api.ProgressFailedException;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.apache.tez.runtime.library.common.Constants;
@@ -37,7 +38,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
-import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
@@ -97,7 +97,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
@Override
public synchronized List<Event> initialize() throws IOException {
- this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+ this.conf = TezUtils.createConfFromBaseConfAndPayload(getContext());
if (this.getNumPhysicalInputs() == 0) {
getContext().requestInitialMemory(0l, null);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 401066d..1db7869 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.runtime.api.ProgressFailedException;
import org.apache.tez.runtime.library.common.Constants;
@@ -36,7 +37,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
@@ -88,7 +88,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
@Override
public synchronized List<Event> initialize() throws Exception {
Preconditions.checkArgument(getNumPhysicalInputs() != -1, "Number of Inputs has not been set");
- this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+ this.conf = TezUtils.createConfFromBaseConfAndPayload(getContext());
if (getNumPhysicalInputs() == 0) {
getContext().requestInitialMemory(0l, null);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 86c20dd..676fe17 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -30,6 +30,7 @@ import java.util.zip.Deflater;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +41,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
-import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.AbstractLogicalOutput;
import org.apache.tez.runtime.api.Event;
@@ -90,7 +90,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
@Override
public synchronized List<Event> initialize() throws IOException {
this.startTime = System.nanoTime();
- this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+ this.conf = TezUtils.createConfFromBaseConfAndPayload(getContext());
this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
// Initializing this parametr in this conf since it is used in multiple
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 85368f6..e7a4429 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +33,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TaskCounter;
@@ -62,8 +62,9 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
@VisibleForTesting
UnorderedPartitionedKVWriter kvWriter;
-
- private Configuration conf;
+
+ @VisibleForTesting
+ Configuration conf;
private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
@@ -76,7 +77,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
@Override
public synchronized List<Event> initialize()
throws Exception {
- this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+ this.conf = TezUtils.createConfFromBaseConfAndPayload(getContext());
this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS,
getContext().getWorkDirs());
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 5e223d6..439b732 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -26,14 +26,15 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.tez.common.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TaskCounter;
@@ -57,7 +58,8 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
private static final Logger LOG = LoggerFactory.getLogger(UnorderedPartitionedKVOutput.class);
- private Configuration conf;
+ @VisibleForTesting
+ Configuration conf;
private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
private UnorderedPartitionedKVWriter kvWriter;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
@@ -68,7 +70,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
@Override
public synchronized List<Event> initialize() throws Exception {
- this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+ this.conf = TezUtils.createConfFromBaseConfAndPayload(getContext());
this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, getContext().getWorkDirs());
this.conf.setInt(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS,
getNumPhysicalOutputs());
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java
index d4be802..56b6805 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java
@@ -14,6 +14,7 @@
package org.apache.tez.runtime.library.input;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -57,6 +58,34 @@ public class TestOrderedGroupedKVInput {
}
+ @Test
+ public void testMergeConfig() throws IOException, TezException {
+ Configuration baseConf = new Configuration(false);
+ baseConf.set("base-key", "base-value");
+
+ Configuration payloadConf = new Configuration(false);
+ payloadConf.set("local-key", "local-value");
+
+ InputContext inputContext = mock(InputContext.class);
+
+ UserPayload payLoad = TezUtils.createUserPayloadFromConf(payloadConf);
+ String[] workingDirs = new String[]{"workDir1"};
+ TezCounters counters = new TezCounters();
+
+
+ doReturn(payLoad).when(inputContext).getUserPayload();
+ doReturn(workingDirs).when(inputContext).getWorkDirs();
+ doReturn(counters).when(inputContext).getCounters();
+ doReturn(baseConf).when(inputContext).getContainerConfiguration();
+
+ OrderedGroupedKVInput input = new OrderedGroupedKVInput(inputContext, 1);
+ input.initialize();
+
+ Configuration mergedConf = input.conf;
+ assertEquals("base-value", mergedConf.get("base-key"));
+ assertEquals("local-value", mergedConf.get("local-key"));
+ }
+
private InputContext createMockInputContext() throws IOException {
InputContext inputContext = mock(InputContext.class);
@@ -70,6 +99,7 @@ public class TestOrderedGroupedKVInput {
doReturn(workingDirs).when(inputContext).getWorkDirs();
doReturn(200 * 1024 * 1024l).when(inputContext).getTotalMemoryAvailableToTask();
doReturn(counters).when(inputContext).getCounters();
+ doReturn(new Configuration(false)).when(inputContext).getContainerConfiguration();
doAnswer(new Answer() {
@Override
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
index 573d53e..b81c2bd 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
@@ -51,10 +51,12 @@ class OutputTestHelpers {
doReturn(200 * 1024 * 1024l).when(outputContext).getTotalMemoryAvailableToTask();
doReturn(counters).when(outputContext).getCounters();
doReturn(statsReporter).when(outputContext).getStatisticsReporter();
+ doReturn(new Configuration(false)).when(outputContext).getContainerConfiguration();
return outputContext;
}
- static OutputContext createOutputContext(Configuration conf, Path workingDir) throws IOException {
+ static OutputContext createOutputContext(Configuration conf, Configuration userPayloadConf, Path workingDir)
+ throws IOException {
OutputContext ctx = mock(OutputContext.class);
doAnswer(new Answer<Void>() {
@Override public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -65,7 +67,8 @@ class OutputTestHelpers {
return null;
}
}).when(ctx).requestInitialMemory(anyLong(), any(MemoryUpdateCallback.class));
- doReturn(TezUtils.createUserPayloadFromConf(conf)).when(ctx).getUserPayload();
+ doReturn(conf).when(ctx).getContainerConfiguration();
+ doReturn(TezUtils.createUserPayloadFromConf(userPayloadConf)).when(ctx).getUserPayload();
doReturn("destinationVertex").when(ctx).getDestinationVertexName();
doReturn("UUID").when(ctx).getUniqueIdentifier();
doReturn(new String[] { workingDir.toString() }).when(ctx).getWorkDirs();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 7762025..2c9c3b2 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -44,7 +44,6 @@ import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.After;
import org.junit.Assert;
@@ -378,6 +377,7 @@ public class TestOnFileSortedOutput {
private OutputContext createTezOutputContext() throws IOException {
String[] workingDirs = { workingDir.toString() };
+ Configuration localConf = new Configuration(false);
UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
serviceProviderMetaData.writeInt(PORT);
@@ -400,6 +400,7 @@ public class TestOnFileSortedOutput {
OutputContext context = mock(OutputContext.class);
+ doReturn(localConf).when(context).getContainerConfiguration();
doReturn(counters).when(context).getCounters();
doReturn(workingDirs).when(context).getWorkDirs();
doReturn(payLoad).when(context).getUserPayload();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 393ac2e..963300c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -128,7 +128,7 @@ public class TestOnFileUnorderedKVOutput {
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
TezSharedExecutor sharedExecutor = new TezSharedExecutor(conf);
- OutputContext outputContext = createOutputContext(conf, sharedExecutor);
+ OutputContext outputContext = createOutputContext(conf, new Configuration(false), sharedExecutor);
UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1);
@@ -161,6 +161,26 @@ public class TestOnFileUnorderedKVOutput {
sharedExecutor.shutdownNow();
}
+ @Test
+ public void testMergeConfig() throws Exception {
+ Configuration baseConf = new Configuration(false);
+ baseConf.set("local-key", "local-value");
+
+ Configuration payloadConf = new Configuration(false);
+ payloadConf.set("base-key", "base-value");
+
+ TezSharedExecutor sharedExecutor = new TezSharedExecutor(baseConf);
+ OutputContext outputContext = createOutputContext(payloadConf, baseConf, sharedExecutor);
+
+ UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1);
+
+ kvOutput.initialize();
+
+ Configuration mergedConf = kvOutput.conf;
+ assertEquals("local-value", mergedConf.get("local-key"));
+ assertEquals("base-value", mergedConf.get("base-key"));
+ }
+
@Test(timeout = 30000)
@SuppressWarnings("unchecked")
public void testWithPipelinedShuffle() throws Exception {
@@ -173,7 +193,7 @@ public class TestOnFileUnorderedKVOutput {
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 1);
TezSharedExecutor sharedExecutor = new TezSharedExecutor(conf);
- OutputContext outputContext = createOutputContext(conf, sharedExecutor);
+ OutputContext outputContext = createOutputContext(conf, new Configuration(false), sharedExecutor);
UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1);
@@ -211,8 +231,8 @@ public class TestOnFileUnorderedKVOutput {
sharedExecutor.shutdownNow();
}
- private OutputContext createOutputContext(Configuration conf, TezSharedExecutor sharedExecutor)
- throws IOException {
+ private OutputContext createOutputContext(Configuration payloadConf, Configuration baseConf,
+ TezSharedExecutor sharedExecutor) throws IOException {
int appAttemptNumber = 1;
TezUmbilical tezUmbilical = mock(TezUmbilical.class);
String dagName = "currentDAG";
@@ -222,7 +242,7 @@ public class TestOnFileUnorderedKVOutput {
TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
- UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
+ UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
TaskSpec mockSpec = mock(TaskSpec.class);
when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class)));
@@ -237,17 +257,17 @@ public class TestOnFileUnorderedKVOutput {
ByteBuffer bb = ByteBuffer.allocate(4);
bb.putInt(shufflePort);
bb.position(0);
- AuxiliaryServiceHelper.setServiceDataIntoEnv(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ AuxiliaryServiceHelper.setServiceDataIntoEnv(payloadConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT), bb, auxEnv);
OutputDescriptor outputDescriptor = mock(OutputDescriptor.class);
when(outputDescriptor.getClassName()).thenReturn("OutputDescriptor");
- OutputContext realOutputContext = new TezOutputContextImpl(conf, new String[] {workDir.toString()},
+ OutputContext realOutputContext = new TezOutputContextImpl(baseConf, new String[] {workDir.toString()},
appAttemptNumber, tezUmbilical, dagName, taskVertexName, destinationVertexName,
-1, taskAttemptID, 0, userPayload, runtimeTask,
- null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null,
+ null, auxEnv, new MemoryDistributor(1, 1, payloadConf), outputDescriptor, null,
new ExecutionContextImpl("localhost"), 2048, new TezSharedExecutor(defaultConf));
verify(runtimeTask, times(1)).addAndGetTezCounter(destinationVertexName);
verify(runtimeTask, times(1)).getTaskStatistics();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
index f226b7c..29ce890 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
@@ -69,7 +69,7 @@ public class TestOrderedPartitionedKVOutput2 {
@Test(timeout = 5000)
public void testNonStartedOutput() throws IOException {
- OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
+ OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, conf, workingDir);
int numPartitions = 10;
OrderedPartitionedKVOutput output = new OrderedPartitionedKVOutput(outputContext, numPartitions);
output.initialize();
@@ -94,9 +94,24 @@ public class TestOrderedPartitionedKVOutput2 {
}
}
+ @Test(timeout = 5000)
+ public void testConfigMerge() throws IOException {
+ Configuration localConf = new Configuration(conf);
+ localConf.set("config-from-local", "config-from-local-value");
+ Configuration payload = new Configuration(false);
+ payload.set("config-from-payload", "config-from-payload-value");
+ OutputContext outputContext = OutputTestHelpers.createOutputContext(localConf, payload, workingDir);
+ int numPartitions = 10;
+ OrderedPartitionedKVOutput output = new OrderedPartitionedKVOutput(outputContext, numPartitions);
+ output.initialize();
+ Configuration configAfterMerge = output.conf;
+ assertEquals("config-from-local-value", configAfterMerge.get("config-from-local"));
+ assertEquals("config-from-payload-value", configAfterMerge.get("config-from-payload"));
+ }
+
@Test(timeout = 10000)
public void testClose() throws Exception {
- OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
+ OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, conf, workingDir);
int numPartitions = 10;
OrderedPartitionedKVOutput output = new OrderedPartitionedKVOutput(outputContext, numPartitions);
output.initialize();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
index 792b03f..a52788e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
@@ -93,9 +93,24 @@ public class TestUnorderedKVOutput2 {
}
}
+ @Test(timeout = 5000)
+ public void testConfigMerge() throws Exception {
+ Configuration localConf = new Configuration(conf);
+ localConf.set("config-from-local", "config-from-local-value");
+ Configuration payload = new Configuration(false);
+ payload.set("config-from-payload", "config-from-payload-value");
+ OutputContext outputContext = OutputTestHelpers.createOutputContext(localConf, payload, workingDir);
+ int numPartitions = 10;
+ UnorderedKVOutput output = new UnorderedKVOutput(outputContext, numPartitions);
+ output.initialize();
+ Configuration configAfterMerge = output.conf;
+ assertEquals("config-from-local-value", configAfterMerge.get("config-from-local"));
+ assertEquals("config-from-payload-value", configAfterMerge.get("config-from-payload"));
+ }
+
@Test(timeout = 10000)
public void testClose() throws Exception {
- OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
+ OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, conf, workingDir);
int numPartitions = 1;
UnorderedKVOutput output = new UnorderedKVOutput(outputContext, numPartitions);
output.initialize();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java
index eec4bf5..52e0630 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java
@@ -22,6 +22,8 @@ import java.util.BitSet;
import java.util.List;
import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.runtime.api.Event;
@@ -59,4 +61,21 @@ public class TestUnorderedPartitionedKVOutput2 {
assertTrue(emptyPartionsBitSet.get(i));
}
}
+
+ @Test
+ public void testConfigMerge() throws Exception {
+ Configuration userPayloadConf = new Configuration(false);
+ Configuration baseConf = new Configuration(false);
+
+ userPayloadConf.set("local-key", "local-value");
+ baseConf.set("base-key", "base-value");
+ OutputContext outputContext = OutputTestHelpers.createOutputContext(
+ userPayloadConf, baseConf, new Path("/"));
+ UnorderedPartitionedKVOutput output =
+ new UnorderedPartitionedKVOutput(outputContext, 1);
+ output.initialize();
+ Configuration mergedConf = output.conf;
+ assertEquals("base-value", mergedConf.get("base-key"));
+ assertEquals("local-value", mergedConf.get("local-key"));
+ }
}