You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by mi...@apache.org on 2016/10/18 15:48:33 UTC

tez git commit: TEZ-3430. Make split sorting optional. Add missing newly added test.

Repository: tez
Updated Branches:
  refs/heads/master c1a7f10e4 -> 04d609e7f


TEZ-3430. Make split sorting optional. Add missing newly added test.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/04d609e7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/04d609e7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/04d609e7

Branch: refs/heads/master
Commit: 04d609e7f222d5c830e846a2bea4eb770279ed00
Parents: c1a7f10
Author: Ming Ma <mi...@twitter.com>
Authored: Tue Oct 18 08:47:42 2016 -0700
Committer: Ming Ma <mi...@twitter.com>
Committed: Tue Oct 18 08:48:18 2016 -0700

----------------------------------------------------------------------
 .../common/TestMRInputAMSplitGenerator.java     | 241 +++++++++++++++++++
 1 file changed, 241 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/04d609e7/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java
new file mode 100644
index 0000000..bd4e5a9
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.common;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.split.TezGroupedSplit;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.mapreduce.TezTestUtils;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.lib.MRInputUtils;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+
+public class TestMRInputAMSplitGenerator {
+
+  private static String SPLITS_LENGTHS = "splits.length";
+
+  /** Split grouping off, split sorting on. */
+  @Test(timeout = 5000)
+  public void testGroupSplitsDisabledSortSplitsEnabled() throws Exception {
+    final boolean groupSplits = false;
+    final boolean sortSplits = true;
+    testGroupSplitsAndSortSplits(groupSplits, sortSplits);
+  }
+
+  /** Split grouping off, split sorting off. */
+  @Test(timeout = 5000)
+  public void testGroupSplitsDisabledSortSplitsDisabled() throws Exception {
+    final boolean groupSplits = false;
+    final boolean sortSplits = false;
+    testGroupSplitsAndSortSplits(groupSplits, sortSplits);
+  }
+
+  /** Split grouping on, split sorting on. */
+  @Test(timeout = 5000)
+  public void testGroupSplitsEnabledSortSplitsEnabled() throws Exception {
+    final boolean groupSplits = true;
+    final boolean sortSplits = true;
+    testGroupSplitsAndSortSplits(groupSplits, sortSplits);
+  }
+
+  /** Split grouping on, split sorting off. */
+  @Test(timeout = 5000)
+  public void testGroupSplitsEnabledSortSplitsDisabled() throws Exception {
+    final boolean groupSplits = true;
+    final boolean sortSplits = false;
+    testGroupSplitsAndSortSplits(groupSplits, sortSplits);
+  }
+
+  private void testGroupSplitsAndSortSplits(boolean groupSplitsEnabled,
+      boolean sortSplitsEnabled) throws Exception {
+    Configuration conf = new Configuration();
+    String[] splitLengths = new String[] {"1000", "2000", "3000"};
+    conf.setStrings(SPLITS_LENGTHS, splitLengths);
+    DataSourceDescriptor dataSource = MRInput.createConfigBuilder(
+        conf, InputFormatForTest.class).
+        groupSplits(groupSplitsEnabled).sortSplits(sortSplitsEnabled).build();
+    UserPayload userPayload = dataSource.getInputDescriptor().getUserPayload();
+
+    InputInitializerContext context =
+        new TezTestUtils.TezRootInputInitializerContextForTest(userPayload);
+    MRInputAMSplitGenerator splitGenerator =
+        new MRInputAMSplitGenerator(context);
+
+    List<Event> events = splitGenerator.initialize();
+
+    assertEquals(splitLengths.length + 1, events.size());
+    assertTrue(events.get(0) instanceof InputConfigureVertexTasksEvent);
+    for (int i = 1; i < splitLengths.length + 1; i++) {
+      assertTrue(events.get(i) instanceof InputDataInformationEvent);
+      InputDataInformationEvent diEvent = (InputDataInformationEvent) (events.get(i));
+      assertNull(diEvent.getDeserializedUserPayload());
+      assertNotNull(diEvent.getUserPayload());
+      MRSplitProto eventProto = MRSplitProto.parseFrom(ByteString.copyFrom(
+          diEvent.getUserPayload()));
+      InputSplit is = MRInputUtils.getNewSplitDetailsFromEvent(eventProto, new Configuration());
+      if (groupSplitsEnabled) {
+        // For this configuration, there is no actual split grouping.
+        is = ((TezGroupedSplit)is).getGroupedSplits().get(0);
+      }
+      assertTrue(is instanceof InputSplitForTest);
+      // The splits in the list returned from InputFormat has ascending
+      // size in order. MRInputAMSplitGenerator might sort the list
+      // from InputFormat depending on sortSplitsEnabled.
+      if (i == 1) {
+        // The first split returned from MRInputAMSplitGenerator.
+        // When sort split is enabled, the first split returned from
+        // MRInputAMSplitGenerator is the last split in the list returned
+        // from InputFormat.
+        assertEquals(sortSplitsEnabled ? splitLengths.length : 1,
+            ((InputSplitForTest) is).getIdentifier());
+      } else if (i == splitLengths.length) {
+        // The last split returned from MRInputAMSplitGenerator
+        // When sort split is enabled, the last split returned from
+        // MRInputAMSplitGenerator is the first split in the list returned
+        // from InputFormat.
+        assertEquals(sortSplitsEnabled ? 1 : splitLengths.length,
+            ((InputSplitForTest) is).getIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Input format used only by this test. It produces one
+   * {@link InputSplitForTest} per length configured under
+   * {@code SPLITS_LENGTHS}, and a record reader that yields a single record.
+   */
+  private static class InputFormatForTest
+      extends InputFormat<IntWritable, IntWritable> {
+
+    /**
+     * Returns a reader that emits exactly one (0, 0) record. The split and
+     * context arguments are not used.
+     */
+    @Override
+    public RecordReader<IntWritable, IntWritable> createRecordReader(
+        InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      return new RecordReader<IntWritable, IntWritable>() {
+
+        // Set once the single record has been handed out.
+        private boolean done = false;
+
+        @Override
+        public void close() throws IOException {
+          // Nothing to release.
+        }
+
+        @Override
+        public IntWritable getCurrentKey() throws IOException,
+            InterruptedException {
+          return new IntWritable(0);
+        }
+
+        @Override
+        public IntWritable getCurrentValue() throws IOException,
+            InterruptedException {
+          return new IntWritable(0);
+        }
+
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+          // Progress is the fraction of the data already read: nothing
+          // before the single record is consumed, everything afterwards.
+          // (The original returned these two values the wrong way round.)
+          return done ? 1 : 0;
+        }
+
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+          // No state to set up.
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+          if (!done) {
+            done = true;
+            return true;
+          }
+          return false;
+        }
+      };
+    }
+
+    /**
+     * Creates one split per configured length, in configuration order, with
+     * 1-based identifiers.
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext context)
+        throws IOException, InterruptedException {
+      int[] lengths = context.getConfiguration().getInts(SPLITS_LENGTHS);
+      List<InputSplit> splits = new ArrayList<InputSplit>(lengths.length);
+      for (int i = 0; i < lengths.length; i++) {
+        splits.add(new InputSplitForTest(i + 1, lengths[i]));
+      }
+      return splits;
+    }
+  }
+
+  @Private
+  public static class InputSplitForTest extends InputSplit
+      implements Writable {
+
+    private int identifier;
+    private int length;
+
+    @SuppressWarnings("unused")
+    public InputSplitForTest() {
+      // For writable
+    }
+
+    public int getIdentifier() {
+      return this.identifier;
+    }
+    public InputSplitForTest(int identifier, int length) {
+      this.identifier = identifier;
+      this.length = length;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(identifier);
+      out.writeInt(length);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      identifier = in.readInt();
+      length = in.readInt();
+    }
+
+    @Override
+    public long getLength() throws IOException {
+      return length;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+      return new String[] {"localhost"};
+    }
+  }
+}