You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2016/06/29 23:49:26 UTC
tez git commit: TEZ-3314. Double counting input bytes in MultiMRInput (Contributed by Harish JP via rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master c6a7d76ea -> 540eab018
TEZ-3314. Double counting input bytes in MultiMRInput (Contributed by Harish JP via rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/540eab01
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/540eab01
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/540eab01
Branch: refs/heads/master
Commit: 540eab0183787b434cbe95154bf2d1884dfaf46d
Parents: c6a7d76
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Wed Jun 29 16:49:14 2016 -0700
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Wed Jun 29 16:49:14 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/mapreduce/input/MultiMRInput.java | 4 -
.../tez/mapreduce/input/TestMultiMRInput.java | 163 ++++++++++---------
3 files changed, 84 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/540eab01/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1617c91..deb02fc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3314. Double counting input bytes in MultiMRInput.
TEZ-3308. Add counters to capture input split length.
TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout.
TEZ-3291. Optimize splits grouping when locality information is not available.
http://git-wip-us.apache.org/repos/asf/tez/blob/540eab01/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index efbeeaa..de54b0d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -188,10 +188,6 @@ public class MultiMRInput extends MRInputBase {
LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
split.getClass().getName() + ", NewSplit: " + split + ", length: " + splitLength);
}
- if (splitLength != -1) {
- getContext().getCounters().findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES)
- .increment(splitLength);
- }
} else {
org.apache.hadoop.mapred.InputSplit split =
MRInputUtils.getOldSplitDetailsFromEvent(splitProto, localJobConf);
http://git-wip-us.apache.org/repos/asf/tez/blob/540eab01/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index ab4a5d9..8d77a05 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -38,15 +39,18 @@ import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
@@ -98,13 +102,7 @@ public class TestMultiMRInput {
jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(jobConf, workDir);
- MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
- builder.setGroupingEnabled(false);
- builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
- byte[] payload = builder.build().toByteArray();
-
- InputContext inputContext = createTezInputContext(payload);
-
+ InputContext inputContext = createTezInputContext(jobConf);
MultiMRInput mMrInput = new MultiMRInput(inputContext, 0);
@@ -131,21 +129,14 @@ public class TestMultiMRInput {
jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(jobConf, workDir);
- MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
- builder.setGroupingEnabled(false);
- builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
- byte[] payload = builder.build().toByteArray();
-
- InputContext inputContext = createTezInputContext(payload);
+ InputContext inputContext = createTezInputContext(jobConf);
MultiMRInput input = new MultiMRInput(inputContext, 1);
input.initialize();
- List<Event> eventList = new ArrayList<Event>();
- String file1 = "file1";
- AtomicLong file1Length = new AtomicLong();
- LinkedHashMap<LongWritable, Text> data1 = createInputData(localFs, workDir, jobConf, file1, 0,
- 10, file1Length);
+ AtomicLong inputLength = new AtomicLong();
+ LinkedHashMap<LongWritable, Text> data = createSplits(1, workDir, jobConf, inputLength);
+
SequenceFileInputFormat<LongWritable, Text> format =
new SequenceFileInputFormat<LongWritable, Text>();
InputSplit[] splits = format.getSplits(jobConf, 1);
@@ -156,34 +147,47 @@ public class TestMultiMRInput {
InputDataInformationEvent.createWithSerializedPayload(0,
splitProto.toByteString().asReadOnlyByteBuffer());
- eventList.clear();
+ List<Event> eventList = new ArrayList<Event>();
eventList.add(event);
input.handleEvents(eventList);
- int readerCount = 0;
- int recordCount = 0;
- for (KeyValueReader reader : input.getKeyValueReaders()) {
- readerCount++;
- while (reader.next()) {
- verify(inputContext, times(++recordCount) ).notifyProgress();
- if (data1.size() == 0) {
- fail("Found more records than expected");
- }
- Object key = reader.getCurrentKey();
- Object val = reader.getCurrentValue();
- assertEquals(val, data1.remove(key));
- }
- try {
- reader.next(); //should throw exception
- fail();
- } catch(IOException e) {
- assertTrue(e.getMessage().contains("For usage, please refer to"));
- }
- }
- assertEquals(1, readerCount);
- long counterValue = input.getContext().getCounters()
- .findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES).getValue();
- assertEquals(file1Length.get(), counterValue);
+ assertReaders(input, data, 1, inputLength.get());
+ }
+
+ @Test
+ public void testNewFormatSplits() throws Exception {
+ Path workDir = new Path(TEST_ROOT_DIR, "testNewFormatSplits");
+ Job job = Job.getInstance(defaultConf);
+ job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class);
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, workDir);
+ Configuration conf = job.getConfiguration();
+ conf.setBoolean("mapred.mapper.new-api", true);
+
+ // Create sequence file.
+ AtomicLong inputLength = new AtomicLong();
+ LinkedHashMap<LongWritable, Text> data = createSplits(1, workDir, conf, inputLength);
+
+ // Get split information.
+ org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<LongWritable, Text> format =
+ new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<>();
+ List<org.apache.hadoop.mapreduce.InputSplit> splits = format.getSplits(job);
+ assertEquals(1, splits.size());
+
+ // Create the event.
+ MRSplitProto splitProto =
+ MRInputHelpers.createSplitProto(splits.get(0), new SerializationFactory(conf));
+ InputDataInformationEvent event = InputDataInformationEvent.createWithSerializedPayload(0,
+ splitProto.toByteString().asReadOnlyByteBuffer());
+
+ // Create input context.
+ InputContext inputContext = createTezInputContext(conf);
+
+ // Create the MR input object and process the event
+ MultiMRInput input = new MultiMRInput(inputContext, 1);
+ input.initialize();
+ input.handleEvents(Collections.<Event>singletonList(event));
+
+ assertReaders(input, data, 1, inputLength.get());
}
@Test(timeout = 5000)
@@ -194,31 +198,13 @@ public class TestMultiMRInput {
jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(jobConf, workDir);
- MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
- builder.setGroupingEnabled(false);
- builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
- byte[] payload = builder.build().toByteArray();
-
- InputContext inputContext = createTezInputContext(payload);
+ InputContext inputContext = createTezInputContext(jobConf);
MultiMRInput input = new MultiMRInput(inputContext, 2);
input.initialize();
- List<Event> eventList = new ArrayList<Event>();
- LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
-
- String file1 = "file1";
- AtomicLong file1Length = new AtomicLong();
- LinkedHashMap<LongWritable, Text> data1 = createInputData(localFs, workDir, jobConf, file1, 0,
- 10, file1Length);
-
- String file2 = "file2";
- AtomicLong file2Length = new AtomicLong();
- LinkedHashMap<LongWritable, Text> data2 = createInputData(localFs, workDir, jobConf, file2, 10,
- 20, file2Length);
-
- data.putAll(data1);
- data.putAll(data2);
+ AtomicLong inputLength = new AtomicLong();
+ LinkedHashMap<LongWritable, Text> data = createSplits(2, workDir, jobConf, inputLength);
SequenceFileInputFormat<LongWritable, Text> format =
new SequenceFileInputFormat<LongWritable, Text>();
@@ -235,15 +221,22 @@ public class TestMultiMRInput {
InputDataInformationEvent.createWithSerializedPayload(0,
splitProto2.toByteString().asReadOnlyByteBuffer());
- eventList.clear();
+ List<Event> eventList = new ArrayList<Event>();
eventList.add(event1);
eventList.add(event2);
input.handleEvents(eventList);
+ assertReaders(input, data, 2, inputLength.get());
+ }
+
+ private void assertReaders(MultiMRInput input, LinkedHashMap<LongWritable, Text> data,
+ int expectedReaderCounts, long inputBytes) throws Exception {
int readerCount = 0;
+ int recordCount = 0;
for (KeyValueReader reader : input.getKeyValueReaders()) {
readerCount++;
while (reader.next()) {
+ verify(input.getContext(), times(++recordCount + readerCount - 1)).notifyProgress();
if (data.size() == 0) {
fail("Found more records than expected");
}
@@ -261,8 +254,8 @@ public class TestMultiMRInput {
}
long counterValue = input.getContext().getCounters()
.findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES).getValue();
- assertEquals(file1Length.get() + file2Length.get(), counterValue);
- assertEquals(2, readerCount);
+ assertEquals(inputBytes, counterValue);
+ assertEquals(expectedReaderCounts, readerCount);
}
@Test(timeout = 5000)
@@ -272,19 +265,13 @@ public class TestMultiMRInput {
jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(jobConf, workDir);
- MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
- builder.setGroupingEnabled(false);
- builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
- byte[] payload = builder.build().toByteArray();
-
- InputContext inputContext = createTezInputContext(payload);
+ InputContext inputContext = createTezInputContext(jobConf);
MultiMRInput input = new MultiMRInput(inputContext, 1);
input.initialize();
- List<Event> eventList = new ArrayList<Event>();
- String file1 = "file1";
- createInputData(localFs, workDir, jobConf, file1, 0, 10, new AtomicLong());
+ createSplits(1, workDir, jobConf, new AtomicLong());
+
SequenceFileInputFormat<LongWritable, Text> format =
new SequenceFileInputFormat<LongWritable, Text>();
InputSplit[] splits = format.getSplits(jobConf, 1);
@@ -298,7 +285,7 @@ public class TestMultiMRInput {
InputDataInformationEvent.createWithSerializedPayload(1,
splitProto.toByteString().asReadOnlyByteBuffer());
- eventList.clear();
+ List<Event> eventList = new ArrayList<Event>();
eventList.add(event1);
eventList.add(event2);
try {
@@ -310,7 +297,23 @@ public class TestMultiMRInput {
}
}
- private InputContext createTezInputContext(byte[] payload) {
+ private LinkedHashMap<LongWritable, Text> createSplits(int splitCount, Path workDir,
+ Configuration conf, AtomicLong totalSize) throws Exception {
+ LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
+ for (int i = 0; i < splitCount; ++i) {
+ int start = i * 10;
+ int end = start + 10;
+ data.putAll(createInputData(localFs, workDir, conf, "file" + i, start, end, totalSize));
+ }
+ return data;
+ }
+
+ private InputContext createTezInputContext(Configuration conf) throws Exception {
+ MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
+ builder.setGroupingEnabled(false);
+ builder.setConfigurationBytes(TezUtils.createByteStringFromConf(conf));
+ byte[] payload = builder.build().toByteArray();
+
ApplicationId applicationId = ApplicationId.newInstance(10000, 1);
TezCounters counters = new TezCounters();
@@ -336,7 +339,7 @@ public class TestMultiMRInput {
}
public static LinkedHashMap<LongWritable, Text> createInputData(FileSystem fs, Path workDir,
- JobConf job, String filename, long startKey, long numKeys, AtomicLong fileLength)
+ Configuration job, String filename, long startKey, long numKeys, AtomicLong fileLength)
throws IOException {
LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
Path file = new Path(workDir, filename);
@@ -356,7 +359,7 @@ public class TestMultiMRInput {
writer.append(key, value);
LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
}
- fileLength.set(writer.getLength());
+ fileLength.addAndGet(writer.getLength());
} finally {
writer.close();
}