Posted to commits@tez.apache.org by ss...@apache.org on 2016/06/30 22:10:43 UTC

[2/2] tez git commit: TEZ-3314. Double counting input bytes in MultiMRInput (Contributed by Harish JP via rbalamohan)

TEZ-3314. Double counting input bytes in MultiMRInput (Contributed by Harish JP via rbalamohan)

(cherry picked from commit 540eab0183787b434cbe95154bf2d1884dfaf46d)
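
What changed, in brief: the block removed from MultiMRInput.initFromEvent() sat in
the new-API split branch and incremented TaskCounter.INPUT_SPLIT_LENGTH_BYTES,
duplicating split-length accounting done elsewhere on the read path (the JIRA
title calls this "double counting"), so the counter reported twice the real
input size. Below is a minimal sketch of that failure mode; it uses only the
counter calls that appear in this diff, while the class name and the 100-byte
split length are illustrative, not from the Tez source:

    import org.apache.tez.common.counters.TaskCounter;
    import org.apache.tez.common.counters.TezCounters;

    public class DoubleCountSketch {
      public static void main(String[] args) {
        long splitLength = 100L; // hypothetical split size
        TezCounters counters = new TezCounters();

        // The increment that survives the patch, made once per split.
        counters.findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES)
            .increment(splitLength);

        // The duplicate increment removed from MultiMRInput.initFromEvent();
        // uncommented, the counter would read 200 for a 100-byte split.
        // counters.findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES)
        //     .increment(splitLength);

        System.out.println(counters
            .findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES).getValue()); // 100
      }
    }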


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/883e76fd
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/883e76fd
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/883e76fd

Branch: refs/heads/branch-0.8
Commit: 883e76fd5c75614575c7e359cc63320668e02bb0
Parents: a557086
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Wed Jun 29 16:49:14 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Jun 30 15:07:22 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/mapreduce/input/MultiMRInput.java       |   4 -
 .../tez/mapreduce/input/TestMultiMRInput.java   | 163 ++++++++++---------
 3 files changed, 84 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/883e76fd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5fd04ff..9e0b751 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3314. Double counting input bytes in MultiMRInput.
   TEZ-3308. Add counters to capture input split length.
   TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout.
   TEZ-3291. Optimize splits grouping when locality information is not available.

http://git-wip-us.apache.org/repos/asf/tez/blob/883e76fd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index efbeeaa..de54b0d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -188,10 +188,6 @@ public class MultiMRInput extends MRInputBase {
         LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
             split.getClass().getName() + ", NewSplit: " + split + ", length: " + splitLength);
       }
-      if (splitLength != -1) {
-        getContext().getCounters().findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES)
-            .increment(splitLength);
-      }
     } else {
       org.apache.hadoop.mapred.InputSplit split =
           MRInputUtils.getOldSplitDetailsFromEvent(splitProto, localJobConf);
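
A note on the hunk above (an inference from the diff, not commit text): only
the new-API branch carried the extra increment — the "NewSplit" debug log is
its surrounding context, and the old mapred.InputSplit branch is untouched.
With it removed, INPUT_SPLIT_LENGTH_BYTES is bumped exactly once per split,
and the refactored tests below enforce this by pinning the counter to the
exact byte count:

    // From the new assertReaders helper in TestMultiMRInput below; this
    // assertion fails if any code path counts a split's bytes twice.
    long counterValue = input.getContext().getCounters()
        .findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES).getValue();
    assertEquals(inputBytes, counterValue);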

http://git-wip-us.apache.org/repos/asf/tez/blob/883e76fd/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index ab4a5d9..8d77a05 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -38,15 +39,18 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
@@ -98,13 +102,7 @@ public class TestMultiMRInput {
     jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
     FileInputFormat.setInputPaths(jobConf, workDir);
 
-    MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
-    builder.setGroupingEnabled(false);
-    builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
-    byte[] payload = builder.build().toByteArray();
-
-    InputContext inputContext = createTezInputContext(payload);
-
+    InputContext inputContext = createTezInputContext(jobConf);
 
     MultiMRInput mMrInput = new MultiMRInput(inputContext, 0);
 
@@ -131,21 +129,14 @@ public class TestMultiMRInput {
     jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
     FileInputFormat.setInputPaths(jobConf, workDir);
 
-    MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
-    builder.setGroupingEnabled(false);
-    builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
-    byte[] payload = builder.build().toByteArray();
-
-    InputContext inputContext = createTezInputContext(payload);
+    InputContext inputContext = createTezInputContext(jobConf);
 
     MultiMRInput input = new MultiMRInput(inputContext, 1);
     input.initialize();
-    List<Event> eventList = new ArrayList<Event>();
 
-    String file1 = "file1";
-    AtomicLong file1Length = new AtomicLong();
-    LinkedHashMap<LongWritable, Text> data1 = createInputData(localFs, workDir, jobConf, file1, 0,
-        10, file1Length);
+    AtomicLong inputLength = new AtomicLong();
+    LinkedHashMap<LongWritable, Text> data = createSplits(1, workDir, jobConf, inputLength);
+
     SequenceFileInputFormat<LongWritable, Text> format =
         new SequenceFileInputFormat<LongWritable, Text>();
     InputSplit[] splits = format.getSplits(jobConf, 1);
@@ -156,34 +147,47 @@ public class TestMultiMRInput {
         InputDataInformationEvent.createWithSerializedPayload(0,
             splitProto.toByteString().asReadOnlyByteBuffer());
 
-    eventList.clear();
+    List<Event> eventList = new ArrayList<Event>();
     eventList.add(event);
     input.handleEvents(eventList);
 
-    int readerCount = 0;
-    int recordCount = 0;
-    for (KeyValueReader reader : input.getKeyValueReaders()) {
-      readerCount++;
-      while (reader.next()) {
-        verify(inputContext, times(++recordCount) ).notifyProgress();
-        if (data1.size() == 0) {
-          fail("Found more records than expected");
-        }
-        Object key = reader.getCurrentKey();
-        Object val = reader.getCurrentValue();
-        assertEquals(val, data1.remove(key));
-      }
-      try {
-        reader.next(); //should throw exception
-        fail();
-      } catch(IOException e) {
-        assertTrue(e.getMessage().contains("For usage, please refer to"));
-      }
-    }
-    assertEquals(1, readerCount);
-    long counterValue = input.getContext().getCounters()
-        .findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES).getValue();
-    assertEquals(file1Length.get(), counterValue);
+    assertReaders(input, data, 1, inputLength.get());
+  }
+
+  @Test
+  public void testNewFormatSplits() throws Exception {
+    Path workDir = new Path(TEST_ROOT_DIR, "testNewFormatSplits");
+    Job job = Job.getInstance(defaultConf);
+    job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class);
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, workDir);
+    Configuration conf = job.getConfiguration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+
+    // Create sequence file.
+    AtomicLong inputLength = new AtomicLong();
+    LinkedHashMap<LongWritable, Text> data = createSplits(1, workDir, conf, inputLength);
+
+    // Get split information.
+    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<LongWritable, Text> format =
+        new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<>();
+    List<org.apache.hadoop.mapreduce.InputSplit> splits = format.getSplits(job);
+    assertEquals(1, splits.size());
+
+    // Create the event.
+    MRSplitProto splitProto =
+        MRInputHelpers.createSplitProto(splits.get(0), new SerializationFactory(conf));
+    InputDataInformationEvent event = InputDataInformationEvent.createWithSerializedPayload(0,
+        splitProto.toByteString().asReadOnlyByteBuffer());
+
+    // Create input context.
+    InputContext inputContext = createTezInputContext(conf);
+
+    // Create the MR input object and process the event
+    MultiMRInput input = new MultiMRInput(inputContext, 1);
+    input.initialize();
+    input.handleEvents(Collections.<Event>singletonList(event));
+
+    assertReaders(input, data, 1, inputLength.get());
   }
 
   @Test(timeout = 5000)
@@ -194,31 +198,13 @@ public class TestMultiMRInput {
     jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
     FileInputFormat.setInputPaths(jobConf, workDir);
 
-    MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
-    builder.setGroupingEnabled(false);
-    builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
-    byte[] payload = builder.build().toByteArray();
-
-    InputContext inputContext = createTezInputContext(payload);
+    InputContext inputContext = createTezInputContext(jobConf);
 
     MultiMRInput input = new MultiMRInput(inputContext, 2);
     input.initialize();
-    List<Event> eventList = new ArrayList<Event>();
 
-    LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
-
-    String file1 = "file1";
-    AtomicLong file1Length = new AtomicLong();
-    LinkedHashMap<LongWritable, Text> data1 = createInputData(localFs, workDir, jobConf, file1, 0,
-        10, file1Length);
-
-    String file2 = "file2";
-    AtomicLong file2Length = new AtomicLong();
-    LinkedHashMap<LongWritable, Text> data2 = createInputData(localFs, workDir, jobConf, file2, 10,
-        20, file2Length);
-
-    data.putAll(data1);
-    data.putAll(data2);
+    AtomicLong inputLength = new AtomicLong();
+    LinkedHashMap<LongWritable, Text> data = createSplits(2, workDir, jobConf, inputLength);
 
     SequenceFileInputFormat<LongWritable, Text> format =
         new SequenceFileInputFormat<LongWritable, Text>();
@@ -235,15 +221,22 @@ public class TestMultiMRInput {
         InputDataInformationEvent.createWithSerializedPayload(0,
             splitProto2.toByteString().asReadOnlyByteBuffer());
 
-    eventList.clear();
+    List<Event> eventList = new ArrayList<Event>();
     eventList.add(event1);
     eventList.add(event2);
     input.handleEvents(eventList);
 
+    assertReaders(input, data, 2, inputLength.get());
+  }
+
+  private void assertReaders(MultiMRInput input, LinkedHashMap<LongWritable, Text> data,
+      int expectedReaderCounts, long inputBytes) throws Exception {
     int readerCount = 0;
+    int recordCount = 0;
     for (KeyValueReader reader : input.getKeyValueReaders()) {
       readerCount++;
       while (reader.next()) {
+        verify(input.getContext(), times(++recordCount + readerCount - 1)).notifyProgress();
         if (data.size() == 0) {
           fail("Found more records than expected");
         }
@@ -261,8 +254,8 @@ public class TestMultiMRInput {
     }
     long counterValue = input.getContext().getCounters()
         .findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES).getValue();
-    assertEquals(file1Length.get() + file2Length.get(), counterValue);
-    assertEquals(2, readerCount);
+    assertEquals(inputBytes, counterValue);
+    assertEquals(expectedReaderCounts, readerCount);
   }
 
   @Test(timeout = 5000)
@@ -272,19 +265,13 @@ public class TestMultiMRInput {
     jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
     FileInputFormat.setInputPaths(jobConf, workDir);
 
-    MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
-    builder.setGroupingEnabled(false);
-    builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
-    byte[] payload = builder.build().toByteArray();
-
-    InputContext inputContext = createTezInputContext(payload);
+    InputContext inputContext = createTezInputContext(jobConf);
 
     MultiMRInput input = new MultiMRInput(inputContext, 1);
     input.initialize();
-    List<Event> eventList = new ArrayList<Event>();
 
-    String file1 = "file1";
-    createInputData(localFs, workDir, jobConf, file1, 0, 10, new AtomicLong());
+    createSplits(1, workDir, jobConf, new AtomicLong());
+
     SequenceFileInputFormat<LongWritable, Text> format =
         new SequenceFileInputFormat<LongWritable, Text>();
     InputSplit[] splits = format.getSplits(jobConf, 1);
@@ -298,7 +285,7 @@ public class TestMultiMRInput {
         InputDataInformationEvent.createWithSerializedPayload(1,
             splitProto.toByteString().asReadOnlyByteBuffer());
 
-    eventList.clear();
+    List<Event> eventList = new ArrayList<Event>();
     eventList.add(event1);
     eventList.add(event2);
     try {
@@ -310,7 +297,23 @@ public class TestMultiMRInput {
     }
   }
 
-  private InputContext createTezInputContext(byte[] payload) {
+  private LinkedHashMap<LongWritable, Text> createSplits(int splitCount, Path workDir,
+      Configuration conf, AtomicLong totalSize) throws Exception {
+    LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
+    for (int i = 0; i < splitCount; ++i) {
+      int start = i * 10;
+      int end = start + 10;
+      data.putAll(createInputData(localFs, workDir, conf, "file" + i, start, end, totalSize));
+    }
+    return data;
+  }
+
+  private InputContext createTezInputContext(Configuration conf) throws Exception {
+    MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
+    builder.setGroupingEnabled(false);
+    builder.setConfigurationBytes(TezUtils.createByteStringFromConf(conf));
+    byte[] payload = builder.build().toByteArray();
+
     ApplicationId applicationId = ApplicationId.newInstance(10000, 1);
     TezCounters counters = new TezCounters();
 
@@ -336,7 +339,7 @@ public class TestMultiMRInput {
   }
 
   public static LinkedHashMap<LongWritable, Text> createInputData(FileSystem fs, Path workDir,
-      JobConf job, String filename, long startKey, long numKeys, AtomicLong fileLength)
+      Configuration job, String filename, long startKey, long numKeys, AtomicLong fileLength)
           throws IOException {
     LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
     Path file = new Path(workDir, filename);
@@ -356,7 +359,7 @@ public class TestMultiMRInput {
         writer.append(key, value);
         LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
       }
-      fileLength.set(writer.getLength());
+      fileLength.addAndGet(writer.getLength());
     } finally {
       writer.close();
     }
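
A closing note on the test refactor, pieced together from the hunks above: the
repeated MRInputUserPayloadProto setup collapses into
createTezInputContext(Configuration), per-file data creation collapses into
createSplits(...), and the reader loop plus counter checks move into
assertReaders(...). The switch from fileLength.set(...) to
fileLength.addAndGet(...) matters because createSplits() threads one AtomicLong
through several createInputData() calls, so file lengths must accumulate rather
than overwrite one another. A sketch of the resulting test flow, mirroring
testMultipleSplits above (event construction elided, as in the diff):

    // Two files/splits; inputLength accumulates both file lengths.
    AtomicLong inputLength = new AtomicLong();
    LinkedHashMap<LongWritable, Text> data =
        createSplits(2, workDir, jobConf, inputLength);

    InputContext inputContext = createTezInputContext(jobConf);
    MultiMRInput input = new MultiMRInput(inputContext, 2);
    input.initialize();
    input.handleEvents(eventList); // one InputDataInformationEvent per split

    // Passes only if each split's bytes are counted exactly once.
    assertReaders(input, data, 2, inputLength.get());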