Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/01/25 00:22:01 UTC
svn commit: r1235548 [7/8] - in /hadoop/common/branches/branch-1: ./
src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/io/
src/mapred/org/apache/hadoop/mapreduce/
src/mapred/org/apache/hadoop/mapreduce/lib/db/
src/mapred/org/apache/hadoop/map...
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import junit.framework.TestCase;
+
+public class TestMRSequenceFileAsBinaryInputFormat extends TestCase {
+ private static final int RECORDS = 10000;
+
+ public void testBinary() throws IOException, InterruptedException {
+ Job job = new Job();
+ FileSystem fs = FileSystem.getLocal(job.getConfiguration());
+ Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+ Path file = new Path(dir, "testbinary.seq");
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+
+ fs.delete(dir, true);
+ FileInputFormat.setInputPaths(job, dir);
+
+ Text tkey = new Text();
+ Text tval = new Text();
+
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+ job.getConfiguration(), file, Text.class, Text.class);
+ try {
+ for (int i = 0; i < RECORDS; ++i) {
+ tkey.set(Integer.toString(r.nextInt(), 36));
+ tval.set(Long.toString(r.nextLong(), 36));
+ writer.append(tkey, tval);
+ }
+ } finally {
+ writer.close();
+ }
+ TaskAttemptContext context = MapReduceTestUtil.
+ createDummyMapTaskAttemptContext(job.getConfiguration());
+ InputFormat<BytesWritable,BytesWritable> bformat =
+ new SequenceFileAsBinaryInputFormat();
+
+ int count = 0;
+ r.setSeed(seed);
+ BytesWritable bkey = new BytesWritable();
+ BytesWritable bval = new BytesWritable();
+ Text cmpkey = new Text();
+ Text cmpval = new Text();
+ DataInputBuffer buf = new DataInputBuffer();
+ FileInputFormat.setInputPaths(job, file);
+ for (InputSplit split : bformat.getSplits(job)) {
+ RecordReader<BytesWritable, BytesWritable> reader =
+ bformat.createRecordReader(split, context);
+ MapContext<BytesWritable, BytesWritable, BytesWritable, BytesWritable>
+ mcontext = new MapContext<BytesWritable, BytesWritable,
+ BytesWritable, BytesWritable>(job.getConfiguration(),
+ context.getTaskAttemptID(), reader, null, null,
+ MapReduceTestUtil.createDummyReporter(),
+ split);
+ reader.initialize(split, mcontext);
+ try {
+ while (reader.nextKeyValue()) {
+ bkey = reader.getCurrentKey();
+ bval = reader.getCurrentValue();
+ tkey.set(Integer.toString(r.nextInt(), 36));
+ tval.set(Long.toString(r.nextLong(), 36));
+ buf.reset(bkey.getBytes(), bkey.getLength());
+ cmpkey.readFields(buf);
+ buf.reset(bval.getBytes(), bval.getLength());
+ cmpval.readFields(buf);
+ assertTrue(
+ "Keys don't match: " + "*" + cmpkey.toString() + ":" +
+ tkey.toString() + "*",
+ cmpkey.toString().equals(tkey.toString()));
+ assertTrue(
+ "Vals don't match: " + "*" + cmpval.toString() + ":" +
+ tval.toString() + "*",
+ cmpval.toString().equals(tval.toString()));
+ ++count;
+ }
+ } finally {
+ reader.close();
+ }
+ }
+ assertEquals("Some records not found", RECORDS, count);
+ }
+
+}
Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
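
Note on the class under test: SequenceFileAsBinaryInputFormat hands each
SequenceFile record to the mapper as the raw, still-serialized bytes of its
key and value. A minimal driver sketch (hypothetical path, not part of this
commit):

    // Sketch only: read a SequenceFile's records as raw bytes.
    Job job = new Job(new Configuration());
    job.setInputFormatClass(SequenceFileAsBinaryInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path("/data/input.seq")); // hypothetical
    // The mapper then receives <BytesWritable, BytesWritable> pairs and can
    // rehydrate them with DataInputBuffer + readFields(), as the test does.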
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.conf.*;
+
+public class TestMRSequenceFileAsTextInputFormat extends TestCase {
+ private static int MAX_LENGTH = 10000;
+ private static Configuration conf = new Configuration();
+
+ public void testFormat() throws Exception {
+ Job job = new Job(conf);
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+ Path file = new Path(dir, "test.seq");
+
+ int seed = new Random().nextInt();
+ Random random = new Random(seed);
+
+ fs.delete(dir, true);
+
+ FileInputFormat.setInputPaths(job, dir);
+
+ // for a variety of lengths
+ for (int length = 0; length < MAX_LENGTH;
+ length += random.nextInt(MAX_LENGTH / 10) + 1) {
+
+ // create a file with length entries
+ SequenceFile.Writer writer =
+ SequenceFile.createWriter(fs, conf, file,
+ IntWritable.class, LongWritable.class);
+ try {
+ for (int i = 0; i < length; i++) {
+ IntWritable key = new IntWritable(i);
+ LongWritable value = new LongWritable(10 * i);
+ writer.append(key, value);
+ }
+ } finally {
+ writer.close();
+ }
+
+ TaskAttemptContext context = MapReduceTestUtil.
+ createDummyMapTaskAttemptContext(job.getConfiguration());
+ // try splitting the file in a variety of sizes
+ InputFormat<Text, Text> format =
+ new SequenceFileAsTextInputFormat();
+
+ for (int i = 0; i < 3; i++) {
+ // check each split
+ BitSet bits = new BitSet(length);
+ int numSplits =
+ random.nextInt(MAX_LENGTH / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
+ FileInputFormat.setMaxInputSplitSize(job,
+ fs.getFileStatus(file).getLen() / numSplits);
+ for (InputSplit split : format.getSplits(job)) {
+ RecordReader<Text, Text> reader =
+ format.createRecordReader(split, context);
+ MapContext<Text, Text, Text, Text> mcontext =
+ new MapContext<Text, Text, Text, Text>(job.getConfiguration(),
+ context.getTaskAttemptID(), reader, null, null,
+ MapReduceTestUtil.createDummyReporter(),
+ split);
+ reader.initialize(split, mcontext);
+ Class<?> readerClass = reader.getClass();
+ assertEquals("reader class is SequenceFileAsTextRecordReader.",
+ SequenceFileAsTextRecordReader.class, readerClass);
+ Text key;
+ try {
+ int count = 0;
+ while (reader.nextKeyValue()) {
+ key = reader.getCurrentKey();
+ int keyInt = Integer.parseInt(key.toString());
+ assertFalse("Key in multiple partitions.", bits.get(keyInt));
+ bits.set(keyInt);
+ count++;
+ }
+ } finally {
+ reader.close();
+ }
+ }
+ assertEquals("Some keys in no partition.", length, bits.cardinality());
+ }
+
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ new TestMRSequenceFileAsTextInputFormat().testFormat();
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
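
Note: SequenceFileAsTextInputFormat converts each key and value to Text via
toString(), so a SequenceFile of arbitrary Writables can feed a text-oriented
job. A minimal sketch (hypothetical path, not part of this commit):

    Job job = new Job(new Configuration());
    job.setInputFormatClass(SequenceFileAsTextInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path("/data/ints.seq")); // hypothetical
    // An IntWritable key 42 reaches the mapper as the Text "42".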
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.conf.*;
+
+public class TestMRSequenceFileInputFilter extends TestCase {
+ private static final Log LOG =
+ LogFactory.getLog(TestMRSequenceFileInputFilter.class.getName());
+
+ private static final int MAX_LENGTH = 15000;
+ private static final Configuration conf = new Configuration();
+ private static final Job job;
+ private static final FileSystem fs;
+ private static final Path inDir =
+ new Path(System.getProperty("test.build.data",".") + "/mapred");
+ private static final Path inFile = new Path(inDir, "test.seq");
+ private static final Random random = new Random(1);
+
+ static {
+ try {
+ job = new Job(conf);
+ FileInputFormat.setInputPaths(job, inDir);
+ fs = FileSystem.getLocal(conf);
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void createSequenceFile(int numRecords) throws Exception {
+ // create a file with length entries
+ SequenceFile.Writer writer =
+ SequenceFile.createWriter(fs, conf, inFile,
+ Text.class, BytesWritable.class);
+ try {
+ for (int i = 1; i <= numRecords; i++) {
+ Text key = new Text(Integer.toString(i));
+ byte[] data = new byte[random.nextInt(10)];
+ random.nextBytes(data);
+ BytesWritable value = new BytesWritable(data);
+ writer.append(key, value);
+ }
+ } finally {
+ writer.close();
+ }
+ }
+
+
+ private int countRecords(int numSplits)
+ throws IOException, InterruptedException {
+ InputFormat<Text, BytesWritable> format =
+ new SequenceFileInputFilter<Text, BytesWritable>();
+ if (numSplits == 0) {
+ numSplits =
+ random.nextInt(MAX_LENGTH / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
+ }
+ FileInputFormat.setMaxInputSplitSize(job,
+ fs.getFileStatus(inFile).getLen() / numSplits);
+ TaskAttemptContext context = MapReduceTestUtil.
+ createDummyMapTaskAttemptContext(job.getConfiguration());
+ // check each split
+ int count = 0;
+ for (InputSplit split : format.getSplits(job)) {
+ RecordReader<Text, BytesWritable> reader =
+ format.createRecordReader(split, context);
+ MapContext<Text, BytesWritable, Text, BytesWritable> mcontext =
+ new MapContext<Text, BytesWritable, Text, BytesWritable>(
+ job.getConfiguration(),
+ context.getTaskAttemptID(), reader, null, null,
+ MapReduceTestUtil.createDummyReporter(), split);
+ reader.initialize(split, mcontext);
+ try {
+ while (reader.nextKeyValue()) {
+ LOG.info("Accept record " + reader.getCurrentKey().toString());
+ count++;
+ }
+ } finally {
+ reader.close();
+ }
+ }
+ return count;
+ }
+
+ public void testRegexFilter() throws Exception {
+ // set the filter class
+ LOG.info("Testing Regex Filter with patter: \\A10*");
+ SequenceFileInputFilter.setFilterClass(job,
+ SequenceFileInputFilter.RegexFilter.class);
+ SequenceFileInputFilter.RegexFilter.setPattern(
+ job.getConfiguration(), "\\A10*");
+
+ // clean input dir
+ fs.delete(inDir, true);
+
+ // for a variety of lengths
+ for (int length = 1; length < MAX_LENGTH;
+ length += random.nextInt(MAX_LENGTH / 10) + 1) {
+ LOG.info("******Number of records: " + length);
+ createSequenceFile(length);
+ int count = countRecords(0);
+      assertEquals(length == 0 ? 0 : (int) Math.log10(length) + 1, count);
+ }
+
+ // clean up
+ fs.delete(inDir, true);
+ }
+
+ public void testPercentFilter() throws Exception {
+ LOG.info("Testing Percent Filter with frequency: 1000");
+ // set the filter class
+ SequenceFileInputFilter.setFilterClass(job,
+ SequenceFileInputFilter.PercentFilter.class);
+ SequenceFileInputFilter.PercentFilter.setFrequency(
+ job.getConfiguration(), 1000);
+
+ // clean input dir
+ fs.delete(inDir, true);
+
+ // for a variety of lengths
+ for (int length = 0; length < MAX_LENGTH;
+ length += random.nextInt(MAX_LENGTH / 10) + 1) {
+ LOG.info("******Number of records: "+length);
+ createSequenceFile(length);
+ int count = countRecords(1);
+ LOG.info("Accepted " + count + " records");
+ int expectedCount = length / 1000;
+ if (expectedCount * 1000 != length)
+ expectedCount++;
+      assertEquals(expectedCount, count);
+ }
+
+ // clean up
+ fs.delete(inDir, true);
+ }
+
+ public void testMD5Filter() throws Exception {
+ // set the filter class
+ LOG.info("Testing MD5 Filter with frequency: 1000");
+ SequenceFileInputFilter.setFilterClass(job,
+ SequenceFileInputFilter.MD5Filter.class);
+ SequenceFileInputFilter.MD5Filter.setFrequency(
+ job.getConfiguration(), 1000);
+
+ // clean input dir
+ fs.delete(inDir, true);
+
+ // for a variety of lengths
+ for (int length = 0; length < MAX_LENGTH;
+ length += random.nextInt(MAX_LENGTH / 10) + 1) {
+ LOG.info("******Number of records: " + length);
+ createSequenceFile(length);
+ LOG.info("Accepted " + countRecords(0) + " records");
+ }
+ // clean up
+ fs.delete(inDir, true);
+ }
+
+ public static void main(String[] args) throws Exception {
+ TestMRSequenceFileInputFilter filter = new TestMRSequenceFileInputFilter();
+ filter.testRegexFilter();
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
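
Note: the three bundled filters exercised above are configured statically on
the job. A condensed sketch mirroring the test's calls (pattern and frequency
values arbitrary):

    // RegexFilter: keep records whose key matches the pattern.
    SequenceFileInputFilter.setFilterClass(job,
        SequenceFileInputFilter.RegexFilter.class);
    SequenceFileInputFilter.RegexFilter.setPattern(
        job.getConfiguration(), "\\A10*");

    // PercentFilter: keep every f-th record by record number (here f = 1000).
    SequenceFileInputFilter.setFilterClass(job,
        SequenceFileInputFilter.PercentFilter.class);
    SequenceFileInputFilter.PercentFilter.setFrequency(
        job.getConfiguration(), 1000);

    // MD5Filter: keep records where MD5(key) % f == 0.
    SequenceFileInputFilter.setFilterClass(job,
        SequenceFileInputFilter.MD5Filter.class);
    SequenceFileInputFilter.MD5Filter.setFrequency(
        job.getConfiguration(), 1000);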
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * @see TestDelegatingInputFormat
+ */
+public class TestMultipleInputs extends TestCase {
+
+ public void testAddInputPathWithFormat() throws IOException {
+ final Job job = new Job();
+ MultipleInputs.addInputPath(job, new Path("/foo"), TextInputFormat.class);
+ MultipleInputs.addInputPath(job, new Path("/bar"),
+ KeyValueTextInputFormat.class);
+ final Map<Path, InputFormat> inputs = MultipleInputs
+ .getInputFormatMap(new JobContext(job.getConfiguration(), new JobID()));
+ assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
+ assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
+ .getClass());
+ }
+
+ public void testAddInputPathWithMapper() throws IOException {
+ final Job job = new Job();
+ MultipleInputs.addInputPath(job, new Path("/foo"), TextInputFormat.class,
+ MapClass.class);
+ MultipleInputs.addInputPath(job, new Path("/bar"),
+ KeyValueTextInputFormat.class, MapClass2.class);
+ final Map<Path, InputFormat> inputs = MultipleInputs
+ .getInputFormatMap(job);
+ final Map<Path, Class<? extends Mapper>> maps = MultipleInputs
+ .getMapperTypeMap(job);
+
+ assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
+ assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
+ .getClass());
+ assertEquals(MapClass.class, maps.get(new Path("/foo")));
+ assertEquals(MapClass2.class, maps.get(new Path("/bar")));
+ }
+
+ static class MapClass extends Mapper<String, String, String, String> {
+ }
+
+ static class MapClass2 extends MapClass {
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
------------------------------------------------------------------------------
svn:eol-style = native
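
Note: MultipleInputs binds an InputFormat (and optionally a Mapper) to each
input path, as the two tests verify. A hypothetical driver fragment (mapper
class names invented for illustration):

    MultipleInputs.addInputPath(job, new Path("/logs/plain"),
        TextInputFormat.class, PlainLogMapper.class);          // hypothetical
    MultipleInputs.addInputPath(job, new Path("/logs/tabbed"),
        KeyValueTextInputFormat.class, TabbedLogMapper.class); // hypothetical
    // Both mappers must emit the same map-output key/value types.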
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+
+public class TestNLineInputFormat extends TestCase {
+ private static int MAX_LENGTH = 200;
+
+ private static Configuration conf = new Configuration();
+ private static FileSystem localFs = null;
+
+ static {
+ try {
+ localFs = FileSystem.getLocal(conf);
+ } catch (IOException e) {
+ throw new RuntimeException("init failure", e);
+ }
+ }
+
+ private static Path workDir =
+ new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+ "TestNLineInputFormat");
+
+ public void testFormat() throws Exception {
+ Job job = new Job(conf);
+ Path file = new Path(workDir, "test.txt");
+
+ int seed = new Random().nextInt();
+ Random random = new Random(seed);
+
+ localFs.delete(workDir, true);
+ FileInputFormat.setInputPaths(job, workDir);
+ int numLinesPerMap = 5;
+ NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap);
+ // for a variety of lengths
+ for (int length = 0; length < MAX_LENGTH;
+ length += random.nextInt(MAX_LENGTH / 10) + 1) {
+ // create a file with length entries
+ Writer writer = new OutputStreamWriter(localFs.create(file));
+ try {
+ for (int i = 0; i < length; i++) {
+ writer.write(Integer.toString(i));
+ writer.write("\n");
+ }
+ } finally {
+ writer.close();
+ }
+ checkFormat(job, numLinesPerMap);
+ }
+ }
+
+ void checkFormat(Job job, int expectedN)
+ throws IOException, InterruptedException {
+ NLineInputFormat format = new NLineInputFormat();
+ List<InputSplit> splits = format.getSplits(job);
+ // check all splits except last one
+ int count = 0;
+    for (int i = 0; i < splits.size() - 1; i++) {
+ assertEquals("There are no split locations", 0,
+ splits.get(i).getLocations().length);
+ TaskAttemptContext context = MapReduceTestUtil.
+ createDummyMapTaskAttemptContext(job.getConfiguration());
+ RecordReader<LongWritable, Text> reader = format.createRecordReader(
+ splits.get(i), context);
+ Class<?> clazz = reader.getClass();
+ assertEquals("reader class is LineRecordReader.",
+ LineRecordReader.class, clazz);
+ MapContext<LongWritable, Text, LongWritable, Text> mcontext =
+ new MapContext<LongWritable, Text, LongWritable, Text>(
+ job.getConfiguration(), context.getTaskAttemptID(), reader, null,
+ null, MapReduceTestUtil.createDummyReporter(), splits.get(i));
+ reader.initialize(splits.get(i), mcontext);
+
+ try {
+ count = 0;
+ while (reader.nextKeyValue()) {
+ count++;
+ }
+ } finally {
+ reader.close();
+ }
+ assertEquals("number of lines in split is " + expectedN ,
+ expectedN, count);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ new TestNLineInputFormat().testFormat();
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
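
Note: NLineInputFormat creates one split per N input lines, so each mapper
processes exactly N lines (the final split may be shorter). A minimal sketch:

    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.setNumLinesPerSplit(job, 5);
    // A 12-line input file then yields three splits of 5, 5 and 2 lines.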
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestControlledJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestControlledJob.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestControlledJob.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestControlledJob.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobcontrol;
+
+import static org.junit.Assert.assertFalse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+/**
+ * Tests that depending jobs cannot be added to a running or completed job.
+ */
+public class TestControlledJob {
+
+ @Test
+ public void testAddingDependingJobToRunningJobFails() throws Exception {
+ Configuration conf = new Configuration();
+ ControlledJob job1 = new ControlledJob(conf);
+ job1.setJobState(ControlledJob.State.RUNNING);
+ assertFalse(job1.addDependingJob(new ControlledJob(conf)));
+ }
+
+ @Test
+ public void testAddingDependingJobToCompletedJobFails() throws Exception {
+ Configuration conf = new Configuration();
+ ControlledJob job1 = new ControlledJob(conf);
+ job1.setJobState(ControlledJob.State.SUCCESS);
+ assertFalse(job1.addDependingJob(new ControlledJob(conf)));
+ }
+
+}
Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestControlledJob.java
------------------------------------------------------------------------------
svn:eol-style = native
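
Note: a ControlledJob accepts new dependencies only while it is still waiting
to run; afterwards addDependingJob returns false instead of mutating the
graph. A small sketch of that contract (assuming the default WAITING state):

    ControlledJob downstream = new ControlledJob(conf);
    ControlledJob upstream = new ControlledJob(conf);
    boolean added = downstream.addDependingJob(upstream);   // true: still waiting
    downstream.setJobState(ControlledJob.State.RUNNING);
    boolean refused = downstream.addDependingJob(upstream); // false: graph frozen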
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobcontrol;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+
+/**
+ * Tests the JobControl API using mock and stub Job instances.
+ */
+public class TestMapReduceJobControlWithMocks {
+
+ @Test
+ public void testSuccessfulJobs() throws Exception {
+ JobControl jobControl = new JobControl("Test");
+
+ ControlledJob job1 = createSuccessfulControlledJob(jobControl);
+ ControlledJob job2 = createSuccessfulControlledJob(jobControl);
+ ControlledJob job3 = createSuccessfulControlledJob(jobControl, job1, job2);
+ ControlledJob job4 = createSuccessfulControlledJob(jobControl, job3);
+
+ runJobControl(jobControl);
+
+ assertEquals("Success list", 4, jobControl.getSuccessfulJobList().size());
+ assertEquals("Failed list", 0, jobControl.getFailedJobList().size());
+
+ assertTrue(job1.getJobState() == ControlledJob.State.SUCCESS);
+ assertTrue(job2.getJobState() == ControlledJob.State.SUCCESS);
+ assertTrue(job3.getJobState() == ControlledJob.State.SUCCESS);
+ assertTrue(job4.getJobState() == ControlledJob.State.SUCCESS);
+
+ jobControl.stop();
+ }
+
+ @Test
+ public void testFailedJob() throws Exception {
+ JobControl jobControl = new JobControl("Test");
+
+ ControlledJob job1 = createFailedControlledJob(jobControl);
+ ControlledJob job2 = createSuccessfulControlledJob(jobControl);
+ ControlledJob job3 = createSuccessfulControlledJob(jobControl, job1, job2);
+ ControlledJob job4 = createSuccessfulControlledJob(jobControl, job3);
+
+ runJobControl(jobControl);
+
+ assertEquals("Success list", 1, jobControl.getSuccessfulJobList().size());
+ assertEquals("Failed list", 3, jobControl.getFailedJobList().size());
+
+ assertTrue(job1.getJobState() == ControlledJob.State.FAILED);
+ assertTrue(job2.getJobState() == ControlledJob.State.SUCCESS);
+ assertTrue(job3.getJobState() == ControlledJob.State.DEPENDENT_FAILED);
+ assertTrue(job4.getJobState() == ControlledJob.State.DEPENDENT_FAILED);
+
+ jobControl.stop();
+ }
+
+ @Test
+ public void testKillJob() throws Exception {
+ JobControl jobControl = new JobControl("Test");
+
+ ControlledJob job = createFailedControlledJob(jobControl);
+
+ job.killJob();
+
+ // Verify that killJob() was called on the mock Job
+ verify(job.getJob()).killJob();
+ }
+
+ private Job createJob(boolean complete, boolean successful)
+ throws IOException, InterruptedException {
+ // Create a stub Job that responds in a controlled way
+ Job mockJob = mock(Job.class);
+ when(mockJob.getConfiguration()).thenReturn(new Configuration());
+ when(mockJob.isComplete()).thenReturn(complete);
+ when(mockJob.isSuccessful()).thenReturn(successful);
+ return mockJob;
+ }
+
+ private ControlledJob createControlledJob(JobControl jobControl,
+ boolean successful, ControlledJob... dependingJobs)
+ throws IOException, InterruptedException {
+ List<ControlledJob> dependingJobsList = dependingJobs == null ? null :
+ Arrays.asList(dependingJobs);
+ ControlledJob job = new ControlledJob(createJob(true, successful),
+ dependingJobsList);
+ jobControl.addJob(job);
+ return job;
+ }
+
+ private ControlledJob createSuccessfulControlledJob(JobControl jobControl,
+ ControlledJob... dependingJobs) throws IOException, InterruptedException {
+ return createControlledJob(jobControl, true, dependingJobs);
+ }
+
+ private ControlledJob createFailedControlledJob(JobControl jobControl,
+ ControlledJob... dependingJobs) throws IOException, InterruptedException {
+ return createControlledJob(jobControl, false, dependingJobs);
+ }
+
+ private void runJobControl(JobControl jobControl) {
+ Thread controller = new Thread(jobControl);
+ controller.start();
+ waitTillAllFinished(jobControl);
+ }
+
+ private void waitTillAllFinished(JobControl jobControl) {
+ while (!jobControl.allFinished()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java
------------------------------------------------------------------------------
svn:eol-style = native
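
Note: in a real driver the same dependency graph is built from actual Job
instances rather than mocks. A hypothetical two-stage pipeline (extractJob
and transformJob are assumed, fully configured Jobs):

    JobControl control = new JobControl("pipeline");
    ControlledJob extract = new ControlledJob(extractJob, null);
    ControlledJob transform =
        new ControlledJob(transformJob, Arrays.asList(extract));
    control.addJob(extract);
    control.addJob(transform);
    new Thread(control).start();          // JobControl is a Runnable
    while (!control.allFinished()) {
      Thread.sleep(500);
    }
    control.stop();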
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.*;
+
+public class TestMRSequenceFileAsBinaryOutputFormat extends TestCase {
+ private static final Log LOG =
+ LogFactory.getLog(TestMRSequenceFileAsBinaryOutputFormat.class.getName());
+
+ private static final int RECORDS = 10000;
+
+ public void testBinary() throws IOException, InterruptedException {
+ Configuration conf = new Configuration();
+ Job job = new Job(conf);
+
+ Path outdir = new Path(System.getProperty("test.build.data", "/tmp"),
+ "outseq");
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+
+ FileOutputFormat.setOutputPath(job, outdir);
+
+ SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
+ IntWritable.class );
+ SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job,
+ DoubleWritable.class );
+
+ SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
+ SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
+ CompressionType.BLOCK);
+
+ BytesWritable bkey = new BytesWritable();
+ BytesWritable bval = new BytesWritable();
+
+ TaskAttemptContext context =
+ MapReduceTestUtil.createDummyMapTaskAttemptContext(job.getConfiguration());
+ OutputFormat<BytesWritable, BytesWritable> outputFormat =
+ new SequenceFileAsBinaryOutputFormat();
+ OutputCommitter committer = outputFormat.getOutputCommitter(context);
+ committer.setupJob(job);
+ RecordWriter<BytesWritable, BytesWritable> writer = outputFormat.
+ getRecordWriter(context);
+
+ IntWritable iwritable = new IntWritable();
+ DoubleWritable dwritable = new DoubleWritable();
+ DataOutputBuffer outbuf = new DataOutputBuffer();
+ LOG.info("Creating data by SequenceFileAsBinaryOutputFormat");
+ try {
+ for (int i = 0; i < RECORDS; ++i) {
+ iwritable = new IntWritable(r.nextInt());
+ iwritable.write(outbuf);
+ bkey.set(outbuf.getData(), 0, outbuf.getLength());
+ outbuf.reset();
+ dwritable = new DoubleWritable(r.nextDouble());
+ dwritable.write(outbuf);
+ bval.set(outbuf.getData(), 0, outbuf.getLength());
+ outbuf.reset();
+ writer.write(bkey, bval);
+ }
+ } finally {
+ writer.close(context);
+ }
+ committer.commitTask(context);
+ committer.commitJob(job);
+
+ InputFormat<IntWritable, DoubleWritable> iformat =
+ new SequenceFileInputFormat<IntWritable, DoubleWritable>();
+ int count = 0;
+ r.setSeed(seed);
+ SequenceFileInputFormat.setInputPaths(job, outdir);
+ LOG.info("Reading data by SequenceFileInputFormat");
+ for (InputSplit split : iformat.getSplits(job)) {
+ RecordReader<IntWritable, DoubleWritable> reader =
+ iformat.createRecordReader(split, context);
+ MapContext<IntWritable, DoubleWritable, BytesWritable, BytesWritable>
+ mcontext = new MapContext<IntWritable, DoubleWritable,
+ BytesWritable, BytesWritable>(job.getConfiguration(),
+ context.getTaskAttemptID(), reader, null, null,
+ MapReduceTestUtil.createDummyReporter(),
+ split);
+ reader.initialize(split, mcontext);
+ try {
+ int sourceInt;
+ double sourceDouble;
+ while (reader.nextKeyValue()) {
+ sourceInt = r.nextInt();
+ sourceDouble = r.nextDouble();
+ iwritable = reader.getCurrentKey();
+ dwritable = reader.getCurrentValue();
+ assertEquals(
+ "Keys don't match: " + "*" + iwritable.get() + ":" +
+ sourceInt + "*",
+ sourceInt, iwritable.get());
+ assertTrue(
+ "Vals don't match: " + "*" + dwritable.get() + ":" +
+ sourceDouble + "*",
+ Double.compare(dwritable.get(), sourceDouble) == 0 );
+ ++count;
+ }
+ } finally {
+ reader.close();
+ }
+ }
+ assertEquals("Some records not found", RECORDS, count);
+ }
+
+ public void testSequenceOutputClassDefaultsToMapRedOutputClass()
+ throws IOException {
+ Job job = new Job();
+    // Set arbitrary classes to test getSequenceFileOutput{Key,Value}Class
+ job.setOutputKeyClass(FloatWritable.class);
+ job.setOutputValueClass(BooleanWritable.class);
+
+ assertEquals("SequenceFileOutputKeyClass should default to ouputKeyClass",
+ FloatWritable.class,
+ SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(job));
+ assertEquals("SequenceFileOutputValueClass should default to "
+ + "ouputValueClass",
+ BooleanWritable.class,
+ SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(job));
+
+ SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
+ IntWritable.class );
+ SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job,
+ DoubleWritable.class );
+
+ assertEquals("SequenceFileOutputKeyClass not updated",
+ IntWritable.class,
+ SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(job));
+ assertEquals("SequenceFileOutputValueClass not updated",
+ DoubleWritable.class,
+ SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(job));
+ }
+
+  public void testCheckOutputSpecsForbidRecordCompression()
+ throws IOException {
+ Job job = new Job();
+ FileSystem fs = FileSystem.getLocal(job.getConfiguration());
+ Path outputdir = new Path(System.getProperty("test.build.data", "/tmp")
+ + "/output");
+ fs.delete(outputdir, true);
+
+    // Without an output path, FileOutputFormat.checkOutputSpecs would throw
+    // an InvalidJobConfException
+ FileOutputFormat.setOutputPath(job, outputdir);
+
+ // SequenceFileAsBinaryOutputFormat doesn't support record compression
+ // It should throw an exception when checked by checkOutputSpecs
+ SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
+
+ SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
+ CompressionType.BLOCK);
+ try {
+ new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(job);
+ } catch (Exception e) {
+ fail("Block compression should be allowed for "
+ + "SequenceFileAsBinaryOutputFormat:Caught " + e.getClass().getName());
+ }
+
+ SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
+ CompressionType.RECORD);
+ try {
+ new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(job);
+ fail("Record compression should not be allowed for "
+ + "SequenceFileAsBinaryOutputFormat");
+ } catch (InvalidJobConfException ie) {
+ // expected
+ } catch (Exception e) {
+ fail("Expected " + InvalidJobConfException.class.getName()
+ + "but caught " + e.getClass().getName() );
+ }
+ }
+}
Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
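
Note: SequenceFileAsBinaryOutputFormat writes pre-serialized BytesWritable
pairs while stamping the SequenceFile header with the declared key/value
classes, so downstream readers see a normally typed file. The essential
driver calls, mirroring the test:

    job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
        IntWritable.class);
    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job,
        DoubleWritable.class);
    SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
    SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
        CompressionType.BLOCK); // RECORD compression is rejected, as tested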
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMultipleOutputs.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMultipleOutputs.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMultipleOutputs.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.output;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+public class TestMultipleOutputs extends HadoopTestCase {
+
+ public TestMultipleOutputs() throws IOException {
+ super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+ }
+
+ public void testWithoutCounters() throws Exception {
+ _testMultipleOutputs(false);
+ }
+
+ public void testWithCounters() throws Exception {
+ _testMultipleOutputs(true);
+ }
+
+ private static final Path ROOT_DIR = new Path("testing/mo");
+ private static final Path IN_DIR = new Path(ROOT_DIR, "input");
+ private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
+
+ private Path getDir(Path dir) {
+ // Hack for local FS that does not have the concept of a 'mounting point'
+ if (isLocalFS()) {
+ String localPathRoot = System.getProperty("test.build.data", "/tmp")
+ .replace(' ', '+');
+ dir = new Path(localPathRoot, dir);
+ }
+ return dir;
+ }
+
+ public void setUp() throws Exception {
+ super.setUp();
+ Path rootDir = getDir(ROOT_DIR);
+ Path inDir = getDir(IN_DIR);
+
+ JobConf conf = createJobConf();
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(rootDir, true);
+ if (!fs.mkdirs(inDir)) {
+ throw new IOException("Mkdirs failed to create " + inDir.toString());
+ }
+ }
+
+ public void tearDown() throws Exception {
+ Path rootDir = getDir(ROOT_DIR);
+
+ JobConf conf = createJobConf();
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(rootDir, true);
+ super.tearDown();
+ }
+
+ protected void _testMultipleOutputs(boolean withCounters) throws Exception {
+ Path inDir = getDir(IN_DIR);
+ Path outDir = getDir(OUT_DIR);
+
+ JobConf conf = createJobConf();
+ FileSystem fs = FileSystem.get(conf);
+
+ DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+ file.writeBytes("a\nb\n\nc\nd\ne");
+ file.close();
+
+ file = fs.create(new Path(inDir, "part-1"));
+ file.writeBytes("a\nb\n\nc\nd\ne");
+ file.close();
+
+ Job job = new Job(conf);
+ job.setJobName("mo");
+ job.setInputFormatClass(TextInputFormat.class);
+
+ job.setOutputKeyClass(LongWritable.class);
+ job.setOutputValueClass(Text.class);
+
+ job.setMapOutputKeyClass(LongWritable.class);
+ job.setMapOutputValueClass(Text.class);
+
+ job.setOutputFormatClass(TextOutputFormat.class);
+ job.setOutputKeyClass(LongWritable.class);
+ job.setOutputValueClass(Text.class);
+
+ MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
+ LongWritable.class, Text.class);
+
+ MultipleOutputs.setCountersEnabled(job, withCounters);
+
+ job.setMapperClass(MOMap.class);
+ job.setReducerClass(MOReduce.class);
+
+ FileInputFormat.setInputPaths(job, inDir);
+ FileOutputFormat.setOutputPath(job, outDir);
+
+ job.waitForCompletion(false);
+
+ // assert number of named output part files
+ int namedOutputCount = 0;
+ FileStatus[] statuses = fs.listStatus(outDir);
+
+ for (FileStatus status : statuses) {
+ if (status.getPath().getName().equals("text-m-00000") ||
+ status.getPath().getName().equals("text-m-00001") ||
+ status.getPath().getName().equals("text-r-00000")) {
+ namedOutputCount++;
+ }
+ }
+ assertEquals(3, namedOutputCount);
+
+ // assert TextOutputFormat files correctness
+ JobContext jobContext = new JobContext(job.getConfiguration(), new JobID());
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(fs.open(
+ new Path(FileOutputFormat.getOutputPath(jobContext), "text-r-00000"))));
+ int count = 0;
+ String line = reader.readLine();
+ while (line != null) {
+ assertTrue(line.endsWith("text"));
+ line = reader.readLine();
+ count++;
+ }
+ reader.close();
+ assertFalse(count == 0);
+
+ CounterGroup counters =
+ job.getCounters().getGroup(MultipleOutputs.class.getName());
+ if (!withCounters) {
+ assertEquals(0, counters.size());
+ }
+ else {
+ assertEquals(1, counters.size());
+ assertEquals(4, counters.findCounter("text").getValue());
+ }
+
+ }
+
+ @SuppressWarnings({"unchecked"})
+ public static class MOMap extends Mapper<LongWritable, Text, LongWritable,
+ Text> {
+
+ private MultipleOutputs mos;
+
+ @Override
+ protected void setup(Context context) {
+ mos = new MultipleOutputs(context);
+ }
+
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ if (!value.toString().equals("a")) {
+ context.write(key, value);
+ } else {
+ mos.write("text", key, new Text("text"));
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ mos.close();
+ }
+ }
+
+ @SuppressWarnings({"unchecked"})
+ public static class MOReduce extends Reducer<LongWritable, Text,
+ LongWritable, Text> {
+
+ private MultipleOutputs mos;
+
+ @Override
+ protected void setup(Context context) {
+ mos = new MultipleOutputs(context);
+ }
+
+ @Override
+ public void reduce(LongWritable key, Iterable<Text> values, Context context)
+ throws IOException, InterruptedException {
+ for (Text value : values) {
+ if (!value.toString().equals("b")) {
+ context.write(key, value);
+ } else {
+ mos.write("text", key, new Text("text"));
+ }
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ mos.close();
+ }
+ }
+
+}
Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMultipleOutputs.java
------------------------------------------------------------------------------
svn:eol-style = native
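
Note: the MultipleOutputs pattern exercised above reduces to one driver call
plus three task-side calls. A condensed sketch:

    // Driver: declare the named output once.
    MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
        LongWritable.class, Text.class);
    MultipleOutputs.setCountersEnabled(job, true); // optional record counters

    // Mapper/Reducer: open in setup(), write by name, close in cleanup().
    //   mos = new MultipleOutputs(context);
    //   mos.write("text", key, new Text("text"));
    //   mos.close();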
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestBinaryPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestBinaryPartitioner.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestBinaryPartitioner.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestBinaryPartitioner.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import junit.framework.TestCase;
+
+public class TestBinaryPartitioner extends TestCase {
+
+ public void testDefaultOffsets() {
+ Configuration conf = new Configuration();
+ BinaryPartitioner<?> partitioner =
+ ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
+
+ BinaryComparable key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
+ BinaryComparable key2 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
+ int partition1 = partitioner.getPartition(key1, null, 10);
+ int partition2 = partitioner.getPartition(key2, null, 10);
+ assertEquals(partition1, partition2);
+
+ key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
+ key2 = new BytesWritable(new byte[] { 6, 2, 3, 4, 5 });
+ partition1 = partitioner.getPartition(key1, null, 10);
+ partition2 = partitioner.getPartition(key2, null, 10);
+ assertTrue(partition1 != partition2);
+
+ key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
+ key2 = new BytesWritable(new byte[] { 1, 2, 3, 4, 6 });
+ partition1 = partitioner.getPartition(key1, null, 10);
+ partition2 = partitioner.getPartition(key2, null, 10);
+ assertTrue(partition1 != partition2);
+ }
+
+ public void testCustomOffsets() {
+ Configuration conf = new Configuration();
+ BinaryComparable key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
+ BinaryComparable key2 = new BytesWritable(new byte[] { 6, 2, 3, 7, 8 });
+
+ BinaryPartitioner.setOffsets(conf, 1, -3);
+ BinaryPartitioner<?> partitioner =
+ ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
+ int partition1 = partitioner.getPartition(key1, null, 10);
+ int partition2 = partitioner.getPartition(key2, null, 10);
+ assertEquals(partition1, partition2);
+
+ BinaryPartitioner.setOffsets(conf, 1, 2);
+ partitioner = ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
+ partition1 = partitioner.getPartition(key1, null, 10);
+ partition2 = partitioner.getPartition(key2, null, 10);
+ assertEquals(partition1, partition2);
+
+ BinaryPartitioner.setOffsets(conf, -4, -3);
+ partitioner = ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
+ partition1 = partitioner.getPartition(key1, null, 10);
+ partition2 = partitioner.getPartition(key2, null, 10);
+ assertEquals(partition1, partition2);
+ }
+
+ public void testLowerBound() {
+ Configuration conf = new Configuration();
+ BinaryPartitioner.setLeftOffset(conf, 0);
+ BinaryPartitioner<?> partitioner =
+ ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
+ BinaryComparable key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
+ BinaryComparable key2 = new BytesWritable(new byte[] { 6, 2, 3, 4, 5 });
+ int partition1 = partitioner.getPartition(key1, null, 10);
+ int partition2 = partitioner.getPartition(key2, null, 10);
+ assertTrue(partition1 != partition2);
+ }
+
+ public void testUpperBound() {
+ Configuration conf = new Configuration();
+ BinaryPartitioner.setRightOffset(conf, 4);
+ BinaryPartitioner<?> partitioner =
+ ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
+ BinaryComparable key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
+ BinaryComparable key2 = new BytesWritable(new byte[] { 1, 2, 3, 4, 6 });
+ int partition1 = partitioner.getPartition(key1, null, 10);
+ int partition2 = partitioner.getPartition(key2, null, 10);
+ assertTrue(partition1 != partition2);
+ }
+
+}
Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestBinaryPartitioner.java
------------------------------------------------------------------------------
svn:eol-style = native
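
Note: BinaryPartitioner hashes only the byte range selected by the configured
offsets, and negative offsets count from the end of the key. Applying the
test's first custom-offset case to its own keys:

    // setOffsets(conf, 1, -3) selects indexes 1 .. (length - 3). For the
    // 5-byte keys {1,2,3,4,5} and {6,2,3,7,8} that is indexes 1..2, i.e.
    // bytes {2,3} in both keys, so both keys land in the same partition.
    BinaryPartitioner.setOffsets(conf, 1, -3);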
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class TestInputSampler {
+
+ static class SequentialSplit extends InputSplit {
+ private int i;
+ SequentialSplit(int i) {
+ this.i = i;
+ }
+ public long getLength() { return 0; }
+ public String[] getLocations() { return new String[0]; }
+ public int getInit() { return i; }
+ }
+
+ static class TestInputSamplerIF
+ extends InputFormat<IntWritable,NullWritable> {
+
+ final int maxDepth;
+ final ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
+
+ TestInputSamplerIF(int maxDepth, int numSplits, int... splitInit) {
+ this.maxDepth = maxDepth;
+ assert splitInit.length == numSplits;
+ for (int i = 0; i < numSplits; ++i) {
+ splits.add(new SequentialSplit(splitInit[i]));
+ }
+ }
+
+ public List<InputSplit> getSplits(JobContext context)
+ throws IOException, InterruptedException {
+ return splits;
+ }
+
+ public RecordReader<IntWritable,NullWritable> createRecordReader(
+ final InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new RecordReader<IntWritable,NullWritable>() {
+ private int maxVal;
+ private final IntWritable i = new IntWritable();
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ i.set(((SequentialSplit)split).getInit() - 1);
+ maxVal = i.get() + maxDepth + 1;
+ }
+ public boolean nextKeyValue() {
+ i.set(i.get() + 1);
+ return i.get() < maxVal;
+ }
+ public IntWritable getCurrentKey() { return i; }
+ public NullWritable getCurrentValue() { return NullWritable.get(); }
+ public float getProgress() { return 1.0f; }
+ public void close() { }
+ };
+ }
+
+ }
+
+  /**
+   * Verify the SplitSampler contract: an equal number of records is taken
+   * from each of the first splits.
+   */
+ @Test
+ @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+ public void testSplitSampler() throws Exception {
+ final int TOT_SPLITS = 15;
+ final int NUM_SPLITS = 5;
+ final int STEP_SAMPLE = 5;
+ final int NUM_SAMPLES = NUM_SPLITS * STEP_SAMPLE;
+ InputSampler.Sampler<IntWritable,NullWritable> sampler =
+ new InputSampler.SplitSampler<IntWritable,NullWritable>(
+ NUM_SAMPLES, NUM_SPLITS);
+ int inits[] = new int[TOT_SPLITS];
+ for (int i = 0; i < TOT_SPLITS; ++i) {
+ inits[i] = i * STEP_SAMPLE;
+ }
+ Job ignored = new Job();
+ Object[] samples = sampler.getSample(
+ new TestInputSamplerIF(100000, TOT_SPLITS, inits), ignored);
+ assertEquals(NUM_SAMPLES, samples.length);
+ Arrays.sort(samples, new IntWritable.Comparator());
+ for (int i = 0; i < NUM_SAMPLES; ++i) {
+ assertEquals(i, ((IntWritable)samples[i]).get());
+ }
+ }
+
+  /**
+   * Verify the IntervalSampler contract: samples are taken at regular
+   * intervals from the given splits.
+   */
+ @Test
+ @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+ public void testIntervalSampler() throws Exception {
+ final int TOT_SPLITS = 16;
+ final int PER_SPLIT_SAMPLE = 4;
+ final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
+ final double FREQ = 1.0 / TOT_SPLITS;
+ InputSampler.Sampler<IntWritable,NullWritable> sampler =
+ new InputSampler.IntervalSampler<IntWritable,NullWritable>(
+ FREQ, NUM_SAMPLES);
+ int inits[] = new int[TOT_SPLITS];
+ for (int i = 0; i < TOT_SPLITS; ++i) {
+ inits[i] = i;
+ }
+ Job ignored = new Job();
+ Object[] samples = sampler.getSample(new TestInputSamplerIF(
+ NUM_SAMPLES, TOT_SPLITS, inits), ignored);
+ assertEquals(NUM_SAMPLES, samples.length);
+ Arrays.sort(samples, new IntWritable.Comparator());
+ for (int i = 0; i < NUM_SAMPLES; ++i) {
+ assertEquals(i, ((IntWritable)samples[i]).get());
+ }
+ }
+
+}
Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
------------------------------------------------------------------------------
svn:eol-style = native
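
Outside of the synthetic InputFormat used in this test, the samplers exist mainly to seed a TotalOrderPartitioner: a sampler scans the job's input and InputSampler.writePartitionFile() turns the sample into split points. The sketch below shows that pattern under the assumption that the branch carries the usual writePartitionFile/setPartitionFile API; the partition-file path and key type are illustrative.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

    public class SamplerSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setMapOutputKeyClass(IntWritable.class);
        // Sample up to 100 keys spread over at most 10 input splits, the
        // same (numSamples, maxSplitsSampled) contract that
        // testSplitSampler() verifies above.
        InputSampler.Sampler<IntWritable, NullWritable> sampler =
            new InputSampler.SplitSampler<IntWritable, NullWritable>(100, 10);
        // Point the partitioner at the file the split points will land in
        // (the path is arbitrary), then write the sample out.
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
            new Path("_partitions"));
        InputSampler.writePartitionFile(job, sampler);
        job.setPartitionerClass(TotalOrderPartitioner.class);
      }
    }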
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestKeyFieldHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestKeyFieldHelper.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestKeyFieldHelper.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestKeyFieldHelper.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import junit.framework.TestCase;
+
+public class TestKeyFieldHelper extends TestCase {
+ private static final Log LOG = LogFactory.getLog(TestKeyFieldHelper.class);
+  /**
+   * Tests KeyFieldHelper's parseOption().
+   */
+  public void testParseOption() throws Exception {
+ KeyFieldHelper helper = new KeyFieldHelper();
+ helper.setKeyFieldSeparator("\t");
+ String keySpecs = "-k1.2,3.4";
+ String eKeySpecs = keySpecs;
+ helper.parseOption(keySpecs);
+ String actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ // test -k a.b
+ keySpecs = "-k 1.2";
+ eKeySpecs = "-k1.2,0.0";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-nr -k1.2,3.4";
+ eKeySpecs = "-k1.2,3.4nr";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-nr -k1.2,3.4n";
+ eKeySpecs = "-k1.2,3.4n";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-nr -k1.2,3.4r";
+ eKeySpecs = "-k1.2,3.4r";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-nr -k1.2,3.4 -k5.6,7.8n -k9.10,11.12r -k13.14,15.16nr";
+ //1st
+ eKeySpecs = "-k1.2,3.4nr";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+ // 2nd
+ eKeySpecs = "-k5.6,7.8n";
+ actKeySpecs = helper.keySpecs().get(1).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+ //3rd
+ eKeySpecs = "-k9.10,11.12r";
+ actKeySpecs = helper.keySpecs().get(2).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+ //4th
+ eKeySpecs = "-k13.14,15.16nr";
+ actKeySpecs = helper.keySpecs().get(3).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-k1.2n,3.4";
+ eKeySpecs = "-k1.2,3.4n";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-k1.2r,3.4";
+ eKeySpecs = "-k1.2,3.4r";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-k1.2nr,3.4";
+ eKeySpecs = "-k1.2,3.4nr";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-k1.2,3.4n";
+ eKeySpecs = "-k1.2,3.4n";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-k1.2,3.4r";
+ eKeySpecs = "-k1.2,3.4r";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-k1.2,3.4nr";
+ eKeySpecs = "-k1.2,3.4nr";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-nr -k1.2,3.4 -k5.6,7.8";
+ eKeySpecs = "-k1.2,3.4nr";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+ eKeySpecs = "-k5.6,7.8nr";
+ actKeySpecs = helper.keySpecs().get(1).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-n -k1.2,3.4 -k5.6,7.8";
+ eKeySpecs = "-k1.2,3.4n";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+ eKeySpecs = "-k5.6,7.8n";
+ actKeySpecs = helper.keySpecs().get(1).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-r -k1.2,3.4 -k5.6,7.8";
+ eKeySpecs = "-k1.2,3.4r";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+ eKeySpecs = "-k5.6,7.8r";
+ actKeySpecs = helper.keySpecs().get(1).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-k1.2,3.4n -k5.6,7.8";
+ eKeySpecs = "-k1.2,3.4n";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+ eKeySpecs = "-k5.6,7.8";
+ actKeySpecs = helper.keySpecs().get(1).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-k1.2,3.4r -k5.6,7.8";
+ eKeySpecs = "-k1.2,3.4r";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+ eKeySpecs = "-k5.6,7.8";
+ actKeySpecs = helper.keySpecs().get(1).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-k1.2,3.4nr -k5.6,7.8";
+ eKeySpecs = "-k1.2,3.4nr";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+ eKeySpecs = "-k5.6,7.8";
+ actKeySpecs = helper.keySpecs().get(1).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-n";
+ eKeySpecs = "-k1.1,0.0n";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-r";
+ eKeySpecs = "-k1.1,0.0r";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+
+ keySpecs = "-nr";
+ eKeySpecs = "-k1.1,0.0nr";
+ helper = new KeyFieldHelper();
+ helper.parseOption(keySpecs);
+ actKeySpecs = helper.keySpecs().get(0).toString();
+ assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+ }
+
+  /**
+   * Tests KeyFieldHelper's getWordLengths().
+   */
+ public void testGetWordLengths() throws Exception {
+ KeyFieldHelper helper = new KeyFieldHelper();
+ helper.setKeyFieldSeparator("\t");
+ // test getWordLengths with unspecified key-specifications
+ String input = "hi";
+ int[] result = helper.getWordLengths(input.getBytes(), 0, 2);
+ assertTrue(equals(result, new int[] {1}));
+
+ // set the key specs
+ helper.setKeyFieldSpec(1, 2);
+
+ // test getWordLengths with 3 words
+ input = "hi\thello there";
+ result = helper.getWordLengths(input.getBytes(), 0, input.length());
+ assertTrue(equals(result, new int[] {2, 2, 11}));
+
+    // test getWordLengths with 3 words and a different separator
+ helper.setKeyFieldSeparator(" ");
+ input = "hi hello\tthere you";
+ result = helper.getWordLengths(input.getBytes(), 0, input.length());
+ assertTrue(equals(result, new int[] {3, 2, 11, 3}));
+
+    // test with a non-zero start index
+ input = "hi hello there you where me there";
+ // .....................
+ result = helper.getWordLengths(input.getBytes(), 10, 33);
+    assertTrue(equals(result, new int[] {5, 4, 3, 5, 2, 5}));
+
+ input = "hi hello there you where me ";
+ // ..................
+ result = helper.getWordLengths(input.getBytes(), 10, input.length());
+ assertTrue(equals(result, new int[] {5, 4, 3, 5, 2, 0}));
+
+ input = "";
+ result = helper.getWordLengths(input.getBytes(), 0, 0);
+ assertTrue(equals(result, new int[] {1, 0}));
+
+ input = " abc";
+ result = helper.getWordLengths(input.getBytes(), 0, 5);
+ assertTrue(equals(result, new int[] {3, 0, 0, 3}));
+
+ input = " abc";
+ result = helper.getWordLengths(input.getBytes(), 0, 2);
+ assertTrue(equals(result, new int[] {3, 0, 0, 0}));
+
+ input = " abc ";
+ result = helper.getWordLengths(input.getBytes(), 0, 2);
+ assertTrue(equals(result, new int[] {2, 0, 1}));
+
+ helper.setKeyFieldSeparator("abcd");
+ input = "abc";
+ result = helper.getWordLengths(input.getBytes(), 0, 3);
+ assertTrue(equals(result, new int[] {1, 3}));
+ }
+
+  /**
+   * Tests KeyFieldHelper's getStartOffset() and getEndOffset().
+   */
+  public void testGetStartEndOffset() throws Exception {
+ KeyFieldHelper helper = new KeyFieldHelper();
+ helper.setKeyFieldSeparator("\t");
+ // test getStartOffset with -k1,2
+ helper.setKeyFieldSpec(1, 2);
+ String input = "hi\thello";
+ String expectedOutput = input;
+ testKeySpecs(input, expectedOutput, helper);
+
+    // test getStartOffset with -k1.0,0, which should result in start = -1
+ helper = new KeyFieldHelper();
+ helper.setKeyFieldSeparator("\t");
+ helper.parseOption("-k1.0,0");
+ testKeySpecs(input, null, helper);
+
+ // test getStartOffset with -k1,0
+ helper = new KeyFieldHelper();
+ helper.setKeyFieldSeparator("\t");
+ helper.parseOption("-k1,0");
+ expectedOutput = input;
+ testKeySpecs(input, expectedOutput, helper);
+
+ // test getStartOffset with -k1.2,0
+ helper = new KeyFieldHelper();
+ helper.setKeyFieldSeparator("\t");
+ helper.parseOption("-k1.2,0");
+ expectedOutput = "i\thello";
+ testKeySpecs(input, expectedOutput, helper);
+
+    // test getStartOffset/getEndOffset with -k1.1,2.3
+ helper = new KeyFieldHelper();
+ helper.setKeyFieldSeparator("\t");
+ helper.parseOption("-k1.1,2.3");
+ expectedOutput = "hi\thel";
+ testKeySpecs(input, expectedOutput, helper);
+
+    // test getStartOffset/getEndOffset with -k1.2,2.3
+ helper = new KeyFieldHelper();
+ helper.setKeyFieldSeparator("\t");
+ helper.parseOption("-k1.2,2.3");
+ expectedOutput = "i\thel";
+ testKeySpecs(input, expectedOutput, helper);
+
+ // test getStartOffset with -k1.2,3.0
+ helper = new KeyFieldHelper();
+ helper.setKeyFieldSeparator("\t");
+ helper.parseOption("-k1.2,3.0");
+ expectedOutput = "i\thello";
+ testKeySpecs(input, expectedOutput, helper);
+
+ // test getStartOffset with -k2,2
+ helper = new KeyFieldHelper();
+ helper.setKeyFieldSeparator("\t");
+ helper.parseOption("-k2,2");
+ expectedOutput = "hello";
+ testKeySpecs(input, expectedOutput, helper);
+
+    // test getStartOffset with -k3.1,4.0
+ helper = new KeyFieldHelper();
+ helper.setKeyFieldSeparator("\t");
+ helper.parseOption("-k3.1,4.0");
+ testKeySpecs(input, null, helper);
+
+ // test getStartOffset with -k2.1
+ helper = new KeyFieldHelper();
+ input = "123123123123123hi\thello\thow";
+ helper.setKeyFieldSeparator("\t");
+ helper.parseOption("-k2.1");
+ expectedOutput = "hello\thow";
+ testKeySpecs(input, expectedOutput, helper, 15, input.length());
+
+    // test getStartOffset with -k2.1,3 where the key ends on a tab
+ helper = new KeyFieldHelper();
+ input = "123123123123123hi\thello\t\thow\tare";
+ helper.setKeyFieldSeparator("\t");
+ helper.parseOption("-k2.1,3");
+ expectedOutput = "hello\t";
+ testKeySpecs(input, expectedOutput, helper, 17, input.length());
+
+    // test getStartOffset with -k2.1 where the range ends on a tab
+ helper = new KeyFieldHelper();
+ input = "123123123123123hi\thello\thow\tare";
+ helper.setKeyFieldSeparator("\t");
+ helper.parseOption("-k2.1");
+ expectedOutput = "hello\thow\t";
+ testKeySpecs(input, expectedOutput, helper, 17, 28);
+
+    // test getStartOffset with -k2.1,3 over a shorter byte range
+ helper = new KeyFieldHelper();
+ input = "123123123123123hi\thello\thow";
+ helper.setKeyFieldSeparator("\t");
+ helper.parseOption("-k2.1,3");
+ expectedOutput = "hello";
+ testKeySpecs(input, expectedOutput, helper, 15, 23);
+ }
+
+ private void testKeySpecs(String input, String expectedOutput,
+ KeyFieldHelper helper) {
+ testKeySpecs(input, expectedOutput, helper, 0, -1);
+ }
+
+ private void testKeySpecs(String input, String expectedOutput,
+ KeyFieldHelper helper, int s1, int e1) {
+ LOG.info("input : " + input);
+ String keySpecs = helper.keySpecs().get(0).toString();
+ LOG.info("keyspecs : " + keySpecs);
+ byte[] inputBytes = input.getBytes(); // get the input bytes
+ if (e1 == -1) {
+ e1 = inputBytes.length;
+ }
+ LOG.info("length : " + e1);
+ // get the word lengths
+ int[] indices = helper.getWordLengths(inputBytes, s1, e1);
+ // get the start index
+ int start = helper.getStartOffset(inputBytes, s1, e1, indices,
+ helper.keySpecs().get(0));
+ LOG.info("start : " + start);
+ if (expectedOutput == null) {
+ assertEquals("Expected -1 when the start index is invalid", -1, start);
+ return;
+ }
+ // get the end index
+ int end = helper.getEndOffset(inputBytes, s1, e1, indices,
+ helper.keySpecs().get(0));
+ LOG.info("end : " + end);
+    // clamp the end offset so it never runs past the last input byte
+    end = (end >= inputBytes.length) ? inputBytes.length - 1 : end;
+ int length = end + 1 - start;
+ LOG.info("length : " + length);
+ byte[] outputBytes = new byte[length];
+ System.arraycopy(inputBytes, start, outputBytes, 0, length);
+ String output = new String(outputBytes);
+ LOG.info("output : " + output);
+ LOG.info("expected-output : " + expectedOutput);
+ assertEquals(keySpecs + " failed on input '" + input + "'",
+ expectedOutput, output);
+ }
+
+  // compare two getWordLengths() results; index 0 holds the word count
+  // and the remaining entries hold the individual word lengths
+  private boolean equals(int[] test, int[] expected) {
+    // the word counts must match
+    if (test[0] != expected[0]) {
+      return false;
+    }
+    // every recorded word length must match as well
+    for (int i = 1; i < test.length && i < expected.length; ++i) {
+      if (test[i] != expected[i]) {
+        return false;
+      }
+    }
+ return true;
+ }
+}
\ No newline at end of file
Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestKeyFieldHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
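
For orientation, testKeySpecs() above always walks the same sequence: parse a POSIX-sort-style -k option, compute the per-field word lengths, then resolve the key's start and end byte offsets within the raw record. A condensed standalone version of that walk-through is sketched below; it assumes KeyFieldHelper is package-private (hence the package declaration) and uses an arbitrary sample line, so treat it as illustration rather than shipped code.

    package org.apache.hadoop.mapreduce.lib.partition;

    public class KeyFieldHelperWalkthrough {
      public static void main(String[] args) {
        KeyFieldHelper helper = new KeyFieldHelper();
        helper.setKeyFieldSeparator("\t");
        helper.parseOption("-k2,2");  // the key is the second field only
        byte[] line = "alpha\tbeta\tgamma".getBytes();
        // index 0 of the result holds the word count; the remaining
        // entries hold the individual word lengths
        int[] lengths = helper.getWordLengths(line, 0, line.length);
        int start = helper.getStartOffset(line, 0, line.length, lengths,
            helper.keySpecs().get(0));
        int end = helper.getEndOffset(line, 0, line.length, lengths,
            helper.keySpecs().get(0));
        // clamp end the same way the test helper above does
        end = (end >= line.length) ? line.length - 1 : end;
        // end is inclusive, so for "-k2,2" this should print "beta"
        System.out.println(new String(line, start, end + 1 - start));
      }
    }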