You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2015/02/18 23:48:52 UTC

tez git commit: TEZ-167. Create tests for MR Combiner. (Devaraj K and Tsuyoshi Ozawa via hitesh)

Repository: tez
Updated Branches:
  refs/heads/master 269905b11 -> d78846937


TEZ-167. Create tests for MR Combiner. (Devaraj K and Tsuyoshi Ozawa via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d7884693
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d7884693
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d7884693

Branch: refs/heads/master
Commit: d78846937c463cd882593247d071a7f2b716199d
Parents: 269905b
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Feb 18 14:47:59 2015 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Feb 18 14:47:59 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/mapreduce/combine/TestMRCombiner.java   | 188 +++++++++++++++++++
 2 files changed, 189 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d7884693/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff16163..b1c8c99 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-167. Create tests for MR Combiner.
   TEZ-2080. Localclient should be using tezconf in init instead of yarnconf.
   TEZ-2072. Add missing Private annotation to createDAG in the DAG API class.
   TEZ-2095. master branch fails to compile against hadoop-2.4.

http://git-wip-us.apache.org/repos/asf/tez/blob/d7884693/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java
new file mode 100644
index 0000000..a92f8dd
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.combine;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestMRCombiner {
+
+  /**
+   * Feeds the fixture records through an {@link MRCombiner} configured with
+   * {@link OldReducer} and checks the summed count appended for each key.
+   */
+  @Test
+  public void testRunOldCombiner() throws IOException, InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration();
+    setKeyAndValueClassTypes(tezConf);
+    tezConf.setClass("mapred.combiner.class", OldReducer.class, Object.class);
+
+    Writer mockWriter = Mockito.mock(Writer.class);
+    TezRawKeyValueIterator records = new TezRawKeyValueIteratorTest();
+    new MRCombiner(getTaskContext(tezConf)).combine(records, mockWriter);
+
+    // Every distinct key must have been appended with its total.
+    verifyKeyAndValues(mockWriter);
+  }
+
+  /**
+   * Feeds the fixture records through an {@link MRCombiner} configured with
+   * {@link NewReducer} and checks the summed count appended for each key.
+   */
+  @Test
+  public void testRunNewCombiner() throws IOException, InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration();
+    setKeyAndValueClassTypes(tezConf);
+    // NOTE(review): presumably this flag is what routes MRCombiner to its
+    // new-API code path - confirm against MRCombiner.
+    tezConf.setBoolean("mapred.mapper.new-api", true);
+    tezConf.setClass(MRJobConfig.COMBINE_CLASS_ATTR, NewReducer.class,
+        Object.class);
+
+    Writer mockWriter = Mockito.mock(Writer.class);
+    TezRawKeyValueIterator records = new TezRawKeyValueIteratorTest();
+    new MRCombiner(getTaskContext(tezConf)).combine(records, mockWriter);
+
+    // Every distinct key must have been appended with its total.
+    verifyKeyAndValues(mockWriter);
+  }
+
+  private void setKeyAndValueClassTypes(TezConfiguration conf) {
+    conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS,
+        Text.class, Object.class);
+    conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
+        IntWritable.class, Object.class);
+  }
+
+  /**
+   * Builds a mocked task context that serves the given configuration.
+   *
+   * @param conf configuration serialized into the context's user payload
+   * @return an {@link InputContext} mock answering {@code getUserPayload()},
+   *         {@code getCounters()} and {@code getApplicationId()}
+   * @throws IOException propagated from
+   *         {@code TezUtils.createUserPayloadFromConf}
+   */
+  private TaskContext getTaskContext(TezConfiguration conf)
+      throws IOException {
+    UserPayload confPayload = TezUtils.createUserPayloadFromConf(conf);
+    TezCounters counters = new TezCounters();
+    ApplicationId appId = ApplicationId.newInstance(123456, 1);
+
+    TaskContext mockContext = Mockito.mock(InputContext.class);
+    Mockito.doReturn(confPayload).when(mockContext).getUserPayload();
+    Mockito.doReturn(counters).when(mockContext).getCounters();
+    Mockito.doReturn(appId).when(mockContext).getApplicationId();
+    return mockContext;
+  }
+
+  /**
+   * Asserts that the writer received, at least once each, the expected
+   * (key, count) pairs: ("tez", 3), ("apache", 1) and ("hadoop", 2).
+   *
+   * @param writer mocked writer that was handed to the combiner
+   * @throws IOException declared because {@code Writer.append} declares it
+   */
+  private void verifyKeyAndValues(Writer writer) throws IOException {
+    String[] expectedKeys = { "tez", "apache", "hadoop" };
+    int[] expectedCounts = { 3, 1, 2 };
+    for (int idx = 0; idx < expectedKeys.length; idx++) {
+      Mockito.verify(writer, Mockito.atLeastOnce()).append(
+          new Text(expectedKeys[idx]), new IntWritable(expectedCounts[idx]));
+    }
+  }
+
+  private static class TezRawKeyValueIteratorTest implements
+      TezRawKeyValueIterator {
+
+    private int i = -1;
+    private String[] keys = { "tez", "tez", "tez", "apache", "hadoop", "hadoop" };
+
+    @Override
+    public boolean next() throws IOException {
+      if (i++ < keys.length - 1) {
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public DataInputBuffer getValue() throws IOException {
+      DataInputBuffer value = new DataInputBuffer();
+      IntWritable intValue = new IntWritable(1);
+      DataOutputBuffer out = new DataOutputBuffer();
+      intValue.write(out);
+      value.reset(out.getData(), out.getLength());
+      return value;
+    }
+
+    @Override
+    public Progress getProgress() {
+      return null;
+    }
+
+    @Override
+    public boolean isSameKey() throws IOException {
+      return false;
+    }
+
+    @Override
+    public DataInputBuffer getKey() throws IOException {
+      DataInputBuffer key = new DataInputBuffer();
+      Text text = new Text(keys[i]);
+      DataOutputBuffer out = new DataOutputBuffer();
+      text.write(out);
+      key.reset(out.getData(), out.getLength());
+      return key;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+  }
+
+  /**
+   * Old-API ({@code org.apache.hadoop.mapred}) reducer used as the combiner:
+   * emits each key once, paired with the sum of its values.
+   */
+  private static class OldReducer implements
+      Reducer<Text, IntWritable, Text, IntWritable> {
+
+    /** No configuration is needed. */
+    @Override
+    public void configure(JobConf arg0) {
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Sums every value supplied for {@code key} and collects the total.
+     *
+     * @param key key shared by all supplied values
+     * @param value iterator over the values for {@code key}
+     * @param collector receives one (key, total) pair
+     * @param reporter unused
+     */
+    @Override
+    public void reduce(Text key, Iterator<IntWritable> value,
+        OutputCollector<Text, IntWritable> collector, Reporter reporter)
+        throws IOException {
+      int total = 0;
+      for (Iterator<IntWritable> counts = value; counts.hasNext();) {
+        total += counts.next().get();
+      }
+      // Emit newly allocated objects rather than the caller-supplied key.
+      Text outKey = new Text(key.toString());
+      IntWritable outValue = new IntWritable(total);
+      collector.collect(outKey, outValue);
+    }
+  }
+
+  /**
+   * New-API ({@code org.apache.hadoop.mapreduce}) reducer used as the
+   * combiner: writes each key once, paired with the sum of its values.
+   */
+  private static class NewReducer extends
+      org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {
+    /**
+     * Sums every value supplied for {@code key} and writes the total.
+     *
+     * @param key key shared by all supplied values
+     * @param values the values for {@code key}
+     * @param context receives one (key, total) pair
+     */
+    @Override
+    protected void reduce(Text key, Iterable<IntWritable> values,
+        Context context) throws IOException, InterruptedException {
+      int total = 0;
+      Iterator<IntWritable> counts = values.iterator();
+      while (counts.hasNext()) {
+        total += counts.next().get();
+      }
+      // Write newly allocated objects rather than the caller-supplied key.
+      Text outKey = new Text(key.toString());
+      IntWritable outValue = new IntWritable(total);
+      context.write(outKey, outValue);
+    }
+  }
+}