You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/24 08:17:34 UTC
svn commit: r1613004 [1/2] - in
/hadoop/common/branches/MR-2841/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/
hadoop-mapreduce-client/hadoop-mapre...
Author: todd
Date: Thu Jul 24 06:17:33 2014
New Revision: 1613004
URL: http://svn.apache.org/r1613004
Log:
MAPREDUCE-5996. native-task: Rename system tests into standard directory layout. Contributed by Todd Lipcon.
Added:
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTestMR.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/BytesFactory.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/EnforceNativeOutputCollectorDelegator.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/MockValueClass.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/ResultVerifier.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/ScenarioConfiguration.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/TestConstants.java
Removed:
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/system/
Modified:
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt?rev=1613004&r1=1613003&r2=1613004&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt Thu Jul 24 06:17:33 2014
@@ -3,3 +3,4 @@ Changes for Hadoop Native Map Output Col
MAPREDUCE-5985. native-task: Fix build on macosx. Contributed by Binglin Chang
MAPREDUCE-5994. Simplify ByteUtils and fix failing test. (todd)
+MAPREDUCE-5996. native-task: Rename system tests into standard directory layout (todd)
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.combinertest;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.nativetask.combinertest.WordCount.IntSumReducer;
+import org.apache.hadoop.mapred.nativetask.combinertest.WordCount.TokenizerMapper;
+import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CombinerTest {
+ private FileSystem fs;
+ private String inputpath;
+ private String nativeoutputpath;
+ private String hadoopoutputpath;
+
+ @Test
+ public void testWordCountCombiner() {
+ try {
+
+ final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+ nativeConf.addResource(TestConstants.COMBINER_CONF_PATH);
+ final Job nativejob = getJob("nativewordcount", nativeConf, inputpath, nativeoutputpath);
+
+ final Configuration commonConf = ScenarioConfiguration.getNormalConfiguration();
+ commonConf.addResource(TestConstants.COMBINER_CONF_PATH);
+
+ final Job normaljob = getJob("normalwordcount", commonConf, inputpath, hadoopoutputpath);
+
+ nativejob.waitForCompletion(true);
+
+ Counter nativeReduceGroups = nativejob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+
+ normaljob.waitForCompletion(true);
+ Counter normalReduceGroups = normaljob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+
+ assertEquals(true, ResultVerifier.verify(nativeoutputpath, hadoopoutputpath));
+ assertEquals("Native Reduce reduce group counter should equal orignal reduce group counter",
+ nativeReduceGroups.getValue(), normalReduceGroups.getValue());
+
+ } catch (final Exception e) {
+ e.printStackTrace();
+ assertEquals("run exception", true, false);
+ }
+ }
+
+ @Before
+ public void startUp() throws Exception {
+ final ScenarioConfiguration conf = new ScenarioConfiguration();
+ conf.addcombinerConf();
+
+ this.fs = FileSystem.get(conf);
+
+ this.inputpath = conf.get(TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_KEY,
+ TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_DEFAULTV) + "/wordcount";
+
+ if (!fs.exists(new Path(inputpath))) {
+ new TestInputFile(
+ conf.getInt(TestConstants.NATIVETASK_COMBINER_WORDCOUNT_FILESIZE, 1000000),
+ Text.class.getName(),
+ Text.class.getName(), conf).createSequenceTestFile(inputpath, 1, (byte)('a'));
+ }
+
+ this.nativeoutputpath = conf.get(TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH,
+ TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/nativewordcount";
+ this.hadoopoutputpath = conf.get(TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH,
+ TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/normalwordcount";
+ }
+
+ protected static Job getJob(String jobname, Configuration inputConf, String inputpath, String outputpath)
+ throws Exception {
+ final Configuration conf = new Configuration(inputConf);
+ conf.set("fileoutputpath", outputpath);
+ final FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(new Path(outputpath))) {
+ fs.delete(new Path(outputpath));
+ }
+ fs.close();
+ final Job job = new Job(conf, jobname);
+ job.setJarByClass(WordCount.class);
+ job.setMapperClass(TokenizerMapper.class);
+ job.setCombinerClass(IntSumReducer.class);
+ job.setReducerClass(IntSumReducer.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ FileInputFormat.addInputPath(job, new Path(inputpath));
+ FileOutputFormat.setOutputPath(job, new Path(outputpath));
+ return job;
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.combinertest;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+
+public class LargeKVCombinerTest {
+
+ @Test
+ public void testLargeValueCombiner(){
+ final Configuration normalConf = ScenarioConfiguration.getNormalConfiguration();
+ final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+ normalConf.addResource(TestConstants.COMBINER_CONF_PATH);
+ nativeConf.addResource(TestConstants.COMBINER_CONF_PATH);
+ final int deafult_KVSize_Maximum = 1 << 22; // 4M
+ final int KVSize_Maximu = normalConf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX_LARGEKV_TEST,
+ deafult_KVSize_Maximum);
+ final String inputPath = normalConf.get(TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_KEY,
+ TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_DEFAULTV) + "/largeKV";
+ final String nativeOutputPath = normalConf.get(TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH,
+ TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/nativeLargeKV";
+ final String hadoopOutputPath = normalConf.get(TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH,
+ TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/normalLargeKV";
+ try {
+ final FileSystem fs = FileSystem.get(normalConf);
+ for (int i = 65536; i <= KVSize_Maximu; i *= 4) {
+
+ int max = i;
+ int min = Math.max(i / 4, max - 10);
+
+ System.out.println("===KV Size Test: min size: " + min + ", max size: " + max);
+
+ normalConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
+ normalConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
+ nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
+ nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
+ fs.delete(new Path(inputPath), true);
+ new TestInputFile(normalConf.getInt(TestConstants.NATIVETASK_COMBINER_WORDCOUNT_FILESIZE,
+ 1000000), IntWritable.class.getName(),
+ Text.class.getName(), normalConf).createSequenceTestFile(inputPath, 1);
+
+ final Job normaljob = CombinerTest.getJob("normalwordcount", normalConf, inputPath, hadoopOutputPath);
+ final Job nativejob = CombinerTest.getJob("nativewordcount", nativeConf, inputPath, nativeOutputPath);
+
+ nativejob.waitForCompletion(true);
+ Counter nativeReduceGroups = nativejob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+
+ normaljob.waitForCompletion(true);
+ Counter normalReduceGroups = normaljob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+
+ final boolean compareRet = ResultVerifier.verify(nativeOutputPath, hadoopOutputPath);
+
+ final String reason = "LargeKVCombinerTest failed with, min size: " + min
+ + ", max size: " + max + ", normal out: " + hadoopOutputPath + ", native Out: " + nativeOutputPath;
+
+ assertEquals(reason, true, compareRet);
+// assertEquals("Native Reduce reduce group counter should equal orignal reduce group counter",
+// nativeReduceGroups.getValue(), normalReduceGroups.getValue());
+ }
+ fs.close();
+ } catch (final Exception e) {
+ e.printStackTrace();
+ assertEquals("run exception", true, false);
+ }
+ }
+
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.combinertest;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Counter;
+import org.junit.Before;
+import org.junit.Test;
+
+public class OldAPICombinerTest {
+ private FileSystem fs;
+ private String inputpath;
+
+ @Test
+ public void testWordCountCombinerWithOldAPI() throws Exception {
+ final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+ nativeConf.addResource(TestConstants.COMBINER_CONF_PATH);
+ final String nativeoutput = nativeConf.get(TestConstants.OLDAPI_NATIVETASK_TEST_COMBINER_OUTPUTPATH);
+ final JobConf nativeJob = getOldAPIJobconf(nativeConf, "nativeCombinerWithOldAPI", inputpath, nativeoutput);
+ RunningJob nativeRunning = JobClient.runJob(nativeJob);
+
+ Counter nativeReduceGroups = nativeRunning.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+
+ final Configuration normalConf = ScenarioConfiguration.getNormalConfiguration();
+ normalConf.addResource(TestConstants.COMBINER_CONF_PATH);
+ final String normaloutput = normalConf.get(TestConstants.OLDAPI_NORMAL_TEST_COMBINER_OUTPUTPATH);
+ final JobConf normalJob = getOldAPIJobconf(normalConf, "normalCombinerWithOldAPI", inputpath, normaloutput);
+
+ RunningJob normalRunning = JobClient.runJob(normalJob);
+ Counter normalReduceGroups = normalRunning.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+
+ final boolean compareRet = ResultVerifier.verify(nativeoutput, normaloutput);
+ assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+
+ assertEquals("The input reduce record count must be same", nativeReduceGroups.getValue(), normalReduceGroups.getValue());
+ }
+
+ @Before
+ public void startUp() throws Exception {
+ final ScenarioConfiguration conf = new ScenarioConfiguration();
+ conf.addcombinerConf();
+ this.fs = FileSystem.get(conf);
+ this.inputpath = conf.get(TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_KEY,
+ TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_DEFAULTV) + "/wordcount";
+
+ if (!fs.exists(new Path(inputpath))) {
+ new TestInputFile(conf.getInt("nativetask.combiner.wordcount.filesize", 1000000), Text.class.getName(),
+ Text.class.getName(), conf).createSequenceTestFile(inputpath, 1, (byte)('a'));
+ }
+ }
+
+ private static JobConf getOldAPIJobconf(Configuration configuration, String name, String input, String output)
+ throws Exception {
+ final JobConf jobConf = new JobConf(configuration);
+ final FileSystem fs = FileSystem.get(configuration);
+ if (fs.exists(new Path(output))) {
+ fs.delete(new Path(output), true);
+ }
+ fs.close();
+ jobConf.setJobName(name);
+ jobConf.setOutputKeyClass(Text.class);
+ jobConf.setOutputValueClass(IntWritable.class);
+ jobConf.setMapperClass(WordCountWithOldAPI.TokenizerMapperWithOldAPI.class);
+ jobConf.setCombinerClass(WordCountWithOldAPI.IntSumReducerWithOldAPI.class);
+ jobConf.setReducerClass(WordCountWithOldAPI.IntSumReducerWithOldAPI.class);
+
+ jobConf.setInputFormat(SequenceFileInputFormat.class);
+ jobConf.setOutputFormat(TextOutputFormat.class);
+
+ FileInputFormat.setInputPaths(jobConf, new Path(input));
+ FileOutputFormat.setOutputPath(jobConf, new Path(output));
+ return jobConf;
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.combinertest;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public class WordCount {
+
+ private static Log LOG = LogFactory.getLog(WordCount.class);
+
+ public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
+
+ private final static IntWritable one = new IntWritable(1);
+ private final Text word = new Text();
+
+ @Override
+ public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
+ final StringTokenizer itr = new StringTokenizer(value.toString());
+ while (itr.hasMoreTokens()) {
+ word.set(itr.nextToken());
+ context.write(word, one);
+ }
+ }
+ }
+
+ public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+ private final IntWritable result = new IntWritable();
+
+ @Override
+ public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
+ InterruptedException {
+ int sum = 0;
+ for (final IntWritable val : values) {
+ sum += val.get();
+ }
+ result.set(sum);
+ context.write(key, result);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ final Configuration conf = new Configuration();
+ final String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+ if (otherArgs.length != 2) {
+ System.err.println("Usage: wordcount <in> <out>");
+ System.exit(2);
+ }
+ final Job job = new Job(conf, conf.get(MRJobConfig.JOB_NAME, "word count"));
+ job.setJarByClass(WordCount.class);
+ job.setMapperClass(TokenizerMapper.class);
+ job.setCombinerClass(IntSumReducer.class);
+ job.setReducerClass(IntSumReducer.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+ FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
+ FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
+ System.exit(job.waitForCompletion(true) ? 0 : 1);
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.combinertest;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+public class WordCountWithOldAPI {
+
+ public static class TokenizerMapperWithOldAPI extends MapReduceBase implements
+ Mapper<Object, Text, Text, IntWritable> {
+ private final static IntWritable one = new IntWritable(1);
+ private final Text word = new Text();
+
+ @Override
+ public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
+ throws IOException {
+ final StringTokenizer itr = new StringTokenizer(value.toString());
+ while (itr.hasMoreTokens()) {
+ word.set(itr.nextToken());
+ output.collect(word, one);
+ }
+ }
+ }
+
+ public static class IntSumReducerWithOldAPI extends MapReduceBase implements
+ Reducer<Text, IntWritable, Text, IntWritable> {
+ private final IntWritable result = new IntWritable();
+
+ @Override
+ public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
+ Reporter reporter) throws IOException {
+ int sum = 0;
+ while (values.hasNext()) {
+ sum += values.next().get();
+ }
+ result.set(sum);
+ output.collect(key, result);
+ }
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.compresstest;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class CompressMapper {
+ public static final String inputFile = "./compress/input.txt";
+ public static final String outputFileDir = "./compress/output/";
+
+ public static class TextCompressMapper extends Mapper<Text, Text, Text, Text> {
+
+ @Override
+ protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+ context.write(key, value);
+ }
+ }
+
+ public static Job getCompressJob(String jobname, Configuration conf) {
+ Job job = null;
+ try {
+ job = new Job(conf, jobname + "-CompressMapperJob");
+ job.setJarByClass(CompressMapper.class);
+ job.setMapperClass(TextCompressMapper.class);
+ job.setOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ final Path outputpath = new Path(outputFileDir + jobname);
+ // if output file exists ,delete it
+ final FileSystem hdfs = FileSystem.get(new ScenarioConfiguration());
+ if (hdfs.exists(outputpath)) {
+ hdfs.delete(outputpath);
+ }
+ hdfs.close();
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ FileInputFormat.addInputPath(job, new Path(inputFile));
+ FileOutputFormat.setOutputPath(job, outputpath);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ }
+ return job;
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.compresstest;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CompressTest {
+
+ @Test
+ public void testSnappyCompress() throws Exception {
+ final Configuration conf = ScenarioConfiguration.getNativeConfiguration();
+ conf.addResource(TestConstants.SNAPPY_COMPRESS_CONF_PATH);
+ final Job job = CompressMapper.getCompressJob("nativesnappy", conf);
+ job.waitForCompletion(true);
+
+ final Configuration hadoopconf = ScenarioConfiguration.getNormalConfiguration();
+ hadoopconf.addResource(TestConstants.SNAPPY_COMPRESS_CONF_PATH);
+ final Job hadoopjob = CompressMapper.getCompressJob("hadoopsnappy", hadoopconf);
+ hadoopjob.waitForCompletion(true);
+
+ final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativesnappy",
+ CompressMapper.outputFileDir + "hadoopsnappy");
+ assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+ }
+
+ @Test
+ public void testGzipCompress() throws Exception {
+ final Configuration conf = ScenarioConfiguration.getNativeConfiguration();
+ conf.addResource(TestConstants.GZIP_COMPRESS_CONF_PATH);
+ final Job job = CompressMapper.getCompressJob("nativegzip", conf);
+ job.waitForCompletion(true);
+
+ final Configuration hadoopconf = ScenarioConfiguration.getNormalConfiguration();
+ hadoopconf.addResource(TestConstants.GZIP_COMPRESS_CONF_PATH);
+ final Job hadoopjob = CompressMapper.getCompressJob("hadoopgzip", hadoopconf);
+ hadoopjob.waitForCompletion(true);
+
+ final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativegzip",
+ CompressMapper.outputFileDir + "hadoopgzip");
+ assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+ }
+
+ @Test
+ public void testBzip2Compress() throws Exception {
+ final Configuration nativeconf = ScenarioConfiguration.getNativeConfiguration();
+ nativeconf.addResource(TestConstants.BZIP2_COMPRESS_CONF_PATH);
+ final Job nativejob = CompressMapper.getCompressJob("nativebzip2", nativeconf);
+ nativejob.waitForCompletion(true);
+
+ final Configuration hadoopconf = ScenarioConfiguration.getNormalConfiguration();
+ hadoopconf.addResource(TestConstants.BZIP2_COMPRESS_CONF_PATH);
+ final Job hadoopjob = CompressMapper.getCompressJob("hadoopbzip2", hadoopconf);
+ hadoopjob.waitForCompletion(true);
+
+ final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativebzip2",
+ CompressMapper.outputFileDir + "hadoopbzip2");
+ assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+ }
+
+ @Test
+ public void testLz4Compress() throws Exception {
+ final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+ nativeConf.addResource(TestConstants.LZ4_COMPRESS_CONF_PATH);
+ final Job nativeJob = CompressMapper.getCompressJob("nativelz4", nativeConf);
+ nativeJob.waitForCompletion(true);
+
+ final Configuration hadoopConf = ScenarioConfiguration.getNormalConfiguration();
+ hadoopConf.addResource(TestConstants.LZ4_COMPRESS_CONF_PATH);
+ final Job hadoopJob = CompressMapper.getCompressJob("hadooplz4", hadoopConf);
+ hadoopJob.waitForCompletion(true);
+ final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativelz4",
+ CompressMapper.outputFileDir + "hadooplz4");
+ assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+ }
+
+ @Test
+ public void testDefaultCompress() throws Exception {
+ final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+ nativeConf.addResource(TestConstants.DEFAULT_COMPRESS_CONF_PATH);
+ final Job nativeJob = CompressMapper.getCompressJob("nativedefault", nativeConf);
+ nativeJob.waitForCompletion(true);
+
+ final Configuration hadoopConf = ScenarioConfiguration.getNormalConfiguration();
+ hadoopConf.addResource(TestConstants.DEFAULT_COMPRESS_CONF_PATH);
+ final Job hadoopJob = CompressMapper.getCompressJob("hadoopdefault", hadoopConf);
+ hadoopJob.waitForCompletion(true);
+ final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativedefault",
+ CompressMapper.outputFileDir + "hadoopdefault");
+ assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+ }
+
+ @Before
+ public void startUp() throws Exception {
+ final ScenarioConfiguration conf = new ScenarioConfiguration();
+ final FileSystem fs = FileSystem.get(conf);
+ final Path path = new Path(CompressMapper.inputFile);
+ fs.delete(path);
+ if (!fs.exists(path)) {
+ new TestInputFile(ScenarioConfiguration.getNormalConfiguration().getInt(
+ TestConstants.NATIVETASK_COMPRESS_FILESIZE, 100000),
+ Text.class.getName(), Text.class.getName(), conf)
+ .createSequenceTestFile(CompressMapper.inputFile);
+
+ }
+ fs.close();
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class HashSumReducer<KTYPE, VTYPE> extends Reducer<KTYPE, VTYPE, KTYPE, IntWritable> {
+
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(os);
+
+ @Override
+ public void reduce(KTYPE key, Iterable<VTYPE> values, Context context) throws IOException, InterruptedException {
+ int hashSum = 0;
+ for (final VTYPE val : values) {
+ if (val instanceof Writable) {
+ os.reset();
+ ((Writable) val).write(dos);
+ final int hash = Arrays.hashCode(os.toByteArray());
+ hashSum += hash;
+ }
+ }
+
+ context.write(key, new IntWritable(hashSum));
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import java.io.IOException;
+import java.util.zip.CRC32;
+
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.nativetask.testutil.BytesFactory;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class KVJob {
+ public static final String INPUTPATH = "nativetask.kvtest.inputfile.path";
+ public static final String OUTPUTPATH = "nativetask.kvtest.outputfile.path";
+ Job job = null;
+
+ public static class ValueMapper<KTYPE, VTYPE> extends Mapper<KTYPE, VTYPE, KTYPE, VTYPE> {
+ @Override
+ public void map(KTYPE key, VTYPE value, Context context) throws IOException, InterruptedException {
+ context.write(key, value);
+ }
+ }
+
+ public static class KVMReducer<KTYPE, VTYPE> extends Reducer<KTYPE, VTYPE, KTYPE, VTYPE> {
+ public void reduce(KTYPE key, VTYPE value, Context context) throws IOException, InterruptedException {
+ context.write(key, value);
+ }
+ }
+
+ public static class KVReducer<KTYPE, VTYPE> extends Reducer<KTYPE, VTYPE, KTYPE, VTYPE> {
+
+ @Override
+ public void reduce(KTYPE key, Iterable<VTYPE> values, Context context) throws IOException, InterruptedException {
+ long resultlong = 0;// 8 bytes match BytesFactory.fromBytes function
+ final CRC32 crc32 = new CRC32();
+ for (final VTYPE val : values) {
+ crc32.reset();
+ crc32.update(BytesFactory.toBytes(val));
+ resultlong += crc32.getValue();
+ }
+ final VTYPE V = null;
+ context.write(key, (VTYPE) BytesFactory.newObject(Longs.toByteArray(resultlong), V.getClass().getName()));
+ }
+ }
+
+ public KVJob(String jobname, Configuration conf, Class<?> keyclass, Class<?> valueclass, String inputpath,
+ String outputpath) throws Exception {
+ job = new Job(conf, jobname);
+ job.setJarByClass(KVJob.class);
+ job.setMapperClass(KVJob.ValueMapper.class);
+ job.setOutputKeyClass(keyclass);
+ job.setMapOutputValueClass(valueclass);
+
+ if (conf.get(TestConstants.NATIVETASK_KVTEST_CREATEFILE).equals("true")) {
+ final FileSystem fs = FileSystem.get(conf);
+ fs.delete(new Path(inputpath), true);
+ fs.close();
+ final TestInputFile testfile = new TestInputFile(Integer.valueOf(conf.get(
+ TestConstants.FILESIZE_KEY, "1000")),
+ keyclass.getName(), valueclass.getName(), conf);
+ testfile.createSequenceTestFile(inputpath);
+
+ }
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ FileInputFormat.addInputPath(job, new Path(inputpath));
+ FileOutputFormat.setOutputPath(job, new Path(outputpath));
+ }
+
+ public void runJob() throws Exception {
+
+ job.waitForCompletion(true);
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class KVTest {
+ private static Class<?>[] keyclasses = null;
+ private static Class<?>[] valueclasses = null;
+ private static String[] keyclassNames = null;
+ private static String[] valueclassNames = null;
+
+ private static Configuration nativekvtestconf = ScenarioConfiguration.getNativeConfiguration();
+ private static Configuration hadoopkvtestconf = ScenarioConfiguration.getNormalConfiguration();
+ static {
+ nativekvtestconf.addResource(TestConstants.KVTEST_CONF_PATH);
+ hadoopkvtestconf.addResource(TestConstants.KVTEST_CONF_PATH);
+ }
+
+ @Parameters(name = "key:{0}\nvalue:{1}")
+ public static Iterable<Class<?>[]> data() {
+ final String valueclassesStr = nativekvtestconf
+ .get(TestConstants.NATIVETASK_KVTEST_VALUECLASSES);
+ System.out.println(valueclassesStr);
+ valueclassNames = valueclassesStr.replaceAll("\\s", "").split(";");// delete
+ // " "
+ final ArrayList<Class<?>> tmpvalueclasses = new ArrayList<Class<?>>();
+ for (int i = 0; i < valueclassNames.length; i++) {
+ try {
+ if (valueclassNames[i].equals("")) {
+ continue;
+ }
+ tmpvalueclasses.add(Class.forName(valueclassNames[i]));
+ } catch (final ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+ valueclasses = tmpvalueclasses.toArray(new Class[tmpvalueclasses.size()]);
+ final String keyclassesStr = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_KEYCLASSES);
+ System.out.println(keyclassesStr);
+ keyclassNames = keyclassesStr.replaceAll("\\s", "").split(";");// delete
+ // " "
+ final ArrayList<Class<?>> tmpkeyclasses = new ArrayList<Class<?>>();
+ for (int i = 0; i < keyclassNames.length; i++) {
+ try {
+ if (keyclassNames[i].equals("")) {
+ continue;
+ }
+ tmpkeyclasses.add(Class.forName(keyclassNames[i]));
+ } catch (final ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+ keyclasses = tmpkeyclasses.toArray(new Class[tmpkeyclasses.size()]);
+ final Class<?>[][] kvgroup = new Class<?>[keyclassNames.length * valueclassNames.length][2];
+ for (int i = 0; i < keyclassNames.length; i++) {
+ final int tmpindex = i * valueclassNames.length;
+ for (int j = 0; j < valueclassNames.length; j++) {
+ kvgroup[tmpindex + j][0] = keyclasses[i];
+ kvgroup[tmpindex + j][1] = valueclasses[j];
+ }
+ }
+ return Arrays.asList(kvgroup);
+ }
+
+ private final Class<?> keyclass;
+ private final Class<?> valueclass;
+
+ public KVTest(Class<?> keyclass, Class<?> valueclass) {
+ this.keyclass = keyclass;
+ this.valueclass = valueclass;
+
+ }
+
+ @Test
+ public void testKVCompability() {
+
+ try {
+ final String nativeoutput = this.runNativeTest(
+ "Test:" + keyclass.getSimpleName() + "--" + valueclass.getSimpleName(), keyclass, valueclass);
+ final String normaloutput = this.runNormalTest(
+ "Test:" + keyclass.getSimpleName() + "--" + valueclass.getSimpleName(), keyclass, valueclass);
+ final boolean compareRet = ResultVerifier.verify(normaloutput, nativeoutput);
+ final String input = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/"
+ + keyclass.getName()
+ + "/" + valueclass.getName();
+ if(compareRet){
+ final FileSystem fs = FileSystem.get(hadoopkvtestconf);
+ fs.delete(new Path(nativeoutput), true);
+ fs.delete(new Path(normaloutput), true);
+ fs.delete(new Path(input), true);
+ fs.close();
+ }
+ assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+ } catch (final IOException e) {
+ assertEquals("test run exception:", null, e);
+ } catch (final Exception e) {
+ assertEquals("test run exception:", null, e);
+ }
+ }
+
+ @Before
+ public void startUp() {
+
+ }
+
+ private String runNativeTest(String jobname, Class<?> keyclass, Class<?> valueclass) throws IOException {
+ final String inputpath = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/"
+ + keyclass.getName()
+ + "/" + valueclass.getName();
+ final String outputpath = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_OUTPUTDIR) + "/"
+ + keyclass.getName() + "/" + valueclass.getName();
+ // if output file exists ,then delete it
+ final FileSystem fs = FileSystem.get(nativekvtestconf);
+ fs.delete(new Path(outputpath));
+ fs.close();
+ nativekvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "true");
+ try {
+ final KVJob keyJob = new KVJob(jobname, nativekvtestconf, keyclass, valueclass, inputpath, outputpath);
+ keyJob.runJob();
+ } catch (final Exception e) {
+ return "native testcase run time error.";
+ }
+ return outputpath;
+ }
+
+ private String runNormalTest(String jobname, Class<?> keyclass, Class<?> valueclass) throws IOException {
+ final String inputpath = hadoopkvtestconf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/"
+ + keyclass.getName()
+ + "/" + valueclass.getName();
+ final String outputpath = hadoopkvtestconf
+ .get(TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR)
+ + "/"
+ + keyclass.getName() + "/" + valueclass.getName();
+ // if output file exists ,then delete it
+ final FileSystem fs = FileSystem.get(hadoopkvtestconf);
+ fs.delete(new Path(outputpath));
+ fs.close();
+ hadoopkvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "false");
+ try {
+ final KVJob keyJob = new KVJob(jobname, hadoopkvtestconf, keyclass, valueclass, inputpath, outputpath);
+ keyJob.runJob();
+ } catch (final Exception e) {
+ return "normal testcase run time error.";
+ }
+ return outputpath;
+ }
+
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.junit.Test;
+
+public class LargeKVTest {
+
+ @Test
+ public void testKeySize() {
+ runKVSizeTests(Text.class, IntWritable.class);
+ }
+
+ @Test
+ public void testValueSize() {
+ runKVSizeTests(IntWritable.class, Text.class);
+ }
+
+ private static Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+ private static Configuration normalConf = ScenarioConfiguration.getNormalConfiguration();
+ static {
+ nativeConf.addResource(TestConstants.KVTEST_CONF_PATH);
+ nativeConf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "true");
+ normalConf.addResource(TestConstants.KVTEST_CONF_PATH);
+ normalConf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "false");
+ }
+
+ public void runKVSizeTests(Class<?> keyClass, Class<?> valueClass) {
+ if (!keyClass.equals(Text.class) && !valueClass.equals(Text.class)) {
+ return;
+ }
+ final int deafult_KVSize_Maximum = 1 << 22; // 4M
+ final int KVSize_Maximu = normalConf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX_LARGEKV_TEST,
+ deafult_KVSize_Maximum);
+ try {
+
+ for (int i = 65536; i <= KVSize_Maximu; i *= 4) {
+ int min = i / 4;
+ int max = i;
+ nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
+ nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
+ normalConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
+ normalConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
+
+ System.out.println("===KV Size Test: min size: " + min + ", max size: " + max + ", keyClass: "
+ + keyClass.getName() + ", valueClass: " + valueClass.getName());
+
+ final String nativeOutPut = runNativeLargeKVTest("Test Large Value Size:" + String.valueOf(i), keyClass,
+ valueClass, nativeConf);
+ final String normalOutPut = this.runNormalLargeKVTest("Test Large Key Size:" + String.valueOf(i), keyClass,
+ valueClass, normalConf);
+ final boolean compareRet = ResultVerifier.verify(normalOutPut, nativeOutPut);
+ final String reason = "keytype: " + keyClass.getName() + ", valuetype: " + valueClass.getName()
+ + ", failed with " + (keyClass.equals(Text.class) ? "key" : "value") + ", min size: " + min
+ + ", max size: " + max + ", normal out: " + normalOutPut + ", native Out: " + nativeOutPut;
+ assertEquals(reason, true, compareRet);
+ }
+ } catch (final Exception e) {
+ // TODO: handle exception
+ // assertEquals("test run exception:", null, e);
+ e.printStackTrace();
+ }
+ }
+
+ private String runNativeLargeKVTest(String jobname, Class<?> keyclass, Class<?> valueclass, Configuration conf)
+ throws Exception {
+ final String inputpath = conf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/LargeKV/" + keyclass.getName()
+ + "/" + valueclass.getName();
+ final String outputpath = conf.get(TestConstants.NATIVETASK_KVTEST_OUTPUTDIR) + "/LargeKV/" + keyclass.getName()
+ + "/" + valueclass.getName();
+ // if output file exists ,then delete it
+ final FileSystem fs = FileSystem.get(conf);
+ fs.delete(new Path(outputpath), true);
+ fs.close();
+ try {
+ final KVJob keyJob = new KVJob(jobname, conf, keyclass, valueclass, inputpath, outputpath);
+ keyJob.runJob();
+ } catch (final Exception e) {
+ return "normal testcase run time error.";
+ }
+ return outputpath;
+ }
+
+ private String runNormalLargeKVTest(String jobname, Class<?> keyclass, Class<?> valueclass, Configuration conf)
+ throws IOException {
+ final String inputpath = conf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/LargeKV/" + keyclass.getName()
+ + "/" + valueclass.getName();
+ final String outputpath = conf.get(TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR) + "/LargeKV/"
+ + keyclass.getName() + "/" + valueclass.getName();
+ // if output file exists ,then delete it
+ final FileSystem fs = FileSystem.get(conf);
+ fs.delete(new Path(outputpath), true);
+ fs.close();
+ try {
+ final KVJob keyJob = new KVJob(jobname, conf, keyclass, valueclass, inputpath, outputpath);
+ keyJob.runJob();
+ } catch (final Exception e) {
+ return "normal testcase run time error.";
+ }
+ return outputpath;
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapred.nativetask.testutil.BytesFactory;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+
+
+public class TestInputFile {
+
+ public static class KVSizeScope {
+ private static final int DefaultMinNum = 1;
+ private static final int DefaultMaxNum = 64;
+
+ public int minBytesNum;
+ public int maxBytesNum;
+
+ public KVSizeScope() {
+ this.minBytesNum = DefaultMinNum;
+ this.maxBytesNum = DefaultMaxNum;
+ }
+
+ public KVSizeScope(int min, int max) {
+ this.minBytesNum = min;
+ this.maxBytesNum = max;
+ }
+ }
+
+ private static HashMap<String, KVSizeScope> map = new HashMap<String, KVSizeScope>();
+
+ private byte[] databuf = null;
+ private final String keyClsName, valueClsName;
+ private int filesize = 0;
+ private int keyMaxBytesNum, keyMinBytesNum;
+ private int valueMaxBytesNum, valueMinBytesNum;
+ private SequenceFile.Writer writer = null;
+ Random r = new Random();
+ public static final int DATABUFSIZE = 1 << 22; // 4M
+
+ private enum State {
+ KEY, VALUE
+ };
+
+ static {
+ map.put(BooleanWritable.class.getName(), new KVSizeScope(1, 1));
+ map.put(DoubleWritable.class.getName(), new KVSizeScope(8, 8));
+ map.put(FloatWritable.class.getName(), new KVSizeScope(4, 4));
+ map.put(VLongWritable.class.getName(), new KVSizeScope(8, 8));
+ map.put(ByteWritable.class.getName(), new KVSizeScope(1, 1));
+ map.put(LongWritable.class.getName(), new KVSizeScope(8, 8));
+ map.put(VIntWritable.class.getName(), new KVSizeScope(4, 4));
+ map.put(IntWritable.class.getName(), new KVSizeScope(4, 4));
+ }
+
+ public TestInputFile(int filesize, String keytype, String valuetype, Configuration conf) throws Exception {
+ this.filesize = filesize;
+ this.databuf = new byte[DATABUFSIZE];
+ this.keyClsName = keytype;
+ this.valueClsName = valuetype;
+ final int defaultMinBytes = conf.getInt(TestConstants.NATIVETASK_KVSIZE_MIN, 1);
+ final int defaultMaxBytes = conf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX, 64);
+
+ if (map.get(keytype) != null) {
+ keyMinBytesNum = map.get(keytype).minBytesNum;
+ keyMaxBytesNum = map.get(keytype).maxBytesNum;
+ } else {
+ keyMinBytesNum = defaultMinBytes;
+ keyMaxBytesNum = defaultMaxBytes;
+ }
+
+ if (map.get(valuetype) != null) {
+ valueMinBytesNum = map.get(valuetype).minBytesNum;
+ valueMaxBytesNum = map.get(valuetype).maxBytesNum;
+ } else {
+ valueMinBytesNum = defaultMinBytes;
+ valueMaxBytesNum = defaultMaxBytes;
+ }
+ }
+
+ public void createSequenceTestFile(String filepath) throws Exception {
+ int FULL_BYTE_SPACE = 256;
+ createSequenceTestFile(filepath, FULL_BYTE_SPACE);
+ }
+
+ public void createSequenceTestFile(String filepath, int base) throws Exception {
+ createSequenceTestFile(filepath, base, (byte)0);
+ }
+
+ public void createSequenceTestFile(String filepath, int base, byte start) throws Exception {
+ System.out.println("create file " + filepath);
+ System.out.println(keyClsName + " " + valueClsName);
+ Class<?> tmpkeycls, tmpvaluecls;
+ try {
+ tmpkeycls = Class.forName(keyClsName);
+ } catch (final ClassNotFoundException e) {
+ throw new Exception("key class not found: ", e);
+ }
+ try {
+ tmpvaluecls = Class.forName(valueClsName);
+ } catch (final ClassNotFoundException e) {
+ throw new Exception("key class not found: ", e);
+ }
+ try {
+ final Path outputfilepath = new Path(filepath);
+ final ScenarioConfiguration conf= new ScenarioConfiguration();
+ final FileSystem hdfs = outputfilepath.getFileSystem(conf);
+ writer = new SequenceFile.Writer(hdfs, conf, outputfilepath, tmpkeycls, tmpvaluecls);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ }
+
+ int tmpfilesize = this.filesize;
+ while (tmpfilesize > DATABUFSIZE) {
+ nextRandomBytes(databuf, base, start);
+ final int size = flushBuf(DATABUFSIZE);
+ tmpfilesize -= size;
+ }
+ nextRandomBytes(databuf, base, start);
+ flushBuf(tmpfilesize);
+
+ if (writer != null) {
+ IOUtils.closeStream(writer);
+ } else {
+ throw new Exception("no writer to create sequenceTestFile!");
+ }
+ }
+
+ private void nextRandomBytes(byte[] buf, int base) {
+ nextRandomBytes(buf, base, (byte)0);
+ }
+
+ private void nextRandomBytes(byte[] buf, int base, byte start) {
+ r.nextBytes(buf);
+ for (int i = 0; i < buf.length; i++) {
+ buf[i] = (byte) ((buf[i] & 0xFF) % base + start);
+ }
+ }
+
+ private int flushBuf(int buflen) throws Exception {
+ final Random r = new Random();
+ int keybytesnum = 0;
+ int valuebytesnum = 0;
+ int offset = 0;
+
+ while (offset < buflen) {
+ final int remains = buflen - offset;
+ keybytesnum = keyMaxBytesNum;
+ if (keyMaxBytesNum != keyMinBytesNum) {
+ keybytesnum = keyMinBytesNum + r.nextInt(keyMaxBytesNum - keyMinBytesNum);
+ }
+
+ valuebytesnum = valueMaxBytesNum;
+ if (valueMaxBytesNum != valueMinBytesNum) {
+ valuebytesnum = valueMinBytesNum + r.nextInt(valueMaxBytesNum - valueMinBytesNum);
+ }
+
+ if (keybytesnum + valuebytesnum > remains) {
+ break;
+ }
+
+ final byte[] key = new byte[keybytesnum];
+ final byte[] value = new byte[valuebytesnum];
+
+ System.arraycopy(databuf, offset, key, 0, keybytesnum);
+ offset += keybytesnum;
+
+ System.arraycopy(databuf, offset, value, 0, valuebytesnum);
+ offset += valuebytesnum;
+
+ try {
+ writer.append(BytesFactory.newObject(key, this.keyClsName), BytesFactory.newObject(value, this.valueClsName));
+ } catch (final IOException e) {
+ e.printStackTrace();
+ throw new Exception("sequence file create failed", e);
+ }
+ }
+ return offset;
+ }
+
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.nonsorttest;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NonSortTest {
+
+ @Test
+ public void nonSortTest() throws Exception {
+ Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+ nativeConf.addResource(TestConstants.NONSORT_TEST_CONF);
+ nativeConf.set(TestConstants.NATIVETASK_MAP_OUTPUT_SORT, "false");
+ String inputpath = nativeConf.get(TestConstants.NONSORT_TEST_INPUTDIR);
+ String outputpath = nativeConf.get(TestConstants.NONSORT_TEST_NATIVE_OUTPUT);
+ final Job nativeNonSort = getJob(nativeConf, "NativeNonSort", inputpath, outputpath);
+ nativeNonSort.waitForCompletion(true);
+
+ Configuration normalConf = ScenarioConfiguration.getNormalConfiguration();
+ normalConf.addResource(TestConstants.NONSORT_TEST_CONF);
+ inputpath = normalConf.get(TestConstants.NONSORT_TEST_INPUTDIR);
+ outputpath = normalConf.get(TestConstants.NONSORT_TEST_NORMAL_OUTPUT);
+ final Job hadoopWithSort = getJob(normalConf, "NormalJob", inputpath, outputpath);
+ hadoopWithSort.waitForCompletion(true);
+
+ final boolean compareRet = ResultVerifier.verify(nativeConf.get(TestConstants.NONSORT_TEST_NATIVE_OUTPUT),
+ normalConf.get(TestConstants.NONSORT_TEST_NORMAL_OUTPUT));
+ assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+ }
+
+ @Before
+ public void startUp() throws Exception {
+ final ScenarioConfiguration configuration = new ScenarioConfiguration();
+ configuration.addNonSortTestConf();
+ final FileSystem fs = FileSystem.get(configuration);
+ final Path path = new Path(configuration.get(TestConstants.NONSORT_TEST_INPUTDIR));
+ if (!fs.exists(path)) {
+ new TestInputFile(configuration.getInt("nativetask.nonsorttest.filesize", 10000000), Text.class.getName(),
+ Text.class.getName(), configuration).createSequenceTestFile(path.toString());
+ }
+ fs.close();
+ }
+
+ private Job getJob(Configuration conf, String jobName, String inputpath, String outputpath) throws IOException {
+ final FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(new Path(outputpath))) {
+ fs.delete(new Path(outputpath), true);
+ }
+ fs.close();
+ final Job job = new Job(conf, jobName);
+ job.setJarByClass(NonSortTestMR.class);
+ job.setMapperClass(NonSortTestMR.Map.class);
+ job.setReducerClass(NonSortTestMR.KeyHashSumReduce.class);
+ job.setOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+ job.setOutputValueClass(LongWritable.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ FileInputFormat.addInputPath(job, new Path(inputpath));
+ FileOutputFormat.setOutputPath(job, new Path(outputpath));
+ return job;
+ }
+
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTestMR.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTestMR.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTestMR.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTestMR.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.nonsorttest;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class NonSortTestMR {
+
+ public static class Map extends Mapper<Object, Text, Text, IntWritable> {
+ private final static IntWritable one = new IntWritable(1);
+ private final Text word = new Text();
+
+ @Override
+ public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
+ final String line = value.toString();
+ final StringTokenizer tokenizer = new StringTokenizer(line);
+ while (tokenizer.hasMoreTokens()) {
+ word.set(tokenizer.nextToken());
+ context.write(word, one);
+ }
+ }
+ }
+
+ public static class KeyHashSumReduce extends Reducer<Text, IntWritable, Text, LongWritable> {
+ long sum = 0;
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(os);
+
+ @Override
+ public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
+ InterruptedException {
+ for (final IntWritable val : values) {
+ os.reset();
+ key.write(dos);
+ final int hash = Arrays.hashCode(os.toByteArray());
+ sum += hash;
+ }
+ }
+
+ @Override
+ public void cleanup(Context context) throws IOException, InterruptedException {
+ context.write(new Text("NonSortTest"), new LongWritable(sum));
+ }
+ }
+
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/BytesFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/BytesFactory.java?rev=1613004&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/BytesFactory.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/BytesFactory.java Thu Jul 24 06:17:33 2014
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.testutil;
+
+import java.util.Random;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapred.nativetask.util.BytesUtil;
+
+
+public class BytesFactory {
+ public static Random r = new Random();
+
+ public static Object newObject(byte[] seed, String className) {
+ r.setSeed(seed.hashCode());
+ if (className.equals(IntWritable.class.getName())) {
+ return new IntWritable(Ints.fromByteArray(seed));
+ } else if (className.equals(FloatWritable.class.getName())) {
+ return new FloatWritable(r.nextFloat());
+ } else if (className.equals(DoubleWritable.class.getName())) {
+ return new DoubleWritable(r.nextDouble());
+ } else if (className.equals(LongWritable.class.getName())) {
+ return new LongWritable(Longs.fromByteArray(seed));
+ } else if (className.equals(VIntWritable.class.getName())) {
+ return new VIntWritable(Ints.fromByteArray(seed));
+ } else if (className.equals(VLongWritable.class.getName())) {
+ return new VLongWritable(Longs.fromByteArray(seed));
+ } else if (className.equals(BooleanWritable.class.getName())) {
+ return new BooleanWritable(seed[0] % 2 == 1 ? true : false);
+ } else if (className.equals(Text.class.getName())) {
+ return new Text(BytesUtil.toStringBinary(seed));
+ } else if (className.equals(ByteWritable.class.getName())) {
+ return new ByteWritable(seed.length > 0 ? seed[0] : 0);
+ } else if (className.equals(BytesWritable.class.getName())) {
+ return new BytesWritable(seed);
+ } else if (className.equals(UTF8.class.getName())) {
+ return new UTF8(BytesUtil.toStringBinary(seed));
+ } else if (className.equals(MockValueClass.class.getName())) {
+ return new MockValueClass(seed);
+ } else {
+ return null;
+ }
+ }
+
+
+ public static <VTYPE> byte[] fromBytes(byte[] bytes) throws Exception {
+ throw new Exception("Not supported");
+ }
+
+ public static <VTYPE> byte[] toBytes(VTYPE obj) {
+ final String className = obj.getClass().getName();
+ if (className.equals(IntWritable.class.getName())) {
+ return Ints.toByteArray(((IntWritable) obj).get());
+ } else if (className.equals(FloatWritable.class.getName())) {
+ return BytesUtil.toBytes(((FloatWritable) obj).get());
+ } else if (className.equals(DoubleWritable.class.getName())) {
+ return BytesUtil.toBytes(((DoubleWritable) obj).get());
+ } else if (className.equals(LongWritable.class.getName())) {
+ return Longs.toByteArray(((LongWritable) obj).get());
+ } else if (className.equals(VIntWritable.class.getName())) {
+ return Ints.toByteArray(((VIntWritable) obj).get());
+ } else if (className.equals(VLongWritable.class.getName())) {
+ return Longs.toByteArray(((VLongWritable) obj).get());
+ } else if (className.equals(BooleanWritable.class.getName())) {
+ return BytesUtil.toBytes(((BooleanWritable) obj).get());
+ } else if (className.equals(Text.class.getName())) {
+ return ((Text)obj).copyBytes();
+ } else if (className.equals(ByteWritable.class.getName())) {
+ return Ints.toByteArray((int) ((ByteWritable) obj).get());
+ } else if (className.equals(BytesWritable.class.getName())) {
+ // TODO: copyBytes instead?
+ return ((BytesWritable) obj).getBytes();
+ } else {
+ return new byte[0];
+ }
+ }
+}