Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/17 19:45:01 UTC

svn commit: r1611413 [17/18] - in /hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nati...

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/TestInput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/TestInput.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/TestInput.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/TestInput.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.testutil;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.io.BytesWritable;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class TestInput {
+
+  public static class KV<K, V> {
+    public K key;
+    public V value;
+  }
+
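+  // 64 printable characters (A-Z, a-z, 0-9, '*', '/') used to build deterministic test payloads.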
+  public static final char[] CHAR_SET = new char[] { 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N',
+    'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j',
+    'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5',
+    '6', '7', '8', '9', '*', '/' };
+
+  public static KV[] getMapInputs(int size) {
+
+    final KV[] dataInput = new KV[size];
+
+    for (int i = 0; i < size; i++) {
+      dataInput[i] = getSingleMapInput(i);
+    }
+    return dataInput;
+  }
+
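+  // Record i: key and value share one BytesWritable of length i, filled with CHAR_SET[i % CHAR_SET.length].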
+  private static KV getSingleMapInput(int i) {
+    final char character = CHAR_SET[i % CHAR_SET.length];
+    final byte b = (byte) character;
+
+    final byte[] bytes = new byte[i];
+    Arrays.fill(bytes, b);
+    final BytesWritable result = new BytesWritable(bytes);
+    final KV kv = new KV();
+    kv.key = result;
+    kv.value = result;
+    return kv;
+  }
+
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestBytesUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestBytesUtil.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestBytesUtil.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestBytesUtil.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.utils;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.mapred.nativetask.util.BytesUtil;
+
+@SuppressWarnings({ "deprecation" })
+public class TestBytesUtil extends TestCase {
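+  // Round-trip each primitive type through BytesUtil and assert the value survives.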
+
+  public void testBytesStringConversion() {
+
+    final String str = "I am good!";
+    final byte[] bytes = BytesUtil.toBytes(str);
+
+    Assert.assertEquals(str, BytesUtil.fromBytes(bytes));
+  }
+
+  public void testBytesIntConversion() {
+    final int a = 1000;
+    final byte[] intBytes = BytesUtil.toBytes(a);
+
+    Assert.assertEquals(a, BytesUtil.toInt(intBytes));
+  }
+
+  public void testBytesLongConversion() {
+    final long l = 1000000L;
+    final byte[] longBytes = BytesUtil.toBytes(l);
+
+    Assert.assertEquals(l, BytesUtil.toLong(longBytes));
+  }
+
+  public void testBytesFloatConversion() {
+    final float f = 3.14f;
+    final byte[] floatBytes = BytesUtil.toBytes(f);
+
+    Assert.assertEquals(f, BytesUtil.toFloat(floatBytes), 0.0f);
+  }
+
+  public void testBytesDoubleConversion() {
+    final double d = 3.14;
+    final byte[] doubleBytes = BytesUtil.toBytes(d);
+
+    Assert.assertEquals(d, BytesUtil.toDouble(doubleBytes), 0.0);
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestReadWriteBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestReadWriteBuffer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestReadWriteBuffer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestReadWriteBuffer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.utils;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.junit.Assert;
+
+public class TestReadWriteBuffer extends TestCase {
+
+  private static final byte[] bytes = new byte[] { '0', 'a', 'b', 'c', 'd', '9' };
+
+  public void testReadWriteBuffer() {
+
+    final ReadWriteBuffer buffer = new ReadWriteBuffer();
+
+    Assert.assertNotNull(buffer.getBuff());
+
+    Assert.assertEquals(0, buffer.getWritePoint());
+    Assert.assertEquals(0, buffer.getReadPoint());
+
+    buffer.writeInt(3);
+
+    buffer.writeString("goodboy");
+
+    buffer.writeLong(10L);
+    buffer.writeBytes(bytes, 0, bytes.length);
+    buffer.writeLong(100L);
+
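+    // expected write point, assuming 4-byte length prefixes for the string and byte-array fields:
+    // 4 (int) + 4 + 7 ("goodboy") + 8 (long) + 4 + 6 (bytes) + 8 (long) = 41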
+    Assert.assertEquals(41, buffer.getWritePoint());
+    Assert.assertEquals(0, buffer.getReadPoint());
+    Assert.assertTrue(buffer.getBuff().length >= 41);
+
+    Assert.assertEquals(3, buffer.readInt());
+    Assert.assertEquals("goodboy", buffer.readString());
+    Assert.assertEquals(10L, buffer.readLong());
+
+    final byte[] read = buffer.readBytes();
+    for (int i = 0; i < bytes.length; i++) {
+      Assert.assertEquals(bytes[i], read[i]);
+    }
+
+    Assert.assertEquals(100L, buffer.readLong());
+    Assert.assertEquals(41, buffer.getReadPoint());
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestSizedWritable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestSizedWritable.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestSizedWritable.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestSizedWritable.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.utils;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+import org.junit.Assert;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class TestSizedWritable extends TestCase {
+
+  public void testSizedWritable() {
+    final SizedWritable w = new SizedWritable(BytesWritable.class);
+    Assert.assertEquals(SizedWritable.INVALID_LENGTH, w.length);
+    Assert.assertNotNull(w.v);
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/data/testGlibcBugSpill.out
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/data/testGlibcBugSpill.out?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/data/testGlibcBugSpill.out (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/data/testGlibcBugSpill.out Thu Jul 17 17:44:55 2014
@@ -0,0 +1,2 @@

[... 4 lines stripped ...]
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.combinertest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.nativetask.combinertest.WordCount.IntSumReducer;
+import org.apache.hadoop.mapred.nativetask.combinertest.WordCount.TokenizerMapper;
+import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CombinerTest {
+  private FileSystem fs;
+  private String inputpath;
+  private String nativeoutputpath;
+  private String hadoopoutputpath;
+
+  @Test
+  public void testWordCountCombiner() {
+    try {
+
+      final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+      nativeConf.addResource(TestConstants.COMBINER_CONF_PATH);
+      final Job nativejob = getJob("nativewordcount", nativeConf, inputpath, nativeoutputpath);
+
+      final Configuration commonConf = ScenarioConfiguration.getNormalConfiguration();
+      commonConf.addResource(TestConstants.COMBINER_CONF_PATH);
+
+      final Job normaljob = getJob("normalwordcount", commonConf, inputpath, hadoopoutputpath);
+
+      nativejob.waitForCompletion(true);
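+      // REDUCE_INPUT_RECORDS counts records arriving at the reducers after the combiner has run,
+      // so the native and normal jobs should agree on it.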
+
+      Counter nativeReduceGroups = nativejob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+
+      normaljob.waitForCompletion(true);
+      Counter normalReduceGroups = normaljob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+
+      assertEquals("native and normal outputs should be identical", true,
+          ResultVerifier.verify(nativeoutputpath, hadoopoutputpath));
+      assertEquals("native reduce group counter should equal normal reduce group counter",
+          nativeReduceGroups.getValue(), normalReduceGroups.getValue());
+
+    } catch (final Exception e) {
+      e.printStackTrace();
+      assertEquals("run exception", true, false);
+    }
+  }
+
+  @Before
+  public void startUp() throws Exception {
+    final ScenarioConfiguration conf = new ScenarioConfiguration();
+    conf.addcombinerConf();
+
+    this.fs = FileSystem.get(conf);
+
+    this.inputpath = conf.get(TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_KEY,
+        TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_DEFAULTV) + "/wordcount";
+
+    if (!fs.exists(new Path(inputpath))) {
+      new TestInputFile(
+          conf.getInt(TestConstants.NATIVETASK_COMBINER_WORDCOUNT_FILESIZE, 1000000),
+          Text.class.getName(),
+          Text.class.getName(), conf).createSequenceTestFile(inputpath, 1, (byte)('a'));
+    }
+
+    this.nativeoutputpath = conf.get(TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH,
+        TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/nativewordcount";
+    this.hadoopoutputpath = conf.get(TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH,
+        TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/normalwordcount";
+  }
+
+  protected static Job getJob(String jobname, Configuration inputConf, String inputpath, String outputpath)
+      throws Exception {
+    final Configuration conf = new Configuration(inputConf);
+    conf.set("fileoutputpath", outputpath);
+    final FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(new Path(outputpath))) {
+      fs.delete(new Path(outputpath), true);
+    }
+    fs.close();
+    final Job job = new Job(conf, jobname);
+    job.setJarByClass(WordCount.class);
+    job.setMapperClass(TokenizerMapper.class);
+    job.setCombinerClass(IntSumReducer.class);
+    job.setReducerClass(IntSumReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    FileInputFormat.addInputPath(job, new Path(inputpath));
+    FileOutputFormat.setOutputPath(job, new Path(outputpath));
+    return job;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.combinertest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+
+public class LargeKVCombinerTest {
+
+  @Test
+  public void testLargeValueCombiner() {
+    final Configuration normalConf = ScenarioConfiguration.getNormalConfiguration();
+    final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+    normalConf.addResource(TestConstants.COMBINER_CONF_PATH);
+    nativeConf.addResource(TestConstants.COMBINER_CONF_PATH);
+    final int defaultKVSizeMaximum = 1 << 22; // 4 MB
+    final int maxKVSize = normalConf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX_LARGEKV_TEST,
+        defaultKVSizeMaximum);
+    final String inputPath = normalConf.get(TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_KEY,
+        TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_DEFAULTV) + "/largeKV";
+    final String nativeOutputPath = normalConf.get(TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH,
+        TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/nativeLargeKV";
+    final String hadoopOutputPath = normalConf.get(TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH,
+        TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/normalLargeKV";
+    try {
+      final FileSystem fs = FileSystem.get(normalConf);
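+      // Sweep KV sizes from 64 KB up to the configured maximum, growing 4x per pass;
+      // min is kept within 10 bytes of max, so every record lands near the target size.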
+      for (int i = 65536; i <= maxKVSize; i *= 4) {
+
+        int max = i;
+        int min = Math.max(i / 4, max - 10);
+
+        System.out.println("===KV Size Test: min size: " + min + ", max size: " + max);
+
+        normalConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
+        normalConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
+        nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
+        nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
+        fs.delete(new Path(inputPath), true);
+        new TestInputFile(normalConf.getInt(TestConstants.NATIVETASK_COMBINER_WORDCOUNT_FILESIZE,
+            1000000), IntWritable.class.getName(),
+            Text.class.getName(), normalConf).createSequenceTestFile(inputPath, 1);
+
+        final Job normaljob = CombinerTest.getJob("normalwordcount", normalConf, inputPath, hadoopOutputPath);
+        final Job nativejob = CombinerTest.getJob("nativewordcount", nativeConf, inputPath, nativeOutputPath);
+
+        nativejob.waitForCompletion(true);
+        Counter nativeReduceGroups = nativejob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+
+        normaljob.waitForCompletion(true);
+        Counter normalReduceGroups = normaljob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+
+        final boolean compareRet = ResultVerifier.verify(nativeOutputPath, hadoopOutputPath);
+
+        final String reason = "LargeKVCombinerTest failed with min size: " + min
+            + ", max size: " + max + ", normal out: " + hadoopOutputPath + ", native out: " + nativeOutputPath;
+
+        assertEquals(reason, true, compareRet);
+//        assertEquals("native reduce group counter should equal normal reduce group counter",
+//            nativeReduceGroups.getValue(), normalReduceGroups.getValue());
+      }
+      fs.close();
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail("job failed with exception: " + e);
+    }
+  }
+  
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.combinertest;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Counter;
+import org.junit.Before;
+import org.junit.Test;
+
+public class OldAPICombinerTest {
+  private FileSystem fs;
+  private String inputpath;
+
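+  // Same native-vs-normal combiner comparison as CombinerTest, but driven through the old mapred API (JobConf/JobClient).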
+  @Test
+  public void testWordCountCombinerWithOldAPI() throws Exception {
+    final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+    nativeConf.addResource(TestConstants.COMBINER_CONF_PATH);
+    final String nativeoutput = nativeConf.get(TestConstants.OLDAPI_NATIVETASK_TEST_COMBINER_OUTPUTPATH);
+    final JobConf nativeJob = getOldAPIJobconf(nativeConf, "nativeCombinerWithOldAPI", inputpath, nativeoutput);
+    RunningJob nativeRunning = JobClient.runJob(nativeJob);
+
+    Counter nativeReduceGroups = nativeRunning.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+
+    final Configuration normalConf = ScenarioConfiguration.getNormalConfiguration();
+    normalConf.addResource(TestConstants.COMBINER_CONF_PATH);
+    final String normaloutput = normalConf.get(TestConstants.OLDAPI_NORMAL_TEST_COMBINER_OUTPUTPATH);
+    final JobConf normalJob = getOldAPIJobconf(normalConf, "normalCombinerWithOldAPI", inputpath, normaloutput);
+
+    RunningJob normalRunning = JobClient.runJob(normalJob);
+    Counter normalReduceGroups = normalRunning.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+
+    final boolean compareRet = ResultVerifier.verify(nativeoutput, normaloutput);
+    assertEquals("native and normal combiner outputs should be identical", true, compareRet);
+
+    assertEquals("The reduce input record counts must be the same",
+        nativeReduceGroups.getValue(), normalReduceGroups.getValue());
+  }
+
+  @Before
+  public void startUp() throws Exception {
+    final ScenarioConfiguration conf = new ScenarioConfiguration();
+    conf.addcombinerConf();
+    this.fs = FileSystem.get(conf);
+    this.inputpath = conf.get(TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_KEY,
+        TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_DEFAULTV) + "/wordcount";
+
+    if (!fs.exists(new Path(inputpath))) {
+      new TestInputFile(conf.getInt("nativetask.combiner.wordcount.filesize", 1000000), Text.class.getName(),
+          Text.class.getName(), conf).createSequenceTestFile(inputpath, 1, (byte)('a'));
+    }
+  }
+
+  private static JobConf getOldAPIJobconf(Configuration configuration, String name, String input, String output)
+      throws Exception {
+    final JobConf jobConf = new JobConf(configuration);
+    final FileSystem fs = FileSystem.get(configuration);
+    if (fs.exists(new Path(output))) {
+      fs.delete(new Path(output), true);
+    }
+    fs.close();
+    jobConf.setJobName(name);
+    jobConf.setOutputKeyClass(Text.class);
+    jobConf.setOutputValueClass(IntWritable.class);
+    jobConf.setMapperClass(WordCountWithOldAPI.TokenizerMapperWithOldAPI.class);
+    jobConf.setCombinerClass(WordCountWithOldAPI.IntSumReducerWithOldAPI.class);
+    jobConf.setReducerClass(WordCountWithOldAPI.IntSumReducerWithOldAPI.class);
+
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    jobConf.setOutputFormat(TextOutputFormat.class);
+
+    FileInputFormat.setInputPaths(jobConf, new Path(input));
+    FileOutputFormat.setOutputPath(jobConf, new Path(output));
+    return jobConf;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.combinertest;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public class WordCount {
+
+  private static final Log LOG = LogFactory.getLog(WordCount.class);
+
+  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
+
+    private final static IntWritable one = new IntWritable(1);
+    private final Text word = new Text();
+
+    @Override
+    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
+      final StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+
+  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+    private final IntWritable result = new IntWritable();
+
+    @Override
+    public void reduce(Text key, Iterable<IntWritable> values, Context context)
+        throws IOException, InterruptedException {
+      int sum = 0;
+      for (final IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(key, result);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    final Configuration conf = new Configuration();
+    final String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    if (otherArgs.length != 2) {
+      System.err.println("Usage: wordcount <in> <out>");
+      System.exit(2);
+    }
+    final Job job = new Job(conf, conf.get(MRJobConfig.JOB_NAME, "word count"));
+    job.setJarByClass(WordCount.class);
+    job.setMapperClass(TokenizerMapper.class);
+    job.setCombinerClass(IntSumReducer.class);
+    job.setReducerClass(IntSumReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
+    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.combinertest;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+public class WordCountWithOldAPI {
+
+  public static class TokenizerMapperWithOldAPI extends MapReduceBase implements
+      Mapper<Object, Text, Text, IntWritable> {
+    private final static IntWritable one = new IntWritable(1);
+    private final Text word = new Text();
+
+    @Override
+    public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
+        throws IOException {
+      final StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        output.collect(word, one);
+      }
+    }
+  }
+
+  public static class IntSumReducerWithOldAPI extends MapReduceBase implements
+      Reducer<Text, IntWritable, Text, IntWritable> {
+    private final IntWritable result = new IntWritable();
+
+    @Override
+    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
+        Reporter reporter) throws IOException {
+      int sum = 0;
+      while (values.hasNext()) {
+        sum += values.next().get();
+      }
+      result.set(sum);
+      output.collect(key, result);
+    }
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.compresstest;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class CompressMapper {
+  public static final String inputFile = "./compress/input.txt";
+  public static final String outputFileDir = "./compress/output/";
+
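+  // Identity mapper: compression behavior comes entirely from the job configuration, not from this class.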
+  public static class TextCompressMapper extends Mapper<Text, Text, Text, Text> {
+
+    @Override
+    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+      context.write(key, value);
+    }
+  }
+
+  public static Job getCompressJob(String jobname, Configuration conf) {
+    Job job = null;
+    try {
+      job = new Job(conf, jobname + "-CompressMapperJob");
+      job.setJarByClass(CompressMapper.class);
+      job.setMapperClass(TextCompressMapper.class);
+      job.setOutputKeyClass(Text.class);
+      job.setMapOutputValueClass(Text.class);
+      final Path outputpath = new Path(outputFileDir + jobname);
+      // if the output directory already exists, delete it
+      final FileSystem hdfs = FileSystem.get(new ScenarioConfiguration());
+      if (hdfs.exists(outputpath)) {
+        hdfs.delete(outputpath, true);
+      }
+      hdfs.close();
+      job.setInputFormatClass(SequenceFileInputFormat.class);
+      FileInputFormat.addInputPath(job, new Path(inputFile));
+      FileOutputFormat.setOutputPath(job, outputpath);
+    } catch (final Exception e) {
+      // fail fast instead of returning a null Job that would NPE at the caller
+      throw new RuntimeException("failed to set up compress job", e);
+    }
+    return job;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.compresstest;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CompressTest {
+
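+  // Each test runs the same identity job twice, under the native and the normal configuration,
+  // with a codec-specific conf layered on top, then verifies the two outputs are identical.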
+  @Test
+  public void testSnappyCompress() throws Exception {
+    final Configuration conf = ScenarioConfiguration.getNativeConfiguration();
+    conf.addResource(TestConstants.SNAPPY_COMPRESS_CONF_PATH);
+    final Job job = CompressMapper.getCompressJob("nativesnappy", conf);
+    job.waitForCompletion(true);
+
+    final Configuration hadoopconf = ScenarioConfiguration.getNormalConfiguration();
+    hadoopconf.addResource(TestConstants.SNAPPY_COMPRESS_CONF_PATH);
+    final Job hadoopjob = CompressMapper.getCompressJob("hadoopsnappy", hadoopconf);
+    hadoopjob.waitForCompletion(true);
+
+    final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativesnappy",
+        CompressMapper.outputFileDir + "hadoopsnappy");
+    assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+  }
+
+  @Test
+  public void testGzipCompress() throws Exception {
+    final Configuration conf = ScenarioConfiguration.getNativeConfiguration();
+    conf.addResource(TestConstants.GZIP_COMPRESS_CONF_PATH);
+    final Job job = CompressMapper.getCompressJob("nativegzip", conf);
+    job.waitForCompletion(true);
+
+    final Configuration hadoopconf = ScenarioConfiguration.getNormalConfiguration();
+    hadoopconf.addResource(TestConstants.GZIP_COMPRESS_CONF_PATH);
+    final Job hadoopjob = CompressMapper.getCompressJob("hadoopgzip", hadoopconf);
+    hadoopjob.waitForCompletion(true);
+
+    final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativegzip",
+        CompressMapper.outputFileDir + "hadoopgzip");
+    assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+  }
+
+  @Test
+  public void testBzip2Compress() throws Exception {
+    final Configuration nativeconf = ScenarioConfiguration.getNativeConfiguration();
+    nativeconf.addResource(TestConstants.BZIP2_COMPRESS_CONF_PATH);
+    final Job nativejob = CompressMapper.getCompressJob("nativebzip2", nativeconf);
+    nativejob.waitForCompletion(true);
+
+    final Configuration hadoopconf = ScenarioConfiguration.getNormalConfiguration();
+    hadoopconf.addResource(TestConstants.BZIP2_COMPRESS_CONF_PATH);
+    final Job hadoopjob = CompressMapper.getCompressJob("hadoopbzip2", hadoopconf);
+    hadoopjob.waitForCompletion(true);
+
+    final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativebzip2",
+        CompressMapper.outputFileDir + "hadoopbzip2");
+    assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+  }
+
+  @Test
+  public void testLz4Compress() throws Exception {
+    final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+    nativeConf.addResource(TestConstants.LZ4_COMPRESS_CONF_PATH);
+    final Job nativeJob = CompressMapper.getCompressJob("nativelz4", nativeConf);
+    nativeJob.waitForCompletion(true);
+
+    final Configuration hadoopConf = ScenarioConfiguration.getNormalConfiguration();
+    hadoopConf.addResource(TestConstants.LZ4_COMPRESS_CONF_PATH);
+    final Job hadoopJob = CompressMapper.getCompressJob("hadooplz4", hadoopConf);
+    hadoopJob.waitForCompletion(true);
+    final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativelz4",
+        CompressMapper.outputFileDir + "hadooplz4");
+    assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+  }
+
+  @Test
+  public void testDefaultCompress() throws Exception {
+    final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+    nativeConf.addResource(TestConstants.DEFAULT_COMPRESS_CONF_PATH);
+    final Job nativeJob = CompressMapper.getCompressJob("nativedefault", nativeConf);
+    nativeJob.waitForCompletion(true);
+
+    final Configuration hadoopConf = ScenarioConfiguration.getNormalConfiguration();
+    hadoopConf.addResource(TestConstants.DEFAULT_COMPRESS_CONF_PATH);
+    final Job hadoopJob = CompressMapper.getCompressJob("hadoopdefault", hadoopConf);
+    hadoopJob.waitForCompletion(true);
+    final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativedefault",
+        CompressMapper.outputFileDir + "hadoopdefault");
+    assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+  }
+
+  @Before
+  public void startUp() throws Exception {
+    final ScenarioConfiguration conf = new ScenarioConfiguration();
+    final FileSystem fs = FileSystem.get(conf);
+    final Path path = new Path(CompressMapper.inputFile);
+    // recreate the input file so every run starts from a known payload
+    fs.delete(path, true);
+    if (!fs.exists(path)) {
+      new TestInputFile(ScenarioConfiguration.getNormalConfiguration().getInt(
+          TestConstants.NATIVETASK_COMPRESS_FILESIZE, 100000),
+          Text.class.getName(), Text.class.getName(), conf)
+          .createSequenceTestFile(CompressMapper.inputFile);
+    }
+    fs.close();
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class HashSumReducer<KTYPE, VTYPE> extends Reducer<KTYPE, VTYPE, KTYPE, IntWritable> {
+
+  private final ByteArrayOutputStream os = new ByteArrayOutputStream();
+  private final DataOutputStream dos = new DataOutputStream(os);
+
+  @Override
+  public void reduce(KTYPE key, Iterable<VTYPE> values, Context context) throws IOException, InterruptedException {
+    int hashSum = 0;
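+    // Summing per-value hash codes keeps the result independent of iteration order.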
+    for (final VTYPE val : values) {
+      if (val instanceof Writable) {
+        os.reset();
+        ((Writable) val).write(dos);
+        final int hash = Arrays.hashCode(os.toByteArray());
+        hashSum += hash;
+      }
+    }
+
+    context.write(key, new IntWritable(hashSum));
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import java.io.IOException;
+import java.util.zip.CRC32;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.nativetask.testutil.BytesFactory;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapred.nativetask.util.BytesUtil;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class KVJob {
+  public static final String INPUTPATH = "nativetask.kvtest.inputfile.path";
+  public static final String OUTPUTPATH = "nativetask.kvtest.outputfile.path";
+  Job job = null;
+
+  public static class ValueMapper<KTYPE, VTYPE> extends Mapper<KTYPE, VTYPE, KTYPE, VTYPE> {
+    @Override
+    public void map(KTYPE key, VTYPE value, Context context) throws IOException, InterruptedException {
+      context.write(key, value);
+    }
+  }
+
+  /** Identity reducer: emits every value for a key unchanged. */
+  public static class KVMReducer<KTYPE, VTYPE> extends Reducer<KTYPE, VTYPE, KTYPE, VTYPE> {
+    @Override
+    public void reduce(KTYPE key, Iterable<VTYPE> values, Context context) throws IOException, InterruptedException {
+      for (final VTYPE value : values) {
+        context.write(key, value);
+      }
+    }
+  }
+
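+  /**
+   * Sums a CRC32 checksum over the serialized bytes of each value for a key.
+   * Since addition is commutative, the result does not depend on the order
+   * in which values arrive.
+   */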
+  public static class KVReducer<KTYPE, VTYPE> extends Reducer<KTYPE, VTYPE, KTYPE, VTYPE> {
+
+    @Override
+    public void reduce(KTYPE key, Iterable<VTYPE> values, Context context) throws IOException, InterruptedException {
+      long resultlong = 0; // 8 bytes, to match BytesFactory.fromBytes
+      final CRC32 crc32 = new CRC32();
+      String valueClassName = null;
+      for (final VTYPE val : values) {
+        crc32.reset();
+        crc32.update(BytesFactory.toBytes(val));
+        resultlong += crc32.getValue();
+        valueClassName = val.getClass().getName();
+      }
+      if (valueClassName == null) {
+        return; // no values for this key, nothing to write
+      }
+      context.write(key, (VTYPE) BytesFactory.newObject(BytesUtil.toBytes(resultlong), valueClassName));
+    }
+  }
+
+  public KVJob(String jobname, Configuration conf, Class<?> keyclass, Class<?> valueclass, String inputpath,
+      String outputpath) throws Exception {
+    job = new Job(conf, jobname);
+    job.setJarByClass(KVJob.class);
+    job.setMapperClass(KVJob.ValueMapper.class);
+    job.setOutputKeyClass(keyclass);
+    job.setMapOutputValueClass(valueclass);
+    
+    if (conf.getBoolean(TestConstants.NATIVETASK_KVTEST_CREATEFILE, false)) {
+      final FileSystem fs = FileSystem.get(conf);
+      fs.delete(new Path(inputpath), true);
+      fs.close();
+      final TestInputFile testfile = new TestInputFile(conf.getInt(TestConstants.FILESIZE_KEY, 1000),
+          keyclass.getName(), valueclass.getName(), conf);
+      testfile.createSequenceTestFile(inputpath);
+    }
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    FileInputFormat.addInputPath(job, new Path(inputpath));
+    FileOutputFormat.setOutputPath(job, new Path(outputpath));
+  }
+
+  public void runJob() throws Exception {
+    job.waitForCompletion(true);
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
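+/**
+ * Runs the same job twice for every key/value class combination listed in
+ * the kvtest configuration, once with the native collector and once with
+ * the normal Java collector, and verifies that both outputs match.
+ */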
+@RunWith(Parameterized.class)
+public class KVTest {
+  private static Class<?>[] keyclasses = null;
+  private static Class<?>[] valueclasses = null;
+  private static String[] keyclassNames = null;
+  private static String[] valueclassNames = null;
+
+  private static Configuration nativekvtestconf = ScenarioConfiguration.getNativeConfiguration();
+  private static Configuration hadoopkvtestconf = ScenarioConfiguration.getNormalConfiguration();
+  static {
+    nativekvtestconf.addResource(TestConstants.KVTEST_CONF_PATH);
+    hadoopkvtestconf.addResource(TestConstants.KVTEST_CONF_PATH);
+  }
+
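+  /** Builds the cross product of the configured key classes and value classes. */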
+  @Parameters(name = "key:{0}\nvalue:{1}")
+  public static Iterable<Class<?>[]> data() {
+    final String valueclassesStr = nativekvtestconf
+        .get(TestConstants.NATIVETASK_KVTEST_VALUECLASSES);
+    System.out.println(valueclassesStr);
+    valueclassNames = valueclassesStr.replaceAll("\\s", "").split(";"); // strip whitespace, split on ';'
+    final ArrayList<Class<?>> tmpvalueclasses = new ArrayList<Class<?>>();
+    for (int i = 0; i < valueclassNames.length; i++) {
+      try {
+        if (valueclassNames[i].equals("")) {
+          continue;
+        }
+        tmpvalueclasses.add(Class.forName(valueclassNames[i]));
+      } catch (final ClassNotFoundException e) {
+        e.printStackTrace();
+      }
+    }
+    valueclasses = tmpvalueclasses.toArray(new Class[tmpvalueclasses.size()]);
+    final String keyclassesStr = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_KEYCLASSES);
+    System.out.println(keyclassesStr);
+    keyclassNames = keyclassesStr.replaceAll("\\s", "").split(";"); // strip whitespace, split on ';'
+    final ArrayList<Class<?>> tmpkeyclasses = new ArrayList<Class<?>>();
+    for (int i = 0; i < keyclassNames.length; i++) {
+      try {
+        if (keyclassNames[i].equals("")) {
+          continue;
+        }
+        tmpkeyclasses.add(Class.forName(keyclassNames[i]));
+      } catch (final ClassNotFoundException e) {
+        e.printStackTrace();
+      }
+    }
+    keyclasses = tmpkeyclasses.toArray(new Class[tmpkeyclasses.size()]);
+    final Class<?>[][] kvgroup = new Class<?>[keyclassNames.length * valueclassNames.length][2];
+    for (int i = 0; i < keyclassNames.length; i++) {
+      final int tmpindex = i * valueclassNames.length;
+      for (int j = 0; j < valueclassNames.length; j++) {
+        kvgroup[tmpindex + j][0] = keyclasses[i];
+        kvgroup[tmpindex + j][1] = valueclasses[j];
+      }
+    }
+    return Arrays.asList(kvgroup);
+  }
+
+  private final Class<?> keyclass;
+  private final Class<?> valueclass;
+
+  public KVTest(Class<?> keyclass, Class<?> valueclass) {
+    this.keyclass = keyclass;
+    this.valueclass = valueclass;
+  }
+
+  @Test
+  public void testKVCompatibility() {
+    try {
+      final String nativeoutput = this.runNativeTest(
+          "Test:" + keyclass.getSimpleName() + "--" + valueclass.getSimpleName(), keyclass, valueclass);
+      final String normaloutput = this.runNormalTest(
+          "Test:" + keyclass.getSimpleName() + "--" + valueclass.getSimpleName(), keyclass, valueclass);
+      final boolean compareRet = ResultVerifier.verify(normaloutput, nativeoutput);
+      final String input = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/"
+          + keyclass.getName() + "/" + valueclass.getName();
+      if (compareRet) {
+        // clean up only on success, so failing outputs stay around for inspection
+        final FileSystem fs = FileSystem.get(hadoopkvtestconf);
+        fs.delete(new Path(nativeoutput), true);
+        fs.delete(new Path(normaloutput), true);
+        fs.delete(new Path(input), true);
+        fs.close();
+      }
+      assertEquals("native and normal job outputs should be identical", true, compareRet);
+    } catch (final Exception e) {
+      assertEquals("test run exception:", null, e);
+    }
+  }
+
+  private String runNativeTest(String jobname, Class<?> keyclass, Class<?> valueclass) throws IOException {
+    final String inputpath = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/"
+        + keyclass.getName()
+        + "/" + valueclass.getName();
+    final String outputpath = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_OUTPUTDIR) + "/"
+        + keyclass.getName() + "/" + valueclass.getName();
+    // delete the output directory if it already exists
+    final FileSystem fs = FileSystem.get(nativekvtestconf);
+    fs.delete(new Path(outputpath), true);
+    fs.close();
+    nativekvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "true");
+    try {
+      final KVJob keyJob = new KVJob(jobname, nativekvtestconf, keyclass, valueclass, inputpath, outputpath);
+      keyJob.runJob();
+    } catch (final Exception e) {
+      return "native testcase run time error.";
+    }
+    return outputpath;
+  }
+
+  private String runNormalTest(String jobname, Class<?> keyclass, Class<?> valueclass) throws IOException {
+    final String inputpath = hadoopkvtestconf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/"
+        + keyclass.getName()
+        + "/" + valueclass.getName();
+    final String outputpath = hadoopkvtestconf
+        .get(TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR)
+        + "/"
+        + keyclass.getName() + "/" + valueclass.getName();
+    // delete the output directory if it already exists
+    final FileSystem fs = FileSystem.get(hadoopkvtestconf);
+    fs.delete(new Path(outputpath), true);
+    fs.close();
+    hadoopkvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "false");
+    try {
+      final KVJob keyJob = new KVJob(jobname, hadoopkvtestconf, keyclass, valueclass, inputpath, outputpath);
+      keyJob.runJob();
+    } catch (final Exception e) {
+      return "normal testcase run time error.";
+    }
+    return outputpath;
+  }
+
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.junit.Test;
+
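+/**
+ * Compares native and normal output as key/value byte sizes grow. Each round
+ * quadruples the maximum record size, from 64KB up to the configured limit
+ * (4MB by default).
+ */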
+public class LargeKVTest {
+
+  @Test
+  public void testKeySize() {
+    runKVSizeTests(Text.class, IntWritable.class);
+  }
+
+  @Test
+  public void testValueSize() {
+    runKVSizeTests(IntWritable.class, Text.class);
+  }
+
+  private static Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+  private static Configuration normalConf = ScenarioConfiguration.getNormalConfiguration();
+  static {
+    nativeConf.addResource(TestConstants.KVTEST_CONF_PATH);
+    nativeConf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "true");
+    normalConf.addResource(TestConstants.KVTEST_CONF_PATH);
+    normalConf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "false");
+  }
+
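+  /**
+   * Each round draws key/value sizes from [i/4, i]. Only the Text side of
+   * the pair actually varies; the IntWritable side keeps its fixed width.
+   */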
+  public void runKVSizeTests(Class<?> keyClass, Class<?> valueClass) {
+    if (!keyClass.equals(Text.class) && !valueClass.equals(Text.class)) {
+      return;
+    }
+    final int defaultKVSizeMaximum = 1 << 22; // 4MB
+    final int kvSizeMaximum = normalConf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX_LARGEKV_TEST,
+        defaultKVSizeMaximum);
+    try {
+      for (int i = 65536; i <= kvSizeMaximum; i *= 4) {
+        int min = i / 4;
+        int max = i;
+        nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
+        nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
+        normalConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
+        normalConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
+
+        System.out.println("===KV Size Test: min size: " + min + ", max size: " + max + ", keyClass: "
+            + keyClass.getName() + ", valueClass: " + valueClass.getName());
+
+        final String nativeOutPut = runNativeLargeKVTest("Native Large KV Test:" + i, keyClass,
+            valueClass, nativeConf);
+        final String normalOutPut = this.runNormalLargeKVTest("Normal Large KV Test:" + i, keyClass,
+            valueClass, normalConf);
+        final boolean compareRet = ResultVerifier.verify(normalOutPut, nativeOutPut);
+        final String reason = "keytype: " + keyClass.getName() + ", valuetype: " + valueClass.getName()
+            + ", failed with " + (keyClass.equals(Text.class) ? "key" : "value") + ", min size: " + min
+            + ", max size: " + max + ", normal out: " + normalOutPut + ", native Out: " + nativeOutPut;
+        assertEquals(reason, true, compareRet);
+      }
+    } catch (final Exception e) {
+      e.printStackTrace();
+      assertEquals("test run exception:", null, e);
+    }
+  }
+
+  private String runNativeLargeKVTest(String jobname, Class<?> keyclass, Class<?> valueclass, Configuration conf)
+      throws Exception {
+    final String inputpath = conf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/LargeKV/" + keyclass.getName()
+        + "/" + valueclass.getName();
+    final String outputpath = conf.get(TestConstants.NATIVETASK_KVTEST_OUTPUTDIR) + "/LargeKV/" + keyclass.getName()
+        + "/" + valueclass.getName();
+    // delete the output directory if it already exists
+    final FileSystem fs = FileSystem.get(conf);
+    fs.delete(new Path(outputpath), true);
+    fs.close();
+    try {
+      final KVJob keyJob = new KVJob(jobname, conf, keyclass, valueclass, inputpath, outputpath);
+      keyJob.runJob();
+    } catch (final Exception e) {
+      return "normal testcase run time error.";
+    }
+    return outputpath;
+  }
+
+  private String runNormalLargeKVTest(String jobname, Class<?> keyclass, Class<?> valueclass, Configuration conf)
+      throws IOException {
+    final String inputpath = conf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/LargeKV/" + keyclass.getName()
+        + "/" + valueclass.getName();
+    final String outputpath = conf.get(TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR) + "/LargeKV/"
+        + keyclass.getName() + "/" + valueclass.getName();
+    // delete the output directory if it already exists
+    final FileSystem fs = FileSystem.get(conf);
+    fs.delete(new Path(outputpath), true);
+    fs.close();
+    try {
+      final KVJob keyJob = new KVJob(jobname, conf, keyclass, valueclass, inputpath, outputpath);
+      keyJob.runJob();
+    } catch (final Exception e) {
+      return "normal testcase run time error.";
+    }
+    return outputpath;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapred.nativetask.testutil.BytesFactory;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+
+
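+/**
+ * Generates a SequenceFile of random key/value records with configurable
+ * total size, key/value types, and per-record byte-size ranges.
+ */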
+public class TestInputFile {
+
+  public static class KVSizeScope {
+    private static final int DefaultMinNum = 1;
+    private static final int DefaultMaxNum = 64;
+
+    public int minBytesNum;
+    public int maxBytesNum;
+
+    public KVSizeScope() {
+      this.minBytesNum = DefaultMinNum;
+      this.maxBytesNum = DefaultMaxNum;
+    }
+
+    public KVSizeScope(int min, int max) {
+      this.minBytesNum = min;
+      this.maxBytesNum = max;
+    }
+  }
+
+  private static HashMap<String, KVSizeScope> map = new HashMap<String, KVSizeScope>();
+
+  private byte[] databuf = null;
+  private final String keyClsName, valueClsName;
+  private int filesize = 0;
+  private int keyMaxBytesNum, keyMinBytesNum;
+  private int valueMaxBytesNum, valueMinBytesNum;
+  private SequenceFile.Writer writer = null;
+  Random r = new Random();
+  public static final int DATABUFSIZE = 1 << 22; // 4M
+
+  private enum State {
+    KEY, VALUE
+  };
+  
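+  // Size ranges for writables that are always generated at a fixed byte width.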
+  static {
+    map.put(BooleanWritable.class.getName(), new KVSizeScope(1, 1));
+    map.put(DoubleWritable.class.getName(), new KVSizeScope(8, 8));
+    map.put(FloatWritable.class.getName(), new KVSizeScope(4, 4));
+    map.put(VLongWritable.class.getName(), new KVSizeScope(8, 8));
+    map.put(ByteWritable.class.getName(), new KVSizeScope(1, 1));
+    map.put(LongWritable.class.getName(), new KVSizeScope(8, 8));
+    map.put(VIntWritable.class.getName(), new KVSizeScope(4, 4));
+    map.put(IntWritable.class.getName(), new KVSizeScope(4, 4));
+  }
+  
+  public TestInputFile(int filesize, String keytype, String valuetype, Configuration conf) throws Exception {
+    this.filesize = filesize;
+    this.databuf = new byte[DATABUFSIZE];
+    this.keyClsName = keytype;
+    this.valueClsName = valuetype;
+    final int defaultMinBytes = conf.getInt(TestConstants.NATIVETASK_KVSIZE_MIN, 1);
+    final int defaultMaxBytes = conf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX, 64);
+
+    if (map.get(keytype) != null) {
+      keyMinBytesNum = map.get(keytype).minBytesNum;
+      keyMaxBytesNum = map.get(keytype).maxBytesNum;
+    } else {
+      keyMinBytesNum = defaultMinBytes;
+      keyMaxBytesNum = defaultMaxBytes;
+    }
+
+    if (map.get(valuetype) != null) {
+      valueMinBytesNum = map.get(valuetype).minBytesNum;
+      valueMaxBytesNum = map.get(valuetype).maxBytesNum;
+    } else {
+      valueMinBytesNum = defaultMinBytes;
+      valueMaxBytesNum = defaultMaxBytes;
+    }
+  }
+
+  public void createSequenceTestFile(String filepath) throws Exception {
+    final int FULL_BYTE_SPACE = 256; // use the full range of byte values
+    createSequenceTestFile(filepath, FULL_BYTE_SPACE);
+  }
+
+  public void createSequenceTestFile(String filepath, int base) throws Exception {
+    createSequenceTestFile(filepath, base, (byte)0);
+  }
+  
+  public void createSequenceTestFile(String filepath, int base, byte start) throws Exception {
+    System.out.println("create file " + filepath);
+    System.out.println(keyClsName + " " + valueClsName);
+    Class<?> tmpkeycls, tmpvaluecls;
+    try {
+      tmpkeycls = Class.forName(keyClsName);
+    } catch (final ClassNotFoundException e) {
+      throw new Exception("key class not found: ", e);
+    }
+    try {
+      tmpvaluecls = Class.forName(valueClsName);
+    } catch (final ClassNotFoundException e) {
+      throw new Exception("key class not found: ", e);
+    }
+    try {
+      final Path outputfilepath = new Path(filepath);
+      final ScenarioConfiguration conf = new ScenarioConfiguration();
+      final FileSystem hdfs = outputfilepath.getFileSystem(conf);
+      writer = new SequenceFile.Writer(hdfs, conf, outputfilepath, tmpkeycls, tmpvaluecls);
+    } catch (final Exception e) {
+      throw new Exception("failed to create SequenceFile writer for " + filepath, e);
+    }
+
+    int tmpfilesize = this.filesize;
+    while (tmpfilesize > DATABUFSIZE) {
+      nextRandomBytes(databuf, base, start);
+      final int size = flushBuf(DATABUFSIZE);
+      tmpfilesize -= size;
+    }
+    nextRandomBytes(databuf, base, start);
+    flushBuf(tmpfilesize);
+
+    if (writer != null) {
+      IOUtils.closeStream(writer);
+    } else {
+      throw new Exception("no writer to create sequenceTestFile!");
+    }
+  }
+  
+  private void nextRandomBytes(byte[] buf, int base) {
+    nextRandomBytes(buf, base, (byte)0);
+  }
+  
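+  /** Fills buf with random bytes, each mapped into the range [start, start + base). */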
+  private void nextRandomBytes(byte[] buf, int base, byte start) {
+    r.nextBytes(buf);
+    for (int i = 0; i < buf.length; i++) {
+      buf[i] = (byte) ((buf[i] & 0xFF) % base + start);
+    }
+  }
+
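+  /**
+   * Slices the data buffer into key/value records of random sizes within the
+   * configured ranges, appends them to the SequenceFile, and returns the
+   * number of buffer bytes consumed.
+   */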
+  private int flushBuf(int buflen) throws Exception {
+    int keybytesnum = 0;
+    int valuebytesnum = 0;
+    int offset = 0;
+
+    while (offset < buflen) {
+      final int remains = buflen - offset;
+      keybytesnum = keyMaxBytesNum;
+      if (keyMaxBytesNum != keyMinBytesNum) {
+        keybytesnum = keyMinBytesNum + r.nextInt(keyMaxBytesNum - keyMinBytesNum);
+      }
+
+      valuebytesnum = valueMaxBytesNum;
+      if (valueMaxBytesNum != valueMinBytesNum) {
+        valuebytesnum = valueMinBytesNum + r.nextInt(valueMaxBytesNum - valueMinBytesNum);
+      }
+
+      if (keybytesnum + valuebytesnum > remains) {
+        break;
+      }
+
+      final byte[] key = new byte[keybytesnum];
+      final byte[] value = new byte[valuebytesnum];
+
+      System.arraycopy(databuf, offset, key, 0, keybytesnum);
+      offset += keybytesnum;
+
+      System.arraycopy(databuf, offset, value, 0, valuebytesnum);
+      offset += valuebytesnum;
+      
+      try {
+        writer.append(BytesFactory.newObject(key, this.keyClsName), BytesFactory.newObject(value, this.valueClsName));
+      } catch (final IOException e) {
+        e.printStackTrace();
+        throw new Exception("sequence file create failed", e);
+      }
+    }
+    return offset;
+  }
+
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/function/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.nonsorttest;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.junit.Before;
+import org.junit.Test;
+
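+/**
+ * Runs the native collector with map-side sorting disabled alongside a
+ * normal sorted job, then checks that ResultVerifier considers the two
+ * outputs equivalent.
+ */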
+public class NonSortTest {
+
+  @Test
+  public void nonSortTest() throws Exception {
+    Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+    nativeConf.addResource(TestConstants.NONSORT_TEST_CONF);
+    nativeConf.set(TestConstants.NATIVETASK_MAP_OUTPUT_SORT, "false");
+    String inputpath = nativeConf.get(TestConstants.NONSORT_TEST_INPUTDIR);
+    String outputpath = nativeConf.get(TestConstants.NONSORT_TEST_NATIVE_OUTPUT);
+    final Job nativeNonSort = getJob(nativeConf, "NativeNonSort", inputpath, outputpath);
+    nativeNonSort.waitForCompletion(true);
+
+    Configuration normalConf = ScenarioConfiguration.getNormalConfiguration();
+    normalConf.addResource(TestConstants.NONSORT_TEST_CONF);
+    inputpath = normalConf.get(TestConstants.NONSORT_TEST_INPUTDIR);
+    outputpath = normalConf.get(TestConstants.NONSORT_TEST_NORMAL_OUTPUT);
+    final Job hadoopWithSort = getJob(normalConf, "NormalJob", inputpath, outputpath);
+    hadoopWithSort.waitForCompletion(true);
+
+    final boolean compareRet = ResultVerifier.verify(nativeConf.get(TestConstants.NONSORT_TEST_NATIVE_OUTPUT),
+        normalConf.get(TestConstants.NONSORT_TEST_NORMAL_OUTPUT));
+    assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+  }
+
+  @Before
+  public void startUp() throws Exception {
+    final ScenarioConfiguration configuration = new ScenarioConfiguration();
+    configuration.addNonSortTestConf();
+    final FileSystem fs = FileSystem.get(configuration);
+    final Path path = new Path(configuration.get(TestConstants.NONSORT_TEST_INPUTDIR));
+    if (!fs.exists(path)) {
+      new TestInputFile(configuration.getInt("nativetask.nonsorttest.filesize", 10000000), Text.class.getName(),
+          Text.class.getName(), configuration).createSequenceTestFile(path.toString());
+    }
+    fs.close();
+  }
+
+  private Job getJob(Configuration conf, String jobName, String inputpath, String outputpath) throws IOException {
+    final FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(new Path(outputpath))) {
+      fs.delete(new Path(outputpath), true);
+    }
+    fs.close();
+    final Job job = new Job(conf, jobName);
+    job.setJarByClass(NonSortTestMR.class);
+    job.setMapperClass(NonSortTestMR.Map.class);
+    job.setReducerClass(NonSortTestMR.KeyHashSumReduce.class);
+    job.setOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+    job.setOutputValueClass(LongWritable.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    FileInputFormat.addInputPath(job, new Path(inputpath));
+    FileOutputFormat.setOutputPath(job, new Path(outputpath));
+    return job;
+  }
+
+}