You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/26 00:17:03 UTC

git commit: TEZ-889. Fixes a bug in MRInputSplitDistributor (caused by TEZ-880). (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 5c5b0da39 -> a5713a41f


TEZ-889. Fixes a bug in MRInputSplitDistributor (caused by TEZ-880).
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a5713a41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a5713a41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a5713a41

Branch: refs/heads/master
Commit: a5713a41fb9f5f7b885109f09ee7c03d1da03b66
Parents: 5c5b0da
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Feb 25 15:16:37 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Feb 25 15:16:37 2014 -0800

----------------------------------------------------------------------
 .../common/MRInputSplitDistributor.java         |   2 +-
 .../common/TestMRInputSplitDistributor.java     | 231 +++++++++++++++++++
 2 files changed, 232 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a5713a41/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index b1a0a4d..92debe9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -103,8 +103,8 @@ public class MRInputSplitDistributor implements TezRootInputInitializer {
               mrSplit, conf);
           diEvent = new RootInputDataInformationEvent(count++, oldInputSplit);
         }
-        events.add(diEvent);
       }
+      events.add(diEvent);
     }
 
     return events;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a5713a41/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
new file mode 100644
index 0000000..abe45f7
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezRootInputInitializerContext;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+
+public class TestMRInputSplitDistributor {
+
+  @Test
+  public void testSerializedPayload() throws IOException {
+
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD, true);
+    ByteString confByteString = MRHelpers.createByteStringFromConf(conf);
+    InputSplit split1 = new InputSplitForTest(1);
+    InputSplit split2 = new InputSplitForTest(2);
+    MRSplitProto proto1 = MRHelpers.createSplitProto(split1);
+    MRSplitProto proto2 = MRHelpers.createSplitProto(split2);
+    MRSplitsProto.Builder splitsProtoBuilder = MRSplitsProto.newBuilder();
+    splitsProtoBuilder.addSplits(proto1);
+    splitsProtoBuilder.addSplits(proto2);
+    MRInputUserPayloadProto.Builder payloadProto = MRInputUserPayloadProto.newBuilder();
+    payloadProto.setSplits(splitsProtoBuilder.build());
+    payloadProto.setConfigurationBytes(confByteString);
+    byte[] userPayload = payloadProto.build().toByteArray();
+
+    TezRootInputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload);
+    MRInputSplitDistributor splitDist = new MRInputSplitDistributor();
+
+    List<Event> events = splitDist.initialize(context);
+
+    assertEquals(3, events.size());
+    assertTrue(events.get(0) instanceof RootInputUpdatePayloadEvent);
+    assertTrue(events.get(1) instanceof RootInputDataInformationEvent);
+    assertTrue(events.get(2) instanceof RootInputDataInformationEvent);
+
+    RootInputDataInformationEvent diEvent1 = (RootInputDataInformationEvent) (events.get(1));
+    RootInputDataInformationEvent diEvent2 = (RootInputDataInformationEvent) (events.get(2));
+
+    assertNull(diEvent1.getDeserializedUserPayload());
+    assertNull(diEvent2.getDeserializedUserPayload());
+
+    assertNotNull(diEvent1.getUserPayload());
+    assertNotNull(diEvent2.getUserPayload());
+
+    MRSplitProto event1Proto = MRSplitProto.parseFrom(diEvent1.getUserPayload());
+    InputSplit is1 = MRInput.getOldSplitDetailsFromEvent(event1Proto, new Configuration());
+    assertTrue(is1 instanceof InputSplitForTest);
+    assertEquals(1, ((InputSplitForTest) is1).identifier);
+
+    MRSplitProto event2Proto = MRSplitProto.parseFrom(diEvent2.getUserPayload());
+    InputSplit is2 = MRInput.getOldSplitDetailsFromEvent(event2Proto, new Configuration());
+    assertTrue(is2 instanceof InputSplitForTest);
+    assertEquals(2, ((InputSplitForTest) is2).identifier);
+  }
+
+  @Test
+  public void testDeserializedPayload() throws IOException {
+
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD, false);
+    ByteString confByteString = MRHelpers.createByteStringFromConf(conf);
+    InputSplit split1 = new InputSplitForTest(1);
+    InputSplit split2 = new InputSplitForTest(2);
+    MRSplitProto proto1 = MRHelpers.createSplitProto(split1);
+    MRSplitProto proto2 = MRHelpers.createSplitProto(split2);
+    MRSplitsProto.Builder splitsProtoBuilder = MRSplitsProto.newBuilder();
+    splitsProtoBuilder.addSplits(proto1);
+    splitsProtoBuilder.addSplits(proto2);
+    MRInputUserPayloadProto.Builder payloadProto = MRInputUserPayloadProto.newBuilder();
+    payloadProto.setSplits(splitsProtoBuilder.build());
+    payloadProto.setConfigurationBytes(confByteString);
+    byte[] userPayload = payloadProto.build().toByteArray();
+
+    TezRootInputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload);
+    MRInputSplitDistributor splitDist = new MRInputSplitDistributor();
+
+    List<Event> events = splitDist.initialize(context);
+
+    assertEquals(3, events.size());
+    assertTrue(events.get(0) instanceof RootInputUpdatePayloadEvent);
+    assertTrue(events.get(1) instanceof RootInputDataInformationEvent);
+    assertTrue(events.get(2) instanceof RootInputDataInformationEvent);
+
+    RootInputDataInformationEvent diEvent1 = (RootInputDataInformationEvent) (events.get(1));
+    RootInputDataInformationEvent diEvent2 = (RootInputDataInformationEvent) (events.get(2));
+
+    assertNull(diEvent1.getUserPayload());
+    assertNull(diEvent2.getUserPayload());
+
+    assertNotNull(diEvent1.getDeserializedUserPayload());
+    assertNotNull(diEvent2.getDeserializedUserPayload());
+
+    assertTrue(diEvent1.getDeserializedUserPayload() instanceof InputSplitForTest);
+    assertEquals(1, ((InputSplitForTest) diEvent1.getDeserializedUserPayload()).identifier);
+
+    assertTrue(diEvent2.getDeserializedUserPayload() instanceof InputSplitForTest);
+    assertEquals(2, ((InputSplitForTest) diEvent2.getDeserializedUserPayload()).identifier);
+  }
+
+  private static class TezRootInputInitializerContextForTest implements
+      TezRootInputInitializerContext {
+
+    private final ApplicationId appId;
+    private final byte[] payload;
+
+    TezRootInputInitializerContextForTest(byte[] payload) throws IOException {
+      appId = ApplicationId.newInstance(1000, 200);
+      this.payload = payload;
+    }
+
+    @Override
+    public ApplicationId getApplicationId() {
+      return appId;
+    }
+
+    @Override
+    public String getDAGName() {
+      return "FakeDAG";
+    }
+
+    @Override
+    public String getInputName() {
+      return "MRInput";
+    }
+
+    @Override
+    public byte[] getUserPayload() {
+      return payload;
+    }
+
+    @Override
+    public int getNumTasks() {
+      return 100;
+    }
+
+    @Override
+    public Resource getVertexTaskResource() {
+      return Resource.newInstance(1024, 1);
+    }
+
+    @Override
+    public Resource getTotalAvailableResource() {
+      return Resource.newInstance(10240, 10);
+    }
+
+    @Override
+    public int getNumClusterNodes() {
+      return 10;
+    }
+  }
+
+  @Private
+  private static class InputSplitForTest implements InputSplit {
+
+    private int identifier;
+
+    @SuppressWarnings("unused")
+    public InputSplitForTest() {
+      // For writable
+    }
+
+    public InputSplitForTest(int identifier) {
+      this.identifier = identifier;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(identifier);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      identifier = in.readInt();
+    }
+
+    @Override
+    public long getLength() throws IOException {
+      return 1000;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+      return null;
+    }
+
+  }
+}