You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2015/02/18 23:48:52 UTC
tez git commit: TEZ-167. Create tests for MR Combiner. (Devaraj K and
Tsuyoshi Ozawa via hitesh)
Repository: tez
Updated Branches:
refs/heads/master 269905b11 -> d78846937
TEZ-167. Create tests for MR Combiner. (Devaraj K and Tsuyoshi Ozawa via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d7884693
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d7884693
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d7884693
Branch: refs/heads/master
Commit: d78846937c463cd882593247d071a7f2b716199d
Parents: 269905b
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Feb 18 14:47:59 2015 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Feb 18 14:47:59 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/mapreduce/combine/TestMRCombiner.java | 188 +++++++++++++++++++
2 files changed, 189 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/d7884693/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff16163..b1c8c99 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-167. Create tests for MR Combiner.
TEZ-2080. Localclient should be using tezconf in init instead of yarnconf.
TEZ-2072. Add missing Private annotation to createDAG in the DAG API class.
TEZ-2095. master branch fails to compile against hadoop-2.4.
http://git-wip-us.apache.org/repos/asf/tez/blob/d7884693/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java
new file mode 100644
index 0000000..a92f8dd
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.combine;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestMRCombiner { // Tests that run MRCombiner with an old-API and a new-API reducer class configured as the combiner.
+
+ @Test
+ public void testRunOldCombiner() throws IOException, InterruptedException { // Combines the stub input with OldReducer and checks the per-key totals.
+ TezConfiguration conf = new TezConfiguration();
+ setKeyAndValueClassTypes(conf);
+ conf.setClass("mapred.combiner.class", OldReducer.class, Object.class); // OldReducer implements org.apache.hadoop.mapred.Reducer
+ TaskContext taskContext = getTaskContext(conf);
+ MRCombiner combiner = new MRCombiner(taskContext);
+ Writer writer = Mockito.mock(Writer.class); // mocked so the append() calls can be verified afterwards
+ combiner.combine(new TezRawKeyValueIteratorTest(), writer); // input is the fixed key sequence of the stub iterator
+ // verify combiner output keys and values
+ verifyKeyAndValues(writer);
+ }
+
+ @Test
+ public void testRunNewCombiner() throws IOException, InterruptedException { // Same scenario as the old-API test, but with NewReducer as the combiner.
+ TezConfiguration conf = new TezConfiguration();
+ setKeyAndValueClassTypes(conf);
+ conf.setBoolean("mapred.mapper.new-api", true); // presumably what makes MRCombiner take its new-API path; confirm against MRCombiner
+ conf.setClass(MRJobConfig.COMBINE_CLASS_ATTR, NewReducer.class, // NewReducer extends org.apache.hadoop.mapreduce.Reducer
+ Object.class);
+ TaskContext taskContext = getTaskContext(conf);
+ MRCombiner combiner = new MRCombiner(taskContext);
+ Writer writer = Mockito.mock(Writer.class); // mocked so the append() calls can be verified afterwards
+ combiner.combine(new TezRawKeyValueIteratorTest(), writer); // input is the fixed key sequence of the stub iterator
+ // verify combiner output keys and values
+ verifyKeyAndValues(writer);
+ }
+
+ private void setKeyAndValueClassTypes(TezConfiguration conf) { // Sets the runtime key class to Text and the value class to IntWritable.
+ conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS,
+ Text.class, Object.class);
+ conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
+ IntWritable.class, Object.class);
+ }
+
+ private TaskContext getTaskContext(TezConfiguration conf) // Builds a mocked context whose user payload is created from conf.
+ throws IOException {
+ UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
+ TaskContext taskContext = Mockito.mock(InputContext.class); // the mock is an InputContext, handed out through the TaskContext type
+ Mockito.when(taskContext.getUserPayload()).thenReturn(payload);
+ Mockito.when(taskContext.getCounters()).thenReturn(new TezCounters()); // one new TezCounters instance, returned on every call
+ Mockito.when(taskContext.getApplicationId()).thenReturn(
+ ApplicationId.newInstance(123456, 1)); // fixed application id values used only by this test
+ return taskContext;
+ }
+
+ private void verifyKeyAndValues(Writer writer) throws IOException { // Expects the totals tez=3, apache=1, hadoop=2 to have been appended.
+ Mockito.verify(writer, Mockito.atLeastOnce()).append(new Text("tez"), // atLeastOnce: a pair appended more than once still passes
+ new IntWritable(3));
+ Mockito.verify(writer, Mockito.atLeastOnce()).append(new Text("apache"),
+ new IntWritable(1));
+ Mockito.verify(writer, Mockito.atLeastOnce()).append(new Text("hadoop"),
+ new IntWritable(2));
+ }
+
+ private static class TezRawKeyValueIteratorTest implements // Stub input: six serialized Text keys, each paired with a serialized IntWritable(1).
+ TezRawKeyValueIterator {
+
+ private int i = -1; // index of the current key; stays -1 until the first next()
+ private String[] keys = { "tez", "tez", "tez", "apache", "hadoop", "hadoop" }; // equal keys are adjacent
+
+ @Override
+ public boolean next() throws IOException { // Advances to the next key; returns false once all six keys were visited.
+ if (i++ < keys.length - 1) { // post-increment: the old index is compared, and i grows even when the result is false
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public DataInputBuffer getValue() throws IOException { // Returns a new buffer holding a serialized IntWritable(1), whatever the current key is.
+ DataInputBuffer value = new DataInputBuffer();
+ IntWritable intValue = new IntWritable(1);
+ DataOutputBuffer out = new DataOutputBuffer();
+ intValue.write(out);
+ value.reset(out.getData(), out.getLength()); // getLength() bounds the readable bytes of the backing array
+ return value;
+ }
+
+ @Override
+ public Progress getProgress() { // This stub tracks no progress.
+ return null;
+ }
+
+ @Override
+ public boolean isSameKey() throws IOException { // Always false, even when consecutive keys are equal.
+ return false;
+ }
+
+ @Override
+ public DataInputBuffer getKey() throws IOException { // Returns a new buffer holding the current key serialized as Text.
+ DataInputBuffer key = new DataInputBuffer();
+ Text text = new Text(keys[i]); // out of bounds if called before the first next() or after next() returned false
+ DataOutputBuffer out = new DataOutputBuffer();
+ text.write(out);
+ key.reset(out.getData(), out.getLength()); // getLength() bounds the readable bytes of the backing array
+ return key;
+ }
+
+ @Override
+ public void close() throws IOException { // Nothing to release.
+ }
+ }
+
+ private static class OldReducer implements // Old-API (org.apache.hadoop.mapred) reducer that sums the values of a key.
+ Reducer<Text, IntWritable, Text, IntWritable> {
+
+ @Override
+ public void configure(JobConf arg0) { // No configuration needed.
+ }
+
+ @Override
+ public void close() throws IOException { // Nothing to release.
+ }
+
+ @Override
+ public void reduce(Text key, Iterator<IntWritable> value, // Emits one (key, sum of values) pair.
+ OutputCollector<Text, IntWritable> collector, Reporter reporter)
+ throws IOException {
+ int count = 0;
+ while (value.hasNext()) {
+ count += value.next().get();
+ }
+ collector.collect(new Text(key.toString()), new IntWritable(count)); // emits a copy of the key; presumably because the caller may reuse the key object - confirm
+ }
+ }
+
+ private static class NewReducer extends // New-API (org.apache.hadoop.mapreduce) reducer that sums the values of a key.
+ org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {
+ @Override
+ protected void reduce(Text key, Iterable<IntWritable> values, // Emits one (key, sum of values) pair.
+ Context context) throws IOException, InterruptedException {
+ int count = 0;
+ for (IntWritable value : values) {
+ count += value.get();
+ }
+ context.write(new Text(key.toString()), new IntWritable(count)); // emits a copy of the key; presumably because the caller may reuse the key object - confirm
+ }
+ }
+}