You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:26 UTC

[24/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
new file mode 100644
index 0000000..6ee1972
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.apache.crunch.fn.Aggregators.MAX_BIGINTS;
+import static org.apache.crunch.fn.Aggregators.MAX_DOUBLES;
+import static org.apache.crunch.fn.Aggregators.MAX_FLOATS;
+import static org.apache.crunch.fn.Aggregators.MAX_INTS;
+import static org.apache.crunch.fn.Aggregators.MAX_LONGS;
+import static org.apache.crunch.fn.Aggregators.MAX_N;
+import static org.apache.crunch.fn.Aggregators.MIN_BIGINTS;
+import static org.apache.crunch.fn.Aggregators.MIN_DOUBLES;
+import static org.apache.crunch.fn.Aggregators.MIN_FLOATS;
+import static org.apache.crunch.fn.Aggregators.MIN_INTS;
+import static org.apache.crunch.fn.Aggregators.MIN_LONGS;
+import static org.apache.crunch.fn.Aggregators.MIN_N;
+import static org.apache.crunch.fn.Aggregators.STRING_CONCAT;
+import static org.apache.crunch.fn.Aggregators.SUM_BIGINTS;
+import static org.apache.crunch.fn.Aggregators.SUM_DOUBLES;
+import static org.apache.crunch.fn.Aggregators.SUM_FLOATS;
+import static org.apache.crunch.fn.Aggregators.SUM_INTS;
+import static org.apache.crunch.fn.Aggregators.SUM_LONGS;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.crunch.Aggregator;
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
+import org.junit.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+/** Tests for the {@link Aggregators} factories, each exercised through {@code Aggregators.toCombineFn}. */
+public class AggregatorsTest {
+
+  @Test
+  public void testSums2() {
+    assertThat(sapply(SUM_INTS(), 1, 2, 3, -4), is(2));
+    assertThat(sapply(SUM_LONGS(), 1L, 2L, 3L, -4L, 5000000000L), is(5000000002L));
+    assertThat(sapply(SUM_FLOATS(), 1f, 2f, 3f, -4f), is(2f));
+    assertThat(sapply(SUM_DOUBLES(), 0.1, 0.2, 0.3), is(closeTo(0.6, 0.00001)));
+    assertThat(sapply(SUM_BIGINTS(), bigInt("7"), bigInt("3")), is(bigInt("10")));
+  }
+
+  @Test
+  public void testSums() {
+    assertThat(sapply(SUM_LONGS(), 29L, 17L, 1729L), is(1775L));
+    assertThat(sapply(SUM_LONGS(), 29L, 7L, 1729L), is(1765L));
+    assertThat(sapply(SUM_INTS(), 29, 17, 1729), is(1775));
+    assertThat(sapply(SUM_FLOATS(), 29f, 17f, 1729f), is(1775.0f));
+    assertThat(sapply(SUM_DOUBLES(), 29.0, 17.0, 1729.0), is(1775.0));
+    assertThat(sapply(SUM_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("1775")));
+  }
+
+  @Test
+  public void testMax() {
+    assertThat(sapply(MAX_LONGS(), 29L, 17L, 1729L), is(1729L));
+    assertThat(sapply(MAX_INTS(), 29, 17, 1729), is(1729));
+    assertThat(sapply(MAX_FLOATS(), 29f, 17f, 1729f), is(1729.0f));
+    assertThat(sapply(MAX_DOUBLES(), 29.0, 17.0, 1729.0), is(1729.0));
+    assertThat(sapply(MAX_FLOATS(), 29f, 1745f, 17f, 1729f), is(1745.0f));
+    assertThat(sapply(MAX_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("1729")));
+  }
+
+  @Test
+  public void testMin() {
+    assertThat(sapply(MIN_LONGS(), 29L, 17L, 1729L), is(17L));
+    assertThat(sapply(MIN_INTS(), 29, 17, 1729), is(17));
+    assertThat(sapply(MIN_FLOATS(), 29f, 17f, 1729f), is(17.0f));
+    assertThat(sapply(MIN_DOUBLES(), 29.0, 17.0, 1729.0), is(17.0));
+    assertThat(sapply(MIN_INTS(), 29, 170, 1729), is(29));
+    assertThat(sapply(MIN_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("17")));
+  }
+
+  @Test
+  public void testMaxN() {
+    assertThat(apply(MAX_INTS(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(98, 1009)));
+    assertThat(apply(MAX_N(1, String.class), "b", "a"), is(ImmutableList.of("b")));
+    assertThat(apply(MAX_N(3, String.class), "b", "a", "d", "c"), is(ImmutableList.of("b", "c", "d")));
+  }
+
+  @Test
+  public void testMinN() {
+    assertThat(apply(MIN_INTS(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(17, 29)));
+    assertThat(apply(MIN_N(1, String.class), "b", "a"), is(ImmutableList.of("a")));
+    assertThat(apply(MIN_N(3, String.class), "b", "a", "d", "c"), is(ImmutableList.of("a", "b", "c")));
+  }
+
+  @Test
+  public void testFirstN() {
+    assertThat(apply(Aggregators.<Integer>FIRST_N(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(17, 34)));
+  }
+
+  @Test
+  public void testLastN() {
+    assertThat(apply(Aggregators.<Integer>LAST_N(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(29, 1009)));
+  }
+
+  @Test
+  public void testUniqueElements() {
+    assertThat(ImmutableSet.copyOf(apply(Aggregators.<Integer>UNIQUE_ELEMENTS(), 17, 29, 29, 16, 17)),
+        is(ImmutableSet.of(17, 29, 16)));
+
+    Iterable<Integer> samp = apply(Aggregators.<Integer>SAMPLE_UNIQUE_ELEMENTS(2), 17, 29, 16, 17, 29, 16);
+    assertThat(Iterables.size(samp), is(2));
+    assertThat(ImmutableSet.copyOf(samp).size(), is(2)); // check that the two elements are unique
+  }
+
+  @Test
+  public void testPairs() {
+    List<Pair<Long, Double>> input = ImmutableList.of(Pair.of(1720L, 17.29), Pair.of(9L, -3.14));
+    Aggregator<Pair<Long, Double>> a = Aggregators.pairAggregator(SUM_LONGS(), MIN_DOUBLES());
+
+    assertThat(sapply(a, input), is(Pair.of(1729L, -3.14)));
+  }
+
+  @Test
+  public void testPairsTwoLongs() {
+    List<Pair<Long, Long>> input = ImmutableList.of(Pair.of(1720L, 1L), Pair.of(9L, 19L));
+    Aggregator<Pair<Long, Long>> a = Aggregators.pairAggregator(SUM_LONGS(), SUM_LONGS());
+
+    assertThat(sapply(a, input), is(Pair.of(1729L, 20L)));
+  }
+
+  @Test
+  public void testTrips() {
+    List<Tuple3<Float, Double, Double>> input = ImmutableList.of(Tuple3.of(17.29f, 12.2, 0.1),
+        Tuple3.of(3.0f, 1.2, 3.14), Tuple3.of(-1.0f, 14.5, -0.98));
+    Aggregator<Tuple3<Float, Double, Double>> a = Aggregators.tripAggregator(
+        MAX_FLOATS(), MAX_DOUBLES(), MIN_DOUBLES());
+
+    assertThat(sapply(a, input), is(Tuple3.of(17.29f, 14.5, -0.98)));
+  }
+
+  @Test
+  public void testQuads() {
+    List<Tuple4<Float, Double, Double, Integer>> input = ImmutableList.of(Tuple4.of(17.29f, 12.2, 0.1, 1),
+        Tuple4.of(3.0f, 1.2, 3.14, 2), Tuple4.of(-1.0f, 14.5, -0.98, 3));
+    Aggregator<Tuple4<Float, Double, Double, Integer>> a = Aggregators.quadAggregator(
+        MAX_FLOATS(), MAX_DOUBLES(), MIN_DOUBLES(), SUM_INTS());
+
+    assertThat(sapply(a, input), is(Tuple4.of(17.29f, 14.5, -0.98, 6)));
+  }
+
+  @Test
+  public void testTupleN() {
+    List<TupleN> input = ImmutableList.of(new TupleN(1, 3.0, 1, 2.0, 4L), new TupleN(4, 17.0, 1, 9.7, 12L));
+    Aggregator<TupleN> a = Aggregators.tupleAggregator(
+        MIN_INTS(), SUM_DOUBLES(), MAX_INTS(), MIN_DOUBLES(), MAX_LONGS());
+
+    assertThat(sapply(a, input), is(new TupleN(1, 20.0, 1, 2.0, 12L)));
+  }
+
+  @Test
+  public void testConcatenation() {
+    assertThat(sapply(STRING_CONCAT("", true), "foo", "foobar", "bar"), is("foofoobarbar"));
+    assertThat(sapply(STRING_CONCAT("/", false), "foo", "foobar", "bar"), is("foo/foobar/bar"));
+    assertThat(sapply(STRING_CONCAT(" ", true), " ", ""), is("  "));
+    assertThat(sapply(STRING_CONCAT(" ", true), Arrays.asList(null, "")), is(""));
+    assertThat(sapply(STRING_CONCAT(" ", true, 20, 3), "foo", "foobar", "bar"), is("foo bar"));
+    assertThat(sapply(STRING_CONCAT(" ", true, 10, 6), "foo", "foobar", "bar"), is("foo foobar"));
+    assertThat(sapply(STRING_CONCAT(" ", true, 9, 6), "foo", "foobar", "bar"), is("foo bar"));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testConcatenationNullException() {
+    sapply(STRING_CONCAT(" ", false), Arrays.asList(null, "" ));
+  }
+
+  /** Applies {@code a} to {@code values} and returns its single aggregated result. */
+  private static <T> T sapply(Aggregator<T> a, T... values) {
+    return sapply(a, ImmutableList.copyOf(values));
+  }
+
+  /** Applies {@code a} to {@code values} and returns its single aggregated result. */
+  private static <T> T sapply(Aggregator<T> a, Iterable<T> values) {
+    return Iterables.getOnlyElement(apply(a, values));
+  }
+
+  /** Applies {@code a} to {@code values} and returns every value the combine function emitted. */
+  private static <T> ImmutableList<T> apply(Aggregator<T> a, T... values) {
+    return apply(a, ImmutableList.copyOf(values));
+  }
+
+  /**
+   * Runs {@code a}, wrapped by {@code Aggregators.toCombineFn}, over {@code values} twice under the
+   * same key, and returns the values emitted by the first run.
+   *
+   * <p>Both runs reuse one function instance, so requiring identical output from each run checks
+   * that the aggregator's state is reset between {@code process} calls.
+   */
+  private static <T> ImmutableList<T> apply(Aggregator<T> a, Iterable<T> values) {
+    CombineFn<String, T> fn = Aggregators.toCombineFn(a);
+
+    InMemoryEmitter<Pair<String, T>> e1 = new InMemoryEmitter<Pair<String, T>>();
+    fn.process(Pair.of("", values), e1);
+    ImmutableList<T> firstRun = getValues(e1);
+
+    // and a second time to make sure Aggregator.reset() works
+    InMemoryEmitter<Pair<String, T>> e2 = new InMemoryEmitter<Pair<String, T>>();
+    fn.process(Pair.of("", values), e2);
+    ImmutableList<T> secondRun = getValues(e2);
+
+    assertEquals("aggregator output differs after reset()", firstRun, secondRun);
+    return firstRun;
+  }
+
+  /** Extracts the second element of every pair {@code emitter} has collected. */
+  private static <K, V> ImmutableList<V> getValues(InMemoryEmitter<Pair<K, V>> emitter) {
+    return ImmutableList.copyOf(
+        Iterables.transform(emitter.getOutput(), new Function<Pair<K, V>, V>() {
+          @Override
+          public V apply(Pair<K, V> input) {
+            return input.second();
+          }
+        }));
+  }
+
+  private static BigInteger bigInt(String value) {
+    return new BigInteger(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java
new file mode 100644
index 0000000..b5b2a1b
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.junit.Test;
+
+/** Checks that {@link ExtractKeyFn} emits each input paired with the key derived from it. */
+@SuppressWarnings("serial")
+public class ExtractKeyFnTest {
+
+  /** Key function under test: keys each string by its {@link String#hashCode()}. */
+  protected static final MapFn<String, Integer> mapFn = new MapFn<String, Integer>() {
+    @Override
+    public Integer map(String text) {
+      return text.hashCode();
+    }
+  };
+
+  protected static final ExtractKeyFn<Integer, String> one = new ExtractKeyFn<Integer, String>(mapFn);
+
+  @Test
+  public void test() {
+    String input = "boza";
+    StoreLastEmitter<Pair<Integer, String>> emitter = StoreLastEmitter.create();
+
+    one.process(input, emitter);
+
+    Pair<Integer, String> expected = Pair.of(input.hashCode(), input);
+    assertEquals(expected, emitter.getLast());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/fn/FilterFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/FilterFnTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/FilterFnTest.java
new file mode 100644
index 0000000..a649f99
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/FilterFnTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.crunch.FilterFn;
+import org.junit.Test;
+
+/** Tests for the constant filters and the and/or/not combinators in {@link FilterFns}. */
+public class FilterFnTest {
+
+  private static final FilterFn<String> TRUE = FilterFns.<String>ACCEPT_ALL();
+  private static final FilterFn<String> FALSE = FilterFns.<String>REJECT_ALL();
+
+  @Test
+  public void testAcceptAll() {
+    assertThat(TRUE.accept(""), is(true));
+    assertThat(TRUE.accept("foo"), is(true));
+  }
+
+  @Test
+  public void testRejectAll() {
+    assertThat(FALSE.accept(""), is(false));
+    assertThat(FALSE.accept("foo"), is(false));
+  }
+
+  @Test
+  public void testAnd() {
+    assertThat(FilterFns.and(TRUE, TRUE).accept("foo"), is(true));
+    assertThat(FilterFns.and(TRUE, FALSE).accept("foo"), is(false));
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGeneric() {
+    assertThat(FilterFns.and(TRUE).accept("foo"), is(true));
+    assertThat(FilterFns.and(FALSE).accept("foo"), is(false));
+    assertThat(FilterFns.and(FALSE, FALSE, FALSE).accept("foo"), is(false));
+    assertThat(FilterFns.and(TRUE, TRUE, FALSE).accept("foo"), is(false));
+    assertThat(FilterFns.and(FALSE, FALSE, FALSE, FALSE).accept("foo"), is(false));
+  }
+
+  @Test
+  public void testOr() {
+    assertThat(FilterFns.or(FALSE, TRUE).accept("foo"), is(true));
+    assertThat(FilterFns.or(TRUE, FALSE).accept("foo"), is(true));
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testOrGeneric() {
+    assertThat(FilterFns.or(TRUE).accept("foo"), is(true));
+    assertThat(FilterFns.or(FALSE).accept("foo"), is(false));
+    assertThat(FilterFns.or(TRUE, FALSE, TRUE).accept("foo"), is(true));
+    assertThat(FilterFns.or(FALSE, FALSE, TRUE).accept("foo"), is(true));
+    assertThat(FilterFns.or(FALSE, FALSE, FALSE).accept("foo"), is(false));
+  }
+
+  @Test
+  public void testNot() {
+    assertThat(FilterFns.not(TRUE).accept("foo"), is(false));
+    assertThat(FilterFns.not(FALSE).accept("foo"), is(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/fn/MapKeysTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/MapKeysTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/MapKeysTest.java
new file mode 100644
index 0000000..6b73700
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/MapKeysTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.Pair;
+import org.junit.Test;
+
+/** Checks that {@link MapKeysFn} replaces the key of a pair and passes its value through. */
+@SuppressWarnings("serial")
+public class MapKeysTest {
+
+  protected static final MapKeysFn<String, Integer, Integer> one = new MapKeysFn<String, Integer, Integer>() {
+    @Override
+    public Integer map(String key) {
+      return 1;
+    }
+  };
+
+  protected static final MapKeysFn<String, Integer, Integer> two = new MapKeysFn<String, Integer, Integer>() {
+    @Override
+    public Integer map(String key) {
+      return 2;
+    }
+  };
+
+  @Test
+  public void test() {
+    Pair<String, Integer> input = Pair.of("k", Integer.MAX_VALUE);
+    StoreLastEmitter<Pair<Integer, Integer>> emitter = StoreLastEmitter.create();
+
+    // Each fn swaps the key "k" for its own constant; the value must come through unchanged.
+    one.process(input, emitter);
+    assertEquals(Pair.of(1, Integer.MAX_VALUE), emitter.getLast());
+
+    two.process(input, emitter);
+    assertEquals(Pair.of(2, Integer.MAX_VALUE), emitter.getLast());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/fn/MapValuesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/MapValuesTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/MapValuesTest.java
new file mode 100644
index 0000000..097b008
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/MapValuesTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.Pair;
+import org.junit.Test;
+
+/** Checks that {@link MapValuesFn} replaces the value of a pair and passes its key through. */
+@SuppressWarnings("serial")
+public class MapValuesTest {
+
+  static final MapValuesFn<String, String, Integer> one = new MapValuesFn<String, String, Integer>() {
+    @Override
+    public Integer map(String value) {
+      return 1;
+    }
+  };
+
+  static final MapValuesFn<String, String, Integer> two = new MapValuesFn<String, String, Integer>() {
+    @Override
+    public Integer map(String value) {
+      return 2;
+    }
+  };
+
+  @Test
+  public void test() {
+    Pair<String, String> input = Pair.of("k", "v");
+    StoreLastEmitter<Pair<String, Integer>> emitter = StoreLastEmitter.create();
+
+    // Each fn swaps the value "v" for its own constant; the key must come through unchanged.
+    one.process(input, emitter);
+    assertEquals(Pair.of("k", 1), emitter.getLast());
+
+    two.process(input, emitter);
+    assertEquals(Pair.of("k", 2), emitter.getLast());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/fn/PairMapTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/PairMapTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/PairMapTest.java
new file mode 100644
index 0000000..bef6c85
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/PairMapTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.junit.Test;
+
+/** Checks that {@link PairMapFn} applies one function to the key and another to the value. */
+@SuppressWarnings("serial")
+public class PairMapTest {
+
+  static final MapFn<String, Integer> one = new MapFn<String, Integer>() {
+    @Override
+    public Integer map(String input) {
+      return 1;
+    }
+  };
+
+  static final MapFn<String, Integer> two = new MapFn<String, Integer>() {
+    @Override
+    public Integer map(String input) {
+      return 2;
+    }
+  };
+
+  @Test
+  public void testPairMap() {
+    StoreLastEmitter<Pair<Integer, Integer>> emitter = StoreLastEmitter.create();
+    PairMapFn<String, String, Integer, Integer> fn = new PairMapFn<String, String, Integer, Integer>(one, two);
+    fn.process(Pair.of("a", "b"), emitter);
+    // Compare the whole pair with equals() so a failure reports the pair that was actually emitted.
+    assertEquals(Pair.of(1, 2), emitter.getLast());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java b/crunch-core/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java
new file mode 100644
index 0000000..cdd8754
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.Emitter;
+
+/**
+ * Test {@link Emitter} that remembers only the most recently emitted value, so the output of a
+ * single-output function can be checked through {@link #getLast()}.
+ */
+class StoreLastEmitter<T> implements Emitter<T> {
+
+  /** Most recent value passed to {@link #emit}; {@code null} until the first call. */
+  private T lastEmitted;
+
+  /** Creates an emitter that has not yet received any value. */
+  public static <T> StoreLastEmitter<T> create() {
+    return new StoreLastEmitter<T>();
+  }
+
+  /** Replaces any previously stored value with {@code emitted}. */
+  @Override
+  public void emit(T emitted) {
+    this.lastEmitted = emitted;
+  }
+
+  /** Nothing is buffered, so there is nothing to flush. */
+  @Override
+  public void flush() {
+    // intentionally empty
+  }
+
+  /** Returns the argument of the latest {@link #emit} call, or {@code null} if there was none. */
+  public T getLast() {
+    return lastEmitted;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
new file mode 100644
index 0000000..811a0a3
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/** Tests for {@link SingleUseIterable}. */
+public class SingleUseIterableTest {
+
+  @Test
+  public void testIterator() {
+    List<Integer> values = Lists.newArrayList(1, 2, 3);
+
+    SingleUseIterable<Integer> iterable = new SingleUseIterable<Integer>(values);
+
+    List<Integer> retrievedValues = Lists.newArrayList(iterable);
+
+    assertEquals(values, retrievedValues);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testIterator_MultipleCalls() {
+    List<Integer> values = Lists.newArrayList(1, 2, 3);
+
+    SingleUseIterable<Integer> iterable = new SingleUseIterable<Integer>(values);
+
+    // The first pass consumes the iterable; the second pass must be rejected.
+    Lists.newArrayList(iterable);
+    for (Integer ignored : iterable) {
+      // not reached: starting the second iteration is expected to throw IllegalStateException
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
new file mode 100644
index 0000000..9ed7a46
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+/** Tests {@code MRPipeline.getMaterializeSourceTarget} against a spied pipeline and mocked inputs. */
+@RunWith(MockitoJUnitRunner.class)
+public class MRPipelineTest {
+  @Rule
+  public TemporaryFolder tempDir = new TemporaryFolder();
+  @Mock
+  private PCollectionImpl<String> pcollection;
+  @Mock
+  private ReadableSourceTarget<String> readableSourceTarget;
+  @Mock
+  private SourceTarget<String> nonReadableSourceTarget;
+  private MRPipeline pipeline;
+
+  /** Builds a spied pipeline whose RuntimeParameters.TMP_DIR points into the per-test tempDir. */
+  @Before
+  public void setUp() throws IOException {
+    Configuration conf = new Configuration();
+    String tmpDirPath = tempDir.getRoot().getAbsolutePath();
+    conf.set(RuntimeParameters.TMP_DIR, tmpDirPath);
+    MRPipeline realPipeline = new MRPipeline(MRPipelineTest.class, conf);
+    pipeline = spy(realPipeline);
+  }
+
+  @Test
+  public void testGetMaterializeSourceTarget_AlreadyMaterialized() {
+    when(pcollection.getMaterializedAt()).thenReturn(readableSourceTarget);
+
+    assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(pcollection));
+  }
+
+  @Test
+  public void testGetMaterializeSourceTarget_NotMaterialized_HasOutput() {
+    stubNotMaterialized(readableSourceTarget);
+
+    assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(pcollection));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget() {
+    stubNotMaterialized(nonReadableSourceTarget);
+
+    pipeline.getMaterializeSourceTarget(pcollection);
+  }
+
+  /**
+   * Stubs {@code pcollection} as not materialized with an Avro string PType, and makes the spied
+   * pipeline return {@code intermediateOutput} from {@code createIntermediateOutput}.
+   */
+  private void stubNotMaterialized(SourceTarget<String> intermediateOutput) {
+    when(pcollection.getPType()).thenReturn(Avros.strings());
+    doReturn(intermediateOutput).when(pipeline).createIntermediateOutput(Avros.strings());
+    when(pcollection.getMaterializedAt()).thenReturn(null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
new file mode 100644
index 0000000..fd582bc
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+public class DoCollectionImplTest {
+
+  @Test
+  public void testGetSizeInternal_NoScaleFactor() {
+    runScaleTest(100L, 1.0f, 100L);
+  }
+
+  @Test
+  public void testGetSizeInternal_ScaleFactorBelowZero() {
+    runScaleTest(100L, 0.5f, 50L);
+  }
+
+  @Test
+  public void testGetSizeInternal_ScaleFactorAboveZero() {
+    runScaleTest(100L, 1.5f, 150L);
+  }
+
+  private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) {
+    PCollectionImpl<String> parentCollection = new SizedPCollectionImpl("Sized collection", inputSize);
+
+    DoCollectionImpl<String> doCollectionImpl = new DoCollectionImpl<String>("Scaled collection", parentCollection,
+        new ScaledFunction(scaleFactor), Writables.strings());
+
+    assertEquals(expectedScaledSize, doCollectionImpl.getSizeInternal());
+  }
+
+  static class ScaledFunction extends DoFn<String, String> {
+
+    private float scaleFactor;
+
+    public ScaledFunction(float scaleFactor) {
+      this.scaleFactor = scaleFactor;
+    }
+
+    @Override
+    public void process(String input, Emitter<String> emitter) {
+      emitter.emit(input);
+    }
+
+    @Override
+    public float scaleFactor() {
+      return scaleFactor;
+    }
+
+  }
+
+  static class SizedPCollectionImpl extends PCollectionImpl<String> {
+
+    private long internalSize;
+
+    public SizedPCollectionImpl(String name, long internalSize) {
+      super(name);
+      this.internalSize = internalSize;
+    }
+
+    @Override
+    public PType getPType() {
+      return null;
+    }
+
+    @Override
+    public DoNode createDoNode() {
+      return null;
+    }
+
+    @Override
+    public List getParents() {
+      return null;
+    }
+
+    @Override
+    protected void acceptInternal(Visitor visitor) {
+    }
+
+    @Override
+    protected long getSizeInternal() {
+      return internalSize;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
new file mode 100644
index 0000000..89b9944
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import static org.apache.crunch.types.writable.Writables.strings;
+import static org.apache.crunch.types.writable.Writables.tableOf;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.junit.Test;
+
+public class DoTableImplTest {
+
+  @Test
+  public void testGetSizeInternal_NoScaleFactor() {
+    runScaleTest(100L, 1.0f, 100L);
+  }
+
+  @Test
+  public void testGetSizeInternal_ScaleFactorBelowZero() {
+    runScaleTest(100L, 0.5f, 50L);
+  }
+
+  @Test
+  public void testGetSizeInternal_ScaleFactorAboveZero() {
+    runScaleTest(100L, 1.5f, 150L);
+  }
+
+  private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) {
+
+    @SuppressWarnings("unchecked")
+    PCollectionImpl<String> parentCollection = (PCollectionImpl<String>) mock(PCollectionImpl.class);
+
+    when(parentCollection.getSize()).thenReturn(inputSize);
+
+    DoTableImpl<String, String> doTableImpl = new DoTableImpl<String, String>("Scalled table collection",
+        parentCollection, new TableScaledFunction(scaleFactor), tableOf(strings(), strings()));
+
+    assertEquals(expectedScaledSize, doTableImpl.getSizeInternal());
+
+    verify(parentCollection).getSize();
+
+    verifyNoMoreInteractions(parentCollection);
+  }
+
+  static class TableScaledFunction extends DoFn<String, Pair<String, String>> {
+
+    private float scaleFactor;
+
+    public TableScaledFunction(float scaleFactor) {
+      this.scaleFactor = scaleFactor;
+    }
+
+    @Override
+    public float scaleFactor() {
+      return scaleFactor;
+    }
+
+    @Override
+    public void process(String input, Emitter<Pair<String, String>> emitter) {
+      emitter.emit(Pair.of(input, input));
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
new file mode 100644
index 0000000..dd72364
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.emit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests how {@link IntermediateEmitter} hands emitted values to its child {@link RTNode}s.
+ */
+public class IntermediateEmitterTest {
+
+  private StringWrapper stringWrapper;
+  private PType ptype;
+
+  @Before
+  public void setUp() {
+    stringWrapper = new StringWrapper("test");
+    ptype = spy(Avros.reflects(StringWrapper.class));
+  }
+
+  @Test
+  public void testEmit_SingleChild() {
+    RTNode onlyChild = mock(RTNode.class);
+    IntermediateEmitter emitter =
+        new IntermediateEmitter(ptype, Lists.newArrayList(onlyChild), new Configuration());
+
+    emitter.emit(stringWrapper);
+
+    // With a single child, the emitted instance itself is passed through.
+    assertSame(stringWrapper, capturedInput(onlyChild));
+  }
+
+  @Test
+  public void testEmit_MultipleChildren() {
+    RTNode childA = mock(RTNode.class);
+    RTNode childB = mock(RTNode.class);
+    IntermediateEmitter emitter =
+        new IntermediateEmitter(ptype, Lists.newArrayList(childA, childB), new Configuration());
+
+    emitter.emit(stringWrapper);
+
+    StringWrapper receivedByA = capturedInput(childA);
+    StringWrapper receivedByB = capturedInput(childB);
+
+    // Every child receives a value equal to the emitted one...
+    assertEquals(stringWrapper, receivedByA);
+    assertEquals(stringWrapper, receivedByB);
+
+    // ...but with multiple children, deep copies are passed instead of the original instance.
+    assertNotSame(stringWrapper, receivedByA);
+    assertNotSame(stringWrapper, receivedByB);
+  }
+
+  /** Verifies that {@code child} processed exactly one value and returns that value. */
+  private static StringWrapper capturedInput(RTNode child) {
+    ArgumentCaptor<StringWrapper> captor = ArgumentCaptor.forClass(StringWrapper.class);
+    verify(child).process(captor.capture());
+    return captor.getValue();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounterTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounterTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounterTest.java
new file mode 100644
index 0000000..958df12
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounterTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Tests {@link CappedExponentialCounter}. Successive {@code get()} calls are expected to double
+ * from the start value until the cap is reached, then stay at the cap.
+ */
+public class CappedExponentialCounterTest {
+
+  @Test
+  public void testGet() {
+    CappedExponentialCounter counter = new CappedExponentialCounter(1L, Long.MAX_VALUE);
+    for (long expected : new long[] { 1L, 2L, 4L, 8L }) {
+      assertEquals(expected, counter.get());
+    }
+  }
+
+  @Test
+  public void testCap() {
+    CappedExponentialCounter counter = new CappedExponentialCounter(1L, 2L);
+    for (long expected : new long[] { 1L, 2L, 2L }) {
+      assertEquals(expected, counter.get());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
new file mode 100644
index 0000000..f03c3e2
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class CrunchJobHooksTest {
+
+  @Test
+  public void testExtractPartitionNumber() {
+    assertEquals(0, CrunchJobHooks.extractPartitionNumber("out1-r-00000"));
+    assertEquals(10, CrunchJobHooks.extractPartitionNumber("out2-r-00010"));
+    assertEquals(99999, CrunchJobHooks.extractPartitionNumber("out3-r-99999"));
+  }
+
+  @Test
+  public void testExtractPartitionNumber_WithSuffix() {
+    assertEquals(10, CrunchJobHooks.extractPartitionNumber("out2-r-00010.avro"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testExtractPartitionNumber_MapOutputFile() {
+    CrunchJobHooks.extractPartitionNumber("out1-m-00000");
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
new file mode 100644
index 0000000..562238d
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.plan.DotfileWriter.MRTaskType;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
+
+public class DotfileWriterTest {
+
+  private DotfileWriter dotfileWriter;
+
+  @Before
+  public void setUp() {
+    dotfileWriter = new DotfileWriter();
+  }
+
+  @Test
+  public void testFormatPCollectionNodeDeclaration() {
+    PCollectionImpl<?> pcollectionImpl = mock(PCollectionImpl.class);
+    JobPrototype jobPrototype = mock(JobPrototype.class);
+    when(pcollectionImpl.getName()).thenReturn("collection");
+
+    assertEquals("\"collection@" + pcollectionImpl.hashCode() + "@" + jobPrototype.hashCode()
+        + "\" [label=\"collection\" shape=box];",
+        dotfileWriter.formatPCollectionNodeDeclaration(pcollectionImpl, jobPrototype));
+  }
+
+  @Test
+  public void testFormatPCollectionNodeDeclaration_InputPCollection() {
+    InputCollection<?> inputCollection = mock(InputCollection.class, Mockito.RETURNS_DEEP_STUBS);
+    JobPrototype jobPrototype = mock(JobPrototype.class);
+    when(inputCollection.getName()).thenReturn("input");
+    when(inputCollection.getSource().toString()).thenReturn("source");
+
+    assertEquals("\"source\" [label=\"input\" shape=folder];",
+        dotfileWriter.formatPCollectionNodeDeclaration(inputCollection, jobPrototype));
+  }
+
+  @Test
+  public void testFormatTargetNodeDeclaration() {
+    Target target = mock(Target.class);
+    when(target.toString()).thenReturn("target/path");
+
+    assertEquals("\"target/path\" [label=\"target/path\" shape=folder];",
+        dotfileWriter.formatTargetNodeDeclaration(target));
+  }
+
+  @Test
+  public void testFormatPCollection() {
+    PCollectionImpl<?> pcollectionImpl = mock(PCollectionImpl.class);
+    JobPrototype jobPrototype = mock(JobPrototype.class);
+    when(pcollectionImpl.getName()).thenReturn("collection");
+
+    assertEquals("\"collection@" + pcollectionImpl.hashCode() + "@" + jobPrototype.hashCode() + "\"",
+        dotfileWriter.formatPCollection(pcollectionImpl, jobPrototype));
+  }
+
+  @Test
+  public void testFormatPCollection_InputCollection() {
+    InputCollection<Object> inputCollection = mock(InputCollection.class);
+    Source<Object> source = mock(Source.class);
+    JobPrototype jobPrototype = mock(JobPrototype.class);
+    when(source.toString()).thenReturn("mocksource");
+    when(inputCollection.getSource()).thenReturn(source);
+
+    assertEquals("\"mocksource\"", dotfileWriter.formatPCollection(inputCollection, jobPrototype));
+  }
+
+  @Test
+  public void testFormatNodeCollection() {
+    List<String> nodeCollection = Lists.newArrayList("one", "two", "three");
+    assertEquals("one -> two -> three;", dotfileWriter.formatNodeCollection(nodeCollection));
+  }
+
+  @Test
+  public void testFormatNodePath() {
+    PCollectionImpl<?> tail = mock(PCollectionImpl.class);
+    PCollectionImpl<?> head = mock(PCollectionImpl.class);
+    JobPrototype jobPrototype = mock(JobPrototype.class);
+
+    when(tail.getName()).thenReturn("tail");
+    when(head.getName()).thenReturn("head");
+
+    NodePath nodePath = new NodePath(tail);
+    nodePath.close(head);
+
+    assertEquals(
+        Lists.newArrayList("\"head@" + head.hashCode() + "@" + jobPrototype.hashCode() + "\" -> \"tail@"
+            + tail.hashCode() + "@" + jobPrototype.hashCode() + "\";"),
+        dotfileWriter.formatNodePath(nodePath, jobPrototype));
+  }
+
+  @Test
+  public void testGetTaskGraphAttributes_Map() {
+    assertEquals("label = Map; color = blue;", dotfileWriter.getTaskGraphAttributes(MRTaskType.MAP));
+  }
+
+  @Test
+  public void testGetTaskGraphAttributes_Reduce() {
+    assertEquals("label = Reduce; color = red;", dotfileWriter.getTaskGraphAttributes(MRTaskType.REDUCE));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
new file mode 100644
index 0000000..7963c83
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests {@link JobNameBuilder}. For a single visited output node, the built name is expected to be
+ * {@code "<pipeline name>: <node name>"}.
+ */
+public class JobNameBuilderTest {
+
+  @Test
+  public void testBuild() {
+    String pipelineName = "PipelineName";
+    String nodeName = "outputNode";
+    DoNode outputNode = DoNode.createOutputNode(nodeName, Writables.strings());
+
+    JobNameBuilder builder = new JobNameBuilder(pipelineName);
+    builder.visit(Lists.newArrayList(outputNode));
+
+    assertEquals(pipelineName + ": " + nodeName, builder.build());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java b/crunch-core/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java
new file mode 100644
index 0000000..467da15
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests {@link SequentialFileNamingScheme}. Its output file names are expected to be numbered
+ * after the entries already present in the output directory, independent of the partition id.
+ */
+public class SequentialFileNamingSchemeTest {
+
+  // The partition id used for testing. This partition id should be ignored by
+  // the SequentialFileNamingScheme.
+  private static final int PARTITION_ID = 42;
+
+  private SequentialFileNamingScheme namingScheme;
+  private Configuration configuration;
+
+  @Rule
+  public TemporaryFolder tmpOutputDir = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws IOException {
+    configuration = new Configuration();
+    namingScheme = new SequentialFileNamingScheme();
+  }
+
+  @Test
+  public void testGetMapOutputName_EmptyDirectory() throws IOException {
+    assertEquals("part-m-00000", namingScheme.getMapOutputName(configuration, outputPath()));
+  }
+
+  @Test
+  public void testGetMapOutputName_NonEmptyDirectory() throws IOException {
+    createPreexistingFiles("existing-1", "existing-2");
+
+    // Two existing entries, so the next sequential name is expected to be number 2.
+    assertEquals("part-m-00002", namingScheme.getMapOutputName(configuration, outputPath()));
+  }
+
+  @Test
+  public void testGetReduceOutputName_EmptyDirectory() throws IOException {
+    assertEquals("part-r-00000", namingScheme.getReduceOutputName(configuration, outputPath(), PARTITION_ID));
+  }
+
+  @Test
+  public void testGetReduceOutputName_NonEmptyDirectory() throws IOException {
+    createPreexistingFiles("existing-1", "existing-2");
+
+    // Two existing entries, so the next sequential name is expected to be number 2.
+    assertEquals("part-r-00002", namingScheme.getReduceOutputName(configuration, outputPath(), PARTITION_ID));
+  }
+
+  /** Returns the temporary output directory as a Hadoop {@link Path}. */
+  private Path outputPath() {
+    return new Path(tmpOutputDir.getRoot().getAbsolutePath());
+  }
+
+  /**
+   * Creates empty files with the given names in the temporary output directory.
+   *
+   * @param names file names to create
+   * @throws IOException if a file could not be created or already existed. This makes a broken
+   *     fixture fail loudly instead of surfacing as a misleading naming-scheme assertion.
+   */
+  private void createPreexistingFiles(String... names) throws IOException {
+    File outputDirectory = tmpOutputDir.getRoot();
+    for (String name : names) {
+      File file = new File(outputDirectory, name);
+      // createNewFile() signals failure via its return value, not an exception.
+      if (!file.createNewFile()) {
+        throw new IOException("Could not create test file " + file.getAbsolutePath());
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java b/crunch-core/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java
new file mode 100644
index 0000000..5b0ea55
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/**
+ * Tests {@code SourceTargetHelper.getPathSize} on paths that do not exist. Such paths are
+ * expected to report a size of -1.
+ */
+public class SourceTargetHelperTest {
+
+  @Test
+  public void testGetNonexistentPathSize() throws Exception {
+    // Reserve a unique local path by creating a temp file, then delete it so the path is gone.
+    File tmp = File.createTempFile("pathsize", "");
+    Path tmpPath = new Path(tmp.getAbsolutePath());
+    // File.delete() reports failure only via its return value. Fail fast rather than asserting
+    // against a path that still exists.
+    if (!tmp.delete()) {
+      throw new IOException("Could not delete temporary file " + tmp.getAbsolutePath());
+    }
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    assertEquals(-1L, SourceTargetHelper.getPathSize(fs, tmpPath));
+  }
+
+  @Test
+  public void testGetNonExistentPathSize_NonExistantPath() throws IOException {
+    // Uses a FileSystem whose listStatus() returns null. The size is still expected to be -1.
+    FileSystem mockFs = new MockFileSystem();
+    assertEquals(-1L, SourceTargetHelper.getPathSize(mockFs, new Path("does/not/exist")));
+  }
+
+  /**
+   * Mock FileSystem that returns null for {@link FileSystem#listStatus(Path)}.
+   */
+  static class MockFileSystem extends LocalFileSystem {
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws IOException {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java b/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
new file mode 100644
index 0000000..62085f8
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for {@link AvroFileReaderFactory}: reading generic, specific and reflect records back from an
+ * Avro data file, and the {@link DatumReader} implementation chosen for each kind of {@link AvroType}.
+ */
+public class AvroFileReaderFactoryTest {
+
+  private File avroFile;
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = File.createTempFile("test", ".av");
+  }
+
+  @After
+  public void tearDown() {
+    avroFile.delete();
+  }
+
+  /**
+   * Writes {@code genericRecords} to {@link #avroFile} as an Avro data file using {@code outputSchema}.
+   *
+   * <p>The writer and the underlying stream are closed in {@code finally} blocks so that a failure
+   * while creating or appending does not leak an open file handle.
+   */
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema outputSchema) throws IOException {
+    FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+    try {
+      GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(outputSchema);
+      DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
+      try {
+        dataFileWriter.create(outputSchema, outputStream);
+        for (GenericRecord record : genericRecords) {
+          dataFileWriter.append(record);
+        }
+      } finally {
+        dataFileWriter.close();
+      }
+    } finally {
+      // Release the stream even if the writer failed before it could close it;
+      // closing an already-closed FileOutputStream has no effect.
+      outputStream.close();
+    }
+  }
+
+  private <T> AvroFileReaderFactory<T> createFileReaderFactory(AvroType<T> avroType) {
+    return new AvroFileReaderFactory<T>(avroType);
+  }
+
+  /** Reads {@link #avroFile} back from the local file system as values of {@code avroType}. */
+  private <T> Iterator<T> readBack(AvroType<T> avroType) throws IOException {
+    return createFileReaderFactory(avroType).read(FileSystem.getLocal(new Configuration()),
+        new Path(this.avroFile.getAbsolutePath()));
+  }
+
+  /** Builds the generic {@link Person} record written by the read tests. */
+  private static GenericRecord createPersonRecord() {
+    GenericRecord record = new GenericData.Record(Person.SCHEMA$);
+    record.put("name", "John Doe");
+    record.put("age", 42);
+    record.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    return record;
+  }
+
+  @Test
+  public void testRead_GenericReader() throws IOException {
+    GenericRecord savedRecord = createPersonRecord();
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Iterator<GenericData.Record> recordIterator = readBack(Avros.generics(Person.SCHEMA$));
+
+    GenericRecord genericRecord = recordIterator.next();
+    assertEquals(savedRecord, genericRecord);
+    assertFalse(recordIterator.hasNext());
+  }
+
+  @Test
+  public void testRead_SpecificReader() throws IOException {
+    populateGenericFile(Lists.newArrayList(createPersonRecord()), Person.SCHEMA$);
+
+    Iterator<Person> recordIterator = readBack(Avros.records(Person.class));
+
+    Person expectedPerson = new Person();
+    expectedPerson.age = 42;
+    expectedPerson.name = "John Doe";
+    List<CharSequence> siblingNames = Lists.newArrayList();
+    siblingNames.add("Jimmy");
+    siblingNames.add("Jane");
+    expectedPerson.siblingnames = siblingNames;
+
+    Person person = recordIterator.next();
+
+    assertEquals(expectedPerson, person);
+    assertFalse(recordIterator.hasNext());
+  }
+
+  @Test
+  public void testRead_ReflectReader() throws IOException {
+    Schema reflectSchema = ReflectData.get().getSchema(StringWrapper.class);
+    GenericRecord savedRecord = new GenericData.Record(reflectSchema);
+    savedRecord.put("value", "stringvalue");
+    populateGenericFile(Lists.newArrayList(savedRecord), reflectSchema);
+
+    Iterator<StringWrapper> recordIterator = readBack(Avros.reflects(StringWrapper.class));
+
+    StringWrapper stringWrapper = recordIterator.next();
+
+    assertEquals("stringvalue", stringWrapper.getValue());
+    assertFalse(recordIterator.hasNext());
+  }
+
+  @Test
+  public void testCreateDatumReader_Generic() {
+    DatumReader<Record> datumReader = AvroFileReaderFactory.createDatumReader(Avros.generics(Person.SCHEMA$));
+    assertEquals(GenericDatumReader.class, datumReader.getClass());
+  }
+
+  @Test
+  public void testCreateDatumReader_Reflect() {
+    DatumReader<StringWrapper> datumReader = AvroFileReaderFactory.createDatumReader(Avros
+        .reflects(StringWrapper.class));
+    assertEquals(ReflectDatumReader.class, datumReader.getClass());
+  }
+
+  @Test
+  public void testCreateDatumReader_Specific() {
+    DatumReader<Person> datumReader = AvroFileReaderFactory.createDatumReader(Avros.records(Person.class));
+    assertEquals(SpecificDatumReader.class, datumReader.getClass());
+  }
+
+  @Test
+  public void testCreateDatumReader_ReflectAndSpecific() {
+    // Only meaningful when the Avro version in use can combine specific and reflect schemas.
+    Assume.assumeTrue(Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS);
+
+    DatumReader<Pair<Person, StringWrapper>> datumReader = AvroFileReaderFactory.createDatumReader(Avros.pairs(
+        Avros.records(Person.class), Avros.reflects(StringWrapper.class)));
+    assertEquals(ReflectDatumReader.class, datumReader.getClass());
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testCreateDatumReader_ReflectAndSpecific_NotSupported() {
+    // Complement of the test above: mixing must be rejected when it is not supported.
+    Assume.assumeTrue(!Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS);
+    AvroFileReaderFactory.createDatumReader(Avros.pairs(Avros.records(Person.class),
+        Avros.reflects(StringWrapper.class)));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java b/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
new file mode 100644
index 0000000..ceef2b2
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that {@link AvroFileSource#configureSource} writes the Avro input settings
+ * ({@code AvroJob.INPUT_IS_REFLECT} and, for specific records, {@code AvroJob.INPUT_SCHEMA})
+ * into the job configuration.
+ */
+public class AvroFileSourceTest {
+
+  private Job job;
+  private File tempFile;
+
+  @Before
+  public void setUp() throws IOException {
+    job = new Job();
+    tempFile = File.createTempFile("test", ".avr");
+  }
+
+  @After
+  public void tearDown() {
+    tempFile.delete();
+  }
+
+  /** Returns the Hadoop {@link Path} of the temporary input file. */
+  private Path tempPath() {
+    return new Path(tempFile.getAbsolutePath());
+  }
+
+  @Test
+  public void testConfigureJob_SpecificData() throws IOException {
+    AvroType<Person> avroSpecificType = Avros.records(Person.class);
+    AvroFileSource<Person> personFileSource = new AvroFileSource<Person>(tempPath(), avroSpecificType);
+
+    personFileSource.configureSource(job, -1);
+
+    // A default of true means this only passes if the source explicitly set the flag to false.
+    assertFalse(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true));
+    assertEquals(Person.SCHEMA$.toString(), job.getConfiguration().get(AvroJob.INPUT_SCHEMA));
+  }
+
+  @Test
+  public void testConfigureJob_GenericData() throws IOException {
+    AvroType<Record> avroGenericType = Avros.generics(Person.SCHEMA$);
+    AvroFileSource<Record> genericFileSource = new AvroFileSource<Record>(tempPath(), avroGenericType);
+
+    genericFileSource.configureSource(job, -1);
+
+    assertFalse(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true));
+  }
+
+  @Test
+  public void testConfigureJob_ReflectData() throws IOException {
+    AvroType<StringWrapper> avroReflectType = Avros.reflects(StringWrapper.class);
+    AvroFileSource<StringWrapper> wrapperFileSource = new AvroFileSource<StringWrapper>(tempPath(),
+        avroReflectType);
+
+    wrapperFileSource.configureSource(job, -1);
+
+    // A default of false means this only passes if the source explicitly set the flag to true.
+    assertTrue(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, false));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
new file mode 100644
index 0000000..0dfed32
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.crunch.lib.join.JoinUtils.AvroIndexedRecordPartitioner;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link AvroIndexedRecordPartitioner}, checking the partition chosen for keys with
+ * positive, negative and {@link Integer#MIN_VALUE} hash values.
+ */
+public class AvroIndexedRecordPartitionerTest {
+
+  private AvroIndexedRecordPartitioner avroPartitioner;
+
+  @Before
+  public void setUp() {
+    avroPartitioner = new AvroIndexedRecordPartitioner();
+  }
+
+  /** Wraps a {@link MockIndexedRecord} holding {@code hash} in an {@link AvroKey}. */
+  private static AvroKey<IndexedRecord> keyWithHash(Integer hash) {
+    return new AvroKey<IndexedRecord>(new MockIndexedRecord(hash));
+  }
+
+  @Test
+  public void testGetPartition() {
+    AvroKey<IndexedRecord> avroKey = keyWithHash(3);
+
+    assertEquals(3, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 5));
+    assertEquals(1, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 2));
+  }
+
+  @Test
+  public void testGetPartition_NegativeHashValue() {
+    AvroKey<IndexedRecord> avroKey = keyWithHash(-3);
+
+    assertEquals(3, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 5));
+    assertEquals(1, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 2));
+  }
+
+  @Test
+  public void testGetPartition_IntegerMinValue() {
+    AvroKey<IndexedRecord> avroKey = keyWithHash(Integer.MIN_VALUE);
+
+    assertEquals(0, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), Integer.MAX_VALUE));
+  }
+
+  /**
+   * Mock implementation of IndexedRecord to give us control over the hashCode.
+   *
+   * <p>Both {@link #hashCode()} and {@link #get(int)} expose the wrapped value, so the tests control
+   * the hash whichever of the two the partitioner relies on.
+   */
+  static class MockIndexedRecord implements IndexedRecord {
+
+    private final Integer value;
+
+    public MockIndexedRecord(Integer value) {
+      this.value = value;
+    }
+
+    @Override
+    public int hashCode() {
+      return value.hashCode();
+    }
+
+    /** Equal when the wrapped values are equal, keeping {@code equals} consistent with {@code hashCode}. */
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+      if (!(other instanceof MockIndexedRecord)) {
+        return false;
+      }
+      Integer otherValue = ((MockIndexedRecord) other).value;
+      return value == null ? otherValue == null : value.equals(otherValue);
+    }
+
+    @Override
+    public Schema getSchema() {
+      throw new UnsupportedOperationException();
+    }
+
+    /** Returns the wrapped value for every field index. */
+    @Override
+    public Object get(int fieldIndex) {
+      return this.value;
+    }
+
+    @Override
+    public void put(int fieldIndex, Object fieldValue) {
+      throw new UnsupportedOperationException();
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/CartesianTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/CartesianTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/CartesianTest.java
new file mode 100644
index 0000000..b19097c
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/CartesianTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests {@link Cartesian#cross} on in-memory collections and tables.
+ */
+public class CartesianTest {
+
+  @Test
+  public void testCartesianCollection_SingleValues() {
+    PCollection<String> left = MemPipeline.typedCollectionOf(Writables.strings(), "a", "b");
+    PCollection<Integer> right = MemPipeline.typedCollectionOf(Writables.ints(), 1, 2);
+
+    PCollection<Pair<String, Integer>> product = Cartesian.cross(left, right);
+
+    List<Pair<String, Integer>> expected = Lists.newArrayList();
+    expected.add(Pair.of("a", 1));
+    expected.add(Pair.of("a", 2));
+    expected.add(Pair.of("b", 1));
+    expected.add(Pair.of("b", 2));
+
+    // Sort so the comparison does not depend on the order materialize() yields results in.
+    List<Pair<String, Integer>> actual = Lists.newArrayList(product.materialize());
+    Collections.sort(actual);
+
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testCartesianCollection_Tables() {
+    PTable<String, Integer> left = MemPipeline.typedTableOf(
+        Writables.tableOf(Writables.strings(), Writables.ints()), "a", 1, "b", 2);
+    PTable<String, Float> right = MemPipeline.typedTableOf(
+        Writables.tableOf(Writables.strings(), Writables.floats()), "A", 1.0f, "B", 2.0f);
+
+    PTable<Pair<String, String>, Pair<Integer, Float>> product = Cartesian.cross(left, right);
+
+    // Keys and values of each output entry are paired positionally: (leftKey, rightKey) -> (leftVal, rightVal).
+    List<Pair<Pair<String, String>, Pair<Integer, Float>>> expected = Lists.newArrayList();
+    expected.add(Pair.of(Pair.of("a", "A"), Pair.of(1, 1.0f)));
+    expected.add(Pair.of(Pair.of("a", "B"), Pair.of(1, 2.0f)));
+    expected.add(Pair.of(Pair.of("b", "A"), Pair.of(2, 1.0f)));
+    expected.add(Pair.of(Pair.of("b", "B"), Pair.of(2, 2.0f)));
+
+    List<Pair<Pair<String, String>, Pair<Integer, Float>>> actual = Lists.newArrayList(product.materialize());
+    Collections.sort(actual);
+
+    assertEquals(expected, actual);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/DistinctTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/DistinctTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/DistinctTest.java
new file mode 100644
index 0000000..8c0b3bf
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/DistinctTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+public class DistinctTest {
+  private static final List<Integer> DATA = Arrays.asList(
+      17, 29, 17, 29, 17, 29, 36, 45, 17, 45, 36, 29
+  );
+
+  @Test
+  public void testDistinct() {
+    PCollection<Integer> input = MemPipeline.typedCollectionOf(Avros.ints(), DATA);
+    Iterable<Integer> unique = Distinct.distinct(input).materialize();
+
+    assertEquals(ImmutableSet.copyOf(DATA), ImmutableSet.copyOf(unique));
+  }
+
+  @Test
+  public void testDistinctFlush() {
+    PCollection<Integer> input = MemPipeline.typedCollectionOf(Avros.ints(), DATA);
+    Iterable<Integer> unique = Distinct.distinct(input, 2).materialize();
+
+    assertEquals(ImmutableSet.copyOf(DATA), ImmutableSet.copyOf(unique));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/SampleTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/SampleTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/SampleTest.java
new file mode 100644
index 0000000..bd6fd81
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/SampleTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+/**
+ * Tests for {@link Sample}. Expected results are pinned to the fixed seeds used below.
+ */
+public class SampleTest {
+  private final PCollection<Pair<String, Double>> values = MemPipeline.typedCollectionOf(
+      Writables.pairs(Writables.strings(), Writables.doubles()),
+      ImmutableList.of(
+        Pair.of("foo", 200.0),
+        Pair.of("bar", 400.0),
+        Pair.of("baz", 100.0),
+        Pair.of("biz", 100.0)));
+
+  @Test
+  public void testWRS() throws Exception {
+    // Tally how often each key is chosen across 100 single-element samples, seeds 1729..1828.
+    Map<String, Integer> counts = Maps.newHashMap();
+    for (long seed = 1729L; seed < 1729L + 100; seed++) {
+      for (String chosen : Sample.weightedReservoirSample(values, 1, seed).materialize()) {
+        Integer previous = counts.get(chosen);
+        counts.put(chosen, previous == null ? 1 : previous + 1);
+      }
+    }
+
+    Map<String, Integer> expected = ImmutableMap.of("foo", 24, "bar", 51, "baz", 13, "biz", 12);
+    assertEquals(expected, counts);
+  }
+
+  @Test
+  public void testSample() {
+    PCollection<Integer> numbers = MemPipeline.collectionOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    List<Integer> sampled = ImmutableList.copyOf(Sample.sample(numbers, 123998L, 0.2).materialize());
+    assertEquals(ImmutableList.of(6, 7), sampled);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/SecondarySortTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/SecondarySortTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/SecondarySortTest.java
new file mode 100644
index 0000000..933b986
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/SecondarySortTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+
+/**
+ * Tests {@link SecondarySort#sortAndApply} with an in-memory pipeline.
+ */
+public class SecondarySortTest {
+  @Test
+  public void testInMemory() throws Exception {
+    PTable<Long, Pair<Long, String>> input = MemPipeline.typedTableOf(
+        tableOf(longs(), pairs(longs(), strings())),
+        1729L, Pair.of(17L, "a"),
+        100L, Pair.of(29L, "b"),
+        1729L, Pair.of(29L, "c"));
+
+    PCollection<String> joined = SecondarySort.sortAndApply(input, new StringifyFn(), strings());
+
+    // Expected: key 100 yields "b"; key 1729 yields "a" (secondary key 17) then "c" (secondary key 29).
+    assertEquals(ImmutableList.of("b", "ac"), joined.materialize());
+  }
+
+  /** Emits the concatenation of the string halves of a key's values, in the order they are supplied. */
+  private static class StringifyFn extends DoFn<Pair<Long, Iterable<Pair<Long, String>>>, String> {
+    @Override
+    public void process(Pair<Long, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) {
+      Iterable<Pair<Long, String>> groupValues = input.second();
+      StringBuilder concatenated = new StringBuilder();
+      for (Pair<Long, String> value : groupValues) {
+        concatenated.append(value.second());
+      }
+      emitter.emit(concatenated.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
new file mode 100644
index 0000000..35ccc11
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.lib.join.JoinUtils.TupleWritablePartitioner;
+import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link TupleWritablePartitioner} using single-element tuple keys with positive,
+ * negative and {@link Integer#MIN_VALUE} hash codes.
+ */
+public class TupleWritablePartitionerTest {
+
+  private TupleWritablePartitioner partitioner;
+
+  @Before
+  public void setUp() {
+    partitioner = new TupleWritablePartitioner();
+  }
+
+  /** Wraps a single writable in a {@link TupleWritable} key. */
+  private static TupleWritable singletonKey(Writable element) {
+    return new TupleWritable(new Writable[] { element });
+  }
+
+  @Test
+  public void testGetPartition() {
+    TupleWritable key = singletonKey(new IntWritable(3));
+    assertEquals(3, partitioner.getPartition(key, NullWritable.get(), 5));
+    assertEquals(1, partitioner.getPartition(key, NullWritable.get(), 2));
+  }
+
+  @Test
+  public void testGetPartition_NegativeHashValue() {
+    IntWritable negative = new IntWritable(-3);
+    // Sanity check, if this doesn't work then the premise of this test is wrong
+    assertEquals(-3, negative.hashCode());
+
+    TupleWritable key = singletonKey(negative);
+    assertEquals(3, partitioner.getPartition(key, NullWritable.get(), 5));
+    assertEquals(1, partitioner.getPartition(key, NullWritable.get(), 2));
+  }
+
+  @Test
+  public void testGetPartition_IntegerMinValue() {
+    IntWritable minimum = new IntWritable(Integer.MIN_VALUE);
+    // Sanity check, if this doesn't work then the premise of this test is wrong
+    assertEquals(Integer.MIN_VALUE, minimum.hashCode());
+
+    TupleWritable key = singletonKey(minimum);
+    assertEquals(0, partitioner.getPartition(key, NullWritable.get(), Integer.MAX_VALUE));
+  }
+
+}