You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by mi...@apache.org on 2016/10/18 15:48:33 UTC
tez git commit: TEZ-3430. Make split sorting optional. Add missing
newly added test.
Repository: tez
Updated Branches:
refs/heads/master c1a7f10e4 -> 04d609e7f
TEZ-3430. Make split sorting optional. Add missing newly added test.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/04d609e7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/04d609e7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/04d609e7
Branch: refs/heads/master
Commit: 04d609e7f222d5c830e846a2bea4eb770279ed00
Parents: c1a7f10
Author: Ming Ma <mi...@twitter.com>
Authored: Tue Oct 18 08:47:42 2016 -0700
Committer: Ming Ma <mi...@twitter.com>
Committed: Tue Oct 18 08:48:18 2016 -0700
----------------------------------------------------------------------
.../common/TestMRInputAMSplitGenerator.java | 241 +++++++++++++++++++
1 file changed, 241 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/04d609e7/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java
new file mode 100644
index 0000000..bd4e5a9
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.common;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.split.TezGroupedSplit;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.mapreduce.TezTestUtils;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.lib.MRInputUtils;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+
+public class TestMRInputAMSplitGenerator {
+
+ private static String SPLITS_LENGTHS = "splits.length";
+
+ @Test(timeout = 5000)
+ public void testGroupSplitsDisabledSortSplitsEnabled()
+ throws Exception {
+ testGroupSplitsAndSortSplits(false, true);
+ }
+
+ @Test(timeout = 5000)
+ public void testGroupSplitsDisabledSortSplitsDisabled()
+ throws Exception {
+ testGroupSplitsAndSortSplits(false, false);
+ }
+
+ @Test(timeout = 5000)
+ public void testGroupSplitsEnabledSortSplitsEnabled()
+ throws Exception {
+ testGroupSplitsAndSortSplits(true, true);
+ }
+
+ @Test(timeout = 5000)
+ public void testGroupSplitsEnabledSortSplitsDisabled()
+ throws Exception {
+ testGroupSplitsAndSortSplits(true, false);
+ }
+
+ private void testGroupSplitsAndSortSplits(boolean groupSplitsEnabled,
+ boolean sortSplitsEnabled) throws Exception {
+ Configuration conf = new Configuration();
+ String[] splitLengths = new String[] {"1000", "2000", "3000"};
+ conf.setStrings(SPLITS_LENGTHS, splitLengths);
+ DataSourceDescriptor dataSource = MRInput.createConfigBuilder(
+ conf, InputFormatForTest.class).
+ groupSplits(groupSplitsEnabled).sortSplits(sortSplitsEnabled).build();
+ UserPayload userPayload = dataSource.getInputDescriptor().getUserPayload();
+
+ InputInitializerContext context =
+ new TezTestUtils.TezRootInputInitializerContextForTest(userPayload);
+ MRInputAMSplitGenerator splitGenerator =
+ new MRInputAMSplitGenerator(context);
+
+ List<Event> events = splitGenerator.initialize();
+
+ assertEquals(splitLengths.length + 1, events.size());
+ assertTrue(events.get(0) instanceof InputConfigureVertexTasksEvent);
+ for (int i = 1; i < splitLengths.length + 1; i++) {
+ assertTrue(events.get(i) instanceof InputDataInformationEvent);
+ InputDataInformationEvent diEvent = (InputDataInformationEvent) (events.get(i));
+ assertNull(diEvent.getDeserializedUserPayload());
+ assertNotNull(diEvent.getUserPayload());
+ MRSplitProto eventProto = MRSplitProto.parseFrom(ByteString.copyFrom(
+ diEvent.getUserPayload()));
+ InputSplit is = MRInputUtils.getNewSplitDetailsFromEvent(eventProto, new Configuration());
+ if (groupSplitsEnabled) {
+ // For this configuration, there is no actual split grouping.
+ is = ((TezGroupedSplit)is).getGroupedSplits().get(0);
+ }
+ assertTrue(is instanceof InputSplitForTest);
+ // The splits in the list returned from InputFormat has ascending
+ // size in order. MRInputAMSplitGenerator might sort the list
+ // from InputFormat depending on sortSplitsEnabled.
+ if (i == 1) {
+ // The first split returned from MRInputAMSplitGenerator.
+ // When sort split is enabled, the first split returned from
+ // MRInputAMSplitGenerator is the last split in the list returned
+ // from InputFormat.
+ assertEquals(sortSplitsEnabled ? splitLengths.length : 1,
+ ((InputSplitForTest) is).getIdentifier());
+ } else if (i == splitLengths.length) {
+ // The last split returned from MRInputAMSplitGenerator
+ // When sort split is enabled, the last split returned from
+ // MRInputAMSplitGenerator is the first split in the list returned
+ // from InputFormat.
+ assertEquals(sortSplitsEnabled ? 1 : splitLengths.length,
+ ((InputSplitForTest) is).getIdentifier());
+ }
+ }
+ }
+
+ private static class InputFormatForTest
+ extends InputFormat<IntWritable, IntWritable> {
+
+ @Override
+ public RecordReader<IntWritable, IntWritable> createRecordReader(
+ org.apache.hadoop.mapreduce.InputSplit split,
+ TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ return new RecordReader<IntWritable, IntWritable>() {
+
+ private boolean done = false;
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public IntWritable getCurrentKey() throws IOException,
+ InterruptedException {
+ return new IntWritable(0);
+ }
+
+ @Override
+ public IntWritable getCurrentValue() throws IOException,
+ InterruptedException {
+ return new IntWritable(0);
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return done ? 0 : 1;
+ }
+
+ @Override
+ public void initialize(org.apache.hadoop.mapreduce.InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (!done) {
+ done = true;
+ return true;
+ }
+ return false;
+ }
+ };
+ }
+
+ @Override
+ public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(
+ JobContext context) throws IOException, InterruptedException {
+ List<org.apache.hadoop.mapreduce.InputSplit> list = new ArrayList<org.apache.hadoop.mapreduce.InputSplit>();
+ int[] lengths = context.getConfiguration().getInts(SPLITS_LENGTHS);
+ for (int i = 0; i < lengths.length; i++) {
+ list.add(new InputSplitForTest(i + 1, lengths[i]));
+ }
+ return list;
+ }
+ }
+
+ @Private
+ public static class InputSplitForTest extends InputSplit
+ implements Writable {
+
+ private int identifier;
+ private int length;
+
+ @SuppressWarnings("unused")
+ public InputSplitForTest() {
+ // For writable
+ }
+
+ public int getIdentifier() {
+ return this.identifier;
+ }
+ public InputSplitForTest(int identifier, int length) {
+ this.identifier = identifier;
+ this.length = length;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(identifier);
+ out.writeInt(length);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ identifier = in.readInt();
+ length = in.readInt();
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return length;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return new String[] {"localhost"};
+ }
+ }
+}