You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:26 UTC
[24/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
new file mode 100644
index 0000000..6ee1972
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.apache.crunch.fn.Aggregators.MAX_BIGINTS;
+import static org.apache.crunch.fn.Aggregators.MAX_DOUBLES;
+import static org.apache.crunch.fn.Aggregators.MAX_FLOATS;
+import static org.apache.crunch.fn.Aggregators.MAX_INTS;
+import static org.apache.crunch.fn.Aggregators.MAX_LONGS;
+import static org.apache.crunch.fn.Aggregators.MAX_N;
+import static org.apache.crunch.fn.Aggregators.MIN_BIGINTS;
+import static org.apache.crunch.fn.Aggregators.MIN_DOUBLES;
+import static org.apache.crunch.fn.Aggregators.MIN_FLOATS;
+import static org.apache.crunch.fn.Aggregators.MIN_INTS;
+import static org.apache.crunch.fn.Aggregators.MIN_LONGS;
+import static org.apache.crunch.fn.Aggregators.MIN_N;
+import static org.apache.crunch.fn.Aggregators.STRING_CONCAT;
+import static org.apache.crunch.fn.Aggregators.SUM_BIGINTS;
+import static org.apache.crunch.fn.Aggregators.SUM_DOUBLES;
+import static org.apache.crunch.fn.Aggregators.SUM_FLOATS;
+import static org.apache.crunch.fn.Aggregators.SUM_INTS;
+import static org.apache.crunch.fn.Aggregators.SUM_LONGS;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.crunch.Aggregator;
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
+import org.junit.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+
+public class AggregatorsTest {
+
+ @Test
+ public void testSums2() {
+ assertThat(sapply(SUM_INTS(), 1, 2, 3, -4), is(2));
+ assertThat(sapply(SUM_LONGS(), 1L, 2L, 3L, -4L, 5000000000L), is(5000000002L));
+ assertThat(sapply(SUM_FLOATS(), 1f, 2f, 3f, -4f), is(2f));
+ assertThat(sapply(SUM_DOUBLES(), 0.1, 0.2, 0.3), is(closeTo(0.6, 0.00001)));
+ assertThat(sapply(SUM_BIGINTS(), bigInt("7"), bigInt("3")), is(bigInt("10")));
+ }
+
+ @Test
+ public void testSums() {
+ assertThat(sapply(SUM_LONGS(), 29L, 17L, 1729L), is(1775L));
+ assertThat(sapply(SUM_LONGS(), 29L, 7L, 1729L), is(1765L));
+ assertThat(sapply(SUM_INTS(), 29, 17, 1729), is(1775));
+ assertThat(sapply(SUM_FLOATS(), 29f, 17f, 1729f), is(1775.0f));
+ assertThat(sapply(SUM_DOUBLES(), 29.0, 17.0, 1729.0), is(1775.0));
+ assertThat(sapply(SUM_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("1775")));
+ }
+
+ @Test
+ public void testMax() {
+ assertThat(sapply(MAX_LONGS(), 29L, 17L, 1729L), is(1729L));
+ assertThat(sapply(MAX_INTS(), 29, 17, 1729), is(1729));
+ assertThat(sapply(MAX_FLOATS(), 29f, 17f, 1729f), is(1729.0f));
+ assertThat(sapply(MAX_DOUBLES(), 29.0, 17.0, 1729.0), is(1729.0));
+ assertThat(sapply(MAX_FLOATS(), 29f, 1745f, 17f, 1729f), is(1745.0f));
+ assertThat(sapply(MAX_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("1729")));
+ }
+
+ @Test
+ public void testMin() {
+ assertThat(sapply(MIN_LONGS(), 29L, 17L, 1729L), is(17L));
+ assertThat(sapply(MIN_INTS(), 29, 17, 1729), is(17));
+ assertThat(sapply(MIN_FLOATS(), 29f, 17f, 1729f), is(17.0f));
+ assertThat(sapply(MIN_DOUBLES(), 29.0, 17.0, 1729.0), is(17.0));
+ assertThat(sapply(MIN_INTS(), 29, 170, 1729), is(29));
+ assertThat(sapply(MIN_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("17")));
+ }
+
+ @Test
+ public void testMaxN() {
+ assertThat(apply(MAX_INTS(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(98, 1009)));
+ assertThat(apply(MAX_N(1, String.class), "b", "a"), is(ImmutableList.of("b")));
+ assertThat(apply(MAX_N(3, String.class), "b", "a", "d", "c"), is(ImmutableList.of("b", "c", "d")));
+ }
+
+ @Test
+ public void testMinN() {
+ assertThat(apply(MIN_INTS(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(17, 29)));
+ assertThat(apply(MIN_N(1, String.class), "b", "a"), is(ImmutableList.of("a")));
+ assertThat(apply(MIN_N(3, String.class), "b", "a", "d", "c"), is(ImmutableList.of("a", "b", "c")));
+ }
+
+ @Test
+ public void testFirstN() {
+ assertThat(apply(Aggregators.<Integer>FIRST_N(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(17, 34)));
+ }
+
+ @Test
+ public void testLastN() {
+ assertThat(apply(Aggregators.<Integer>LAST_N(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(29, 1009)));
+ }
+
+ @Test
+ public void testUniqueElements() {
+ assertThat(ImmutableSet.copyOf(apply(Aggregators.<Integer>UNIQUE_ELEMENTS(), 17, 29, 29, 16, 17)),
+ is(ImmutableSet.of(17, 29, 16)));
+
+ Iterable<Integer> samp = apply(Aggregators.<Integer>SAMPLE_UNIQUE_ELEMENTS(2), 17, 29, 16, 17, 29, 16);
+ assertThat(Iterables.size(samp), is(2));
+ assertThat(ImmutableSet.copyOf(samp).size(), is(2)); // check that the two elements are unique
+ }
+
+ @Test
+ public void testPairs() {
+ List<Pair<Long, Double>> input = ImmutableList.of(Pair.of(1720L, 17.29), Pair.of(9L, -3.14));
+ Aggregator<Pair<Long, Double>> a = Aggregators.pairAggregator(SUM_LONGS(), MIN_DOUBLES());
+
+ assertThat(sapply(a, input), is(Pair.of(1729L, -3.14)));
+ }
+
+ @Test
+ public void testPairsTwoLongs() {
+ List<Pair<Long, Long>> input = ImmutableList.of(Pair.of(1720L, 1L), Pair.of(9L, 19L));
+ Aggregator<Pair<Long, Long>> a = Aggregators.pairAggregator(SUM_LONGS(), SUM_LONGS());
+
+ assertThat(sapply(a, input), is(Pair.of(1729L, 20L)));
+ }
+
+ @Test
+ public void testTrips() {
+ List<Tuple3<Float, Double, Double>> input = ImmutableList.of(Tuple3.of(17.29f, 12.2, 0.1),
+ Tuple3.of(3.0f, 1.2, 3.14), Tuple3.of(-1.0f, 14.5, -0.98));
+ Aggregator<Tuple3<Float, Double, Double>> a = Aggregators.tripAggregator(
+ MAX_FLOATS(), MAX_DOUBLES(), MIN_DOUBLES());
+
+ assertThat(sapply(a, input), is(Tuple3.of(17.29f, 14.5, -0.98)));
+ }
+
+ @Test
+ public void testQuads() {
+ List<Tuple4<Float, Double, Double, Integer>> input = ImmutableList.of(Tuple4.of(17.29f, 12.2, 0.1, 1),
+ Tuple4.of(3.0f, 1.2, 3.14, 2), Tuple4.of(-1.0f, 14.5, -0.98, 3));
+ Aggregator<Tuple4<Float, Double, Double, Integer>> a = Aggregators.quadAggregator(
+ MAX_FLOATS(), MAX_DOUBLES(), MIN_DOUBLES(), SUM_INTS());
+
+ assertThat(sapply(a, input), is(Tuple4.of(17.29f, 14.5, -0.98, 6)));
+ }
+
+ @Test
+ public void testTupleN() {
+ List<TupleN> input = ImmutableList.of(new TupleN(1, 3.0, 1, 2.0, 4L), new TupleN(4, 17.0, 1, 9.7, 12L));
+ Aggregator<TupleN> a = Aggregators.tupleAggregator(
+ MIN_INTS(), SUM_DOUBLES(), MAX_INTS(), MIN_DOUBLES(), MAX_LONGS());
+
+ assertThat(sapply(a, input), is(new TupleN(1, 20.0, 1, 2.0, 12L)));
+ }
+
+ @Test
+ public void testConcatenation() {
+ assertThat(sapply(STRING_CONCAT("", true), "foo", "foobar", "bar"), is("foofoobarbar"));
+ assertThat(sapply(STRING_CONCAT("/", false), "foo", "foobar", "bar"), is("foo/foobar/bar"));
+ assertThat(sapply(STRING_CONCAT(" ", true), " ", ""), is(" "));
+ assertThat(sapply(STRING_CONCAT(" ", true), Arrays.asList(null, "")), is(""));
+ assertThat(sapply(STRING_CONCAT(" ", true, 20, 3), "foo", "foobar", "bar"), is("foo bar"));
+ assertThat(sapply(STRING_CONCAT(" ", true, 10, 6), "foo", "foobar", "bar"), is("foo foobar"));
+ assertThat(sapply(STRING_CONCAT(" ", true, 9, 6), "foo", "foobar", "bar"), is("foo bar"));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testConcatenationNullException() {
+ sapply(STRING_CONCAT(" ", false), Arrays.asList(null, "" ));
+ }
+
+
+ private static <T> T sapply(Aggregator<T> a, T... values) {
+ return sapply(a, ImmutableList.copyOf(values));
+ }
+
+ private static <T> T sapply(Aggregator<T> a, Iterable<T> values) {
+ return Iterables.getOnlyElement(apply(a, values));
+ }
+
+ private static <T> ImmutableList<T> apply(Aggregator<T> a, T... values) {
+ return apply(a, ImmutableList.copyOf(values));
+ }
+
+ private static <T> ImmutableList<T> apply(Aggregator<T> a, Iterable<T> values) {
+ CombineFn<String, T> fn = Aggregators.toCombineFn(a);
+
+ InMemoryEmitter<Pair<String, T>> e1 = new InMemoryEmitter<Pair<String,T>>();
+ fn.process(Pair.of("", values), e1);
+
+ // and a second time to make sure Aggregator.reset() works
+ InMemoryEmitter<Pair<String, T>> e2 = new InMemoryEmitter<Pair<String,T>>();
+ fn.process(Pair.of("", values), e2);
+
+ assertEquals(getValues(e1), getValues(e2));
+
+ return getValues(e1);
+ }
+
+ private static <K, V> ImmutableList<V> getValues(InMemoryEmitter<Pair<K, V>> emitter) {
+ return ImmutableList.copyOf(
+ Iterables.transform(emitter.getOutput(), new Function<Pair<K, V>, V>() {
+ @Override
+ public V apply(Pair<K, V> input) {
+ return input.second();
+ }
+ }));
+ }
+
+ private static BigInteger bigInt(String value) {
+ return new BigInteger(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java
new file mode 100644
index 0000000..b5b2a1b
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class ExtractKeyFnTest {
+
+ protected static final MapFn<String, Integer> mapFn = new MapFn<String, Integer>() {
+ @Override
+ public Integer map(String input) {
+ return input.hashCode();
+ }
+ };
+
+ protected static final ExtractKeyFn<Integer, String> one = new ExtractKeyFn<Integer, String>(mapFn);
+
+ @Test
+ public void test() {
+ StoreLastEmitter<Pair<Integer, String>> emitter = StoreLastEmitter.create();
+ one.process("boza", emitter);
+ assertEquals(Pair.of("boza".hashCode(), "boza"), emitter.getLast());
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/fn/FilterFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/FilterFnTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/FilterFnTest.java
new file mode 100644
index 0000000..a649f99
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/FilterFnTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.crunch.FilterFn;
+import org.junit.Test;
+
+import com.google.common.base.Predicates;
+
+
+public class FilterFnTest {
+
+ private static final FilterFn<String> TRUE = FilterFns.<String>ACCEPT_ALL();
+ private static final FilterFn<String> FALSE = FilterFns.<String>REJECT_ALL();
+
+ @Test
+ public void testAcceptAll() {
+ assertThat(TRUE.accept(""), is(true));
+ assertThat(TRUE.accept("foo"), is(true));
+ }
+
+ @Test
+ public void testRejectAll() {
+ assertThat(FALSE.accept(""), is(false));
+ assertThat(FALSE.accept("foo"), is(false));
+
+ Predicates.or(Predicates.alwaysFalse(), Predicates.alwaysTrue());
+ }
+
+ @Test
+ public void testAnd() {
+ assertThat(FilterFns.and(TRUE, TRUE).accept("foo"), is(true));
+ assertThat(FilterFns.and(TRUE, FALSE).accept("foo"), is(false));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testGeneric() {
+ assertThat(FilterFns.and(TRUE).accept("foo"), is(true));
+ assertThat(FilterFns.and(FALSE).accept("foo"), is(false));
+ assertThat(FilterFns.and(FALSE, FALSE, FALSE).accept("foo"), is(false));
+ assertThat(FilterFns.and(TRUE, TRUE, FALSE).accept("foo"), is(false));
+ assertThat(FilterFns.and(FALSE, FALSE, FALSE, FALSE).accept("foo"), is(false));
+ }
+
+ @Test
+ public void testOr() {
+ assertThat(FilterFns.or(FALSE, TRUE).accept("foo"), is(true));
+ assertThat(FilterFns.or(TRUE, FALSE).accept("foo"), is(true));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testOrGeneric() {
+ assertThat(FilterFns.or(TRUE).accept("foo"), is(true));
+ assertThat(FilterFns.or(FALSE).accept("foo"), is(false));
+ assertThat(FilterFns.or(TRUE, FALSE, TRUE).accept("foo"), is(true));
+ assertThat(FilterFns.or(FALSE, FALSE, TRUE).accept("foo"), is(true));
+ assertThat(FilterFns.or(FALSE, FALSE, FALSE).accept("foo"), is(false));
+ }
+
+ @Test
+ public void testNot() {
+ assertThat(FilterFns.not(TRUE).accept("foo"), is(false));
+ assertThat(FilterFns.not(FALSE).accept("foo"), is(true));
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/fn/MapKeysTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/MapKeysTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/MapKeysTest.java
new file mode 100644
index 0000000..6b73700
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/MapKeysTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.Pair;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class MapKeysTest {
+
+ protected static final MapKeysFn<String, Integer, Integer> one = new MapKeysFn<String, Integer, Integer>() {
+ @Override
+ public Integer map(String input) {
+ return 1;
+ }
+ };
+
+ protected static final MapKeysFn<String, Integer, Integer> two = new MapKeysFn<String, Integer, Integer>() {
+ @Override
+ public Integer map(String input) {
+ return 2;
+ }
+ };
+
+ @Test
+ public void test() {
+ StoreLastEmitter<Pair<Integer, Integer>> emitter = StoreLastEmitter.create();
+ one.process(Pair.of("k", Integer.MAX_VALUE), emitter);
+ assertEquals(Pair.of(1, Integer.MAX_VALUE), emitter.getLast());
+ two.process(Pair.of("k", Integer.MAX_VALUE), emitter);
+ assertEquals(Pair.of(2, Integer.MAX_VALUE), emitter.getLast());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/fn/MapValuesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/MapValuesTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/MapValuesTest.java
new file mode 100644
index 0000000..097b008
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/MapValuesTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.Pair;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class MapValuesTest {
+
+ static final MapValuesFn<String, String, Integer> one = new MapValuesFn<String, String, Integer>() {
+ @Override
+ public Integer map(String input) {
+ return 1;
+ }
+ };
+
+ static final MapValuesFn<String, String, Integer> two = new MapValuesFn<String, String, Integer>() {
+ @Override
+ public Integer map(String input) {
+ return 2;
+ }
+ };
+
+ @Test
+ public void test() {
+ StoreLastEmitter<Pair<String, Integer>> emitter = StoreLastEmitter.create();
+ one.process(Pair.of("k", "v"), emitter);
+ assertEquals(Pair.of("k", 1), emitter.getLast());
+ two.process(Pair.of("k", "v"), emitter);
+ assertEquals(Pair.of("k", 2), emitter.getLast());
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/fn/PairMapTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/PairMapTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/PairMapTest.java
new file mode 100644
index 0000000..bef6c85
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/PairMapTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class PairMapTest {
+
+ static final MapFn<String, Integer> one = new MapFn<String, Integer>() {
+ @Override
+ public Integer map(String input) {
+ return 1;
+ }
+ };
+
+ static final MapFn<String, Integer> two = new MapFn<String, Integer>() {
+ @Override
+ public Integer map(String input) {
+ return 2;
+ }
+ };
+
+ @Test
+ public void testPairMap() {
+ StoreLastEmitter<Pair<Integer, Integer>> emitter = StoreLastEmitter.create();
+ PairMapFn<String, String, Integer, Integer> fn = new PairMapFn<String, String, Integer, Integer>(one, two);
+ fn.process(Pair.of("a", "b"), emitter);
+ Pair<Integer, Integer> pair = emitter.getLast();
+ assertTrue(pair.first() == 1);
+ assertTrue(pair.second() == 2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java b/crunch-core/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java
new file mode 100644
index 0000000..cdd8754
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.Emitter;
+
+class StoreLastEmitter<T> implements Emitter<T> {
+ private T last;
+
+ @Override
+ public void emit(T emitted) {
+ last = emitted;
+ }
+
+ public T getLast() {
+ return last;
+ }
+
+ @Override
+ public void flush() {
+ }
+
+ public static <T> StoreLastEmitter<T> create() {
+ return new StoreLastEmitter<T>();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
new file mode 100644
index 0000000..811a0a3
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SingleUseIterableTest {
+
+ @Test
+ public void testIterator() {
+ List<Integer> values = Lists.newArrayList(1,2,3);
+
+ SingleUseIterable<Integer> iterable = new SingleUseIterable<Integer>(values);
+
+ List<Integer> retrievedValues = Lists.newArrayList(iterable);
+
+ assertEquals(values, retrievedValues);
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testIterator_MultipleCalls() {
+ List<Integer> values = Lists.newArrayList(1,2,3);
+
+ SingleUseIterable<Integer> iterable = new SingleUseIterable<Integer>(values);
+
+ List<Integer> retrievedValues = Lists.newArrayList(iterable);
+
+ for (Integer n : iterable) {
+
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
new file mode 100644
index 0000000..9ed7a46
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+
+@RunWith(MockitoJUnitRunner.class)
+public class MRPipelineTest {
+ @Rule
+ public TemporaryFolder tempDir = new TemporaryFolder();
+ @Mock
+ private PCollectionImpl<String> pcollection;
+ @Mock
+ private ReadableSourceTarget<String> readableSourceTarget;
+ @Mock
+ private SourceTarget<String> nonReadableSourceTarget;
+ private MRPipeline pipeline;
+
+ @Before
+ public void setUp() throws IOException {
+ Configuration conf = new Configuration();
+ conf.set(RuntimeParameters.TMP_DIR, tempDir.getRoot().getAbsolutePath());
+ pipeline = spy(new MRPipeline(MRPipelineTest.class, conf));
+ }
+
+ @Test
+ public void testGetMaterializeSourceTarget_AlreadyMaterialized() {
+ when(pcollection.getMaterializedAt()).thenReturn(readableSourceTarget);
+
+ assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(pcollection));
+ }
+
+ @Test
+ public void testGetMaterializeSourceTarget_NotMaterialized_HasOutput() {
+ when(pcollection.getPType()).thenReturn(Avros.strings());
+ doReturn(readableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
+ when(pcollection.getMaterializedAt()).thenReturn(null);
+
+ assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(pcollection));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget() {
+ when(pcollection.getPType()).thenReturn(Avros.strings());
+ doReturn(nonReadableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
+ when(pcollection.getMaterializedAt()).thenReturn(null);
+
+ pipeline.getMaterializeSourceTarget(pcollection);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
new file mode 100644
index 0000000..fd582bc
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+public class DoCollectionImplTest {
+
+ @Test
+ public void testGetSizeInternal_NoScaleFactor() {
+ runScaleTest(100L, 1.0f, 100L);
+ }
+
+ @Test
+ public void testGetSizeInternal_ScaleFactorBelowZero() {
+ runScaleTest(100L, 0.5f, 50L);
+ }
+
+ @Test
+ public void testGetSizeInternal_ScaleFactorAboveZero() {
+ runScaleTest(100L, 1.5f, 150L);
+ }
+
+ private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) {
+ PCollectionImpl<String> parentCollection = new SizedPCollectionImpl("Sized collection", inputSize);
+
+ DoCollectionImpl<String> doCollectionImpl = new DoCollectionImpl<String>("Scaled collection", parentCollection,
+ new ScaledFunction(scaleFactor), Writables.strings());
+
+ assertEquals(expectedScaledSize, doCollectionImpl.getSizeInternal());
+ }
+
+ static class ScaledFunction extends DoFn<String, String> {
+
+ private float scaleFactor;
+
+ public ScaledFunction(float scaleFactor) {
+ this.scaleFactor = scaleFactor;
+ }
+
+ @Override
+ public void process(String input, Emitter<String> emitter) {
+ emitter.emit(input);
+ }
+
+ @Override
+ public float scaleFactor() {
+ return scaleFactor;
+ }
+
+ }
+
+ static class SizedPCollectionImpl extends PCollectionImpl<String> {
+
+ private long internalSize;
+
+ public SizedPCollectionImpl(String name, long internalSize) {
+ super(name);
+ this.internalSize = internalSize;
+ }
+
+ @Override
+ public PType getPType() {
+ return null;
+ }
+
+ @Override
+ public DoNode createDoNode() {
+ return null;
+ }
+
+ @Override
+ public List getParents() {
+ return null;
+ }
+
+ @Override
+ protected void acceptInternal(Visitor visitor) {
+ }
+
+ @Override
+ protected long getSizeInternal() {
+ return internalSize;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
new file mode 100644
index 0000000..89b9944
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import static org.apache.crunch.types.writable.Writables.strings;
+import static org.apache.crunch.types.writable.Writables.tableOf;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.junit.Test;
+
+public class DoTableImplTest {
+
+ @Test
+ public void testGetSizeInternal_NoScaleFactor() {
+ runScaleTest(100L, 1.0f, 100L);
+ }
+
+ @Test
+ public void testGetSizeInternal_ScaleFactorBelowZero() {
+ runScaleTest(100L, 0.5f, 50L);
+ }
+
+ @Test
+ public void testGetSizeInternal_ScaleFactorAboveZero() {
+ runScaleTest(100L, 1.5f, 150L);
+ }
+
+ private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) {
+
+ @SuppressWarnings("unchecked")
+ PCollectionImpl<String> parentCollection = (PCollectionImpl<String>) mock(PCollectionImpl.class);
+
+ when(parentCollection.getSize()).thenReturn(inputSize);
+
+ DoTableImpl<String, String> doTableImpl = new DoTableImpl<String, String>("Scalled table collection",
+ parentCollection, new TableScaledFunction(scaleFactor), tableOf(strings(), strings()));
+
+ assertEquals(expectedScaledSize, doTableImpl.getSizeInternal());
+
+ verify(parentCollection).getSize();
+
+ verifyNoMoreInteractions(parentCollection);
+ }
+
+ static class TableScaledFunction extends DoFn<String, Pair<String, String>> {
+
+ private float scaleFactor;
+
+ public TableScaledFunction(float scaleFactor) {
+ this.scaleFactor = scaleFactor;
+ }
+
+ @Override
+ public float scaleFactor() {
+ return scaleFactor;
+ }
+
+ @Override
+ public void process(String input, Emitter<Pair<String, String>> emitter) {
+ emitter.emit(Pair.of(input, input));
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
new file mode 100644
index 0000000..dd72364
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.emit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import com.google.common.collect.Lists;
+
+public class IntermediateEmitterTest {
+
+ private StringWrapper stringWrapper;
+ private PType ptype;
+
+ @Before
+ public void setUp() {
+ stringWrapper = new StringWrapper("test");
+ ptype = spy(Avros.reflects(StringWrapper.class));
+ }
+
+ @Test
+ public void testEmit_SingleChild() {
+ RTNode singleChild = mock(RTNode.class);
+ IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(singleChild),
+ new Configuration());
+ emitter.emit(stringWrapper);
+
+ ArgumentCaptor<StringWrapper> argumentCaptor = ArgumentCaptor.forClass(StringWrapper.class);
+ verify(singleChild).process(argumentCaptor.capture());
+ assertSame(stringWrapper, argumentCaptor.getValue());
+ }
+
+ @Test
+ public void testEmit_MultipleChildren() {
+ RTNode childA = mock(RTNode.class);
+ RTNode childB = mock(RTNode.class);
+ IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(childA, childB),
+ new Configuration());
+ emitter.emit(stringWrapper);
+
+ ArgumentCaptor<StringWrapper> argumentCaptorA = ArgumentCaptor.forClass(StringWrapper.class);
+ ArgumentCaptor<StringWrapper> argumentCaptorB = ArgumentCaptor.forClass(StringWrapper.class);
+
+ verify(childA).process(argumentCaptorA.capture());
+ verify(childB).process(argumentCaptorB.capture());
+
+ assertEquals(stringWrapper, argumentCaptorA.getValue());
+ assertEquals(stringWrapper, argumentCaptorB.getValue());
+
+ // Make sure that multiple children means deep copies are performed
+ assertNotSame(stringWrapper, argumentCaptorA.getValue());
+ assertNotSame(stringWrapper, argumentCaptorB.getValue());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounterTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounterTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounterTest.java
new file mode 100644
index 0000000..958df12
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounterTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class CappedExponentialCounterTest {
+
+ @Test
+ public void testGet() {
+ CappedExponentialCounter c = new CappedExponentialCounter(1L, Long.MAX_VALUE);
+ assertEquals(1L, c.get());
+ assertEquals(2L, c.get());
+ assertEquals(4L, c.get());
+ assertEquals(8L, c.get());
+ }
+
+ @Test
+ public void testCap() {
+ CappedExponentialCounter c = new CappedExponentialCounter(1L, 2);
+ assertEquals(1L, c.get());
+ assertEquals(2L, c.get());
+ assertEquals(2L, c.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
new file mode 100644
index 0000000..f03c3e2
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class CrunchJobHooksTest {
+
+ @Test
+ public void testExtractPartitionNumber() {
+ assertEquals(0, CrunchJobHooks.extractPartitionNumber("out1-r-00000"));
+ assertEquals(10, CrunchJobHooks.extractPartitionNumber("out2-r-00010"));
+ assertEquals(99999, CrunchJobHooks.extractPartitionNumber("out3-r-99999"));
+ }
+
+ @Test
+ public void testExtractPartitionNumber_WithSuffix() {
+ assertEquals(10, CrunchJobHooks.extractPartitionNumber("out2-r-00010.avro"));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testExtractPartitionNumber_MapOutputFile() {
+ CrunchJobHooks.extractPartitionNumber("out1-m-00000");
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
new file mode 100644
index 0000000..562238d
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.plan.DotfileWriter.MRTaskType;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
+
+public class DotfileWriterTest {
+
+ private DotfileWriter dotfileWriter;
+
+ @Before
+ public void setUp() {
+ dotfileWriter = new DotfileWriter();
+ }
+
+ @Test
+ public void testFormatPCollectionNodeDeclaration() {
+ PCollectionImpl<?> pcollectionImpl = mock(PCollectionImpl.class);
+ JobPrototype jobPrototype = mock(JobPrototype.class);
+ when(pcollectionImpl.getName()).thenReturn("collection");
+
+ assertEquals("\"collection@" + pcollectionImpl.hashCode() + "@" + jobPrototype.hashCode()
+ + "\" [label=\"collection\" shape=box];",
+ dotfileWriter.formatPCollectionNodeDeclaration(pcollectionImpl, jobPrototype));
+ }
+
+ @Test
+ public void testFormatPCollectionNodeDeclaration_InputPCollection() {
+ InputCollection<?> inputCollection = mock(InputCollection.class, Mockito.RETURNS_DEEP_STUBS);
+ JobPrototype jobPrototype = mock(JobPrototype.class);
+ when(inputCollection.getName()).thenReturn("input");
+ when(inputCollection.getSource().toString()).thenReturn("source");
+
+ assertEquals("\"source\" [label=\"input\" shape=folder];",
+ dotfileWriter.formatPCollectionNodeDeclaration(inputCollection, jobPrototype));
+ }
+
+ @Test
+ public void testFormatTargetNodeDeclaration() {
+ Target target = mock(Target.class);
+ when(target.toString()).thenReturn("target/path");
+
+ assertEquals("\"target/path\" [label=\"target/path\" shape=folder];",
+ dotfileWriter.formatTargetNodeDeclaration(target));
+ }
+
+ @Test
+ public void testFormatPCollection() {
+ PCollectionImpl<?> pcollectionImpl = mock(PCollectionImpl.class);
+ JobPrototype jobPrototype = mock(JobPrototype.class);
+ when(pcollectionImpl.getName()).thenReturn("collection");
+
+ assertEquals("\"collection@" + pcollectionImpl.hashCode() + "@" + jobPrototype.hashCode() + "\"",
+ dotfileWriter.formatPCollection(pcollectionImpl, jobPrototype));
+ }
+
+ @Test
+ public void testFormatPCollection_InputCollection() {
+ InputCollection<Object> inputCollection = mock(InputCollection.class);
+ Source<Object> source = mock(Source.class);
+ JobPrototype jobPrototype = mock(JobPrototype.class);
+ when(source.toString()).thenReturn("mocksource");
+ when(inputCollection.getSource()).thenReturn(source);
+
+ assertEquals("\"mocksource\"", dotfileWriter.formatPCollection(inputCollection, jobPrototype));
+ }
+
+ @Test
+ public void testFormatNodeCollection() {
+ List<String> nodeCollection = Lists.newArrayList("one", "two", "three");
+ assertEquals("one -> two -> three;", dotfileWriter.formatNodeCollection(nodeCollection));
+ }
+
+ @Test
+ public void testFormatNodePath() {
+ PCollectionImpl<?> tail = mock(PCollectionImpl.class);
+ PCollectionImpl<?> head = mock(PCollectionImpl.class);
+ JobPrototype jobPrototype = mock(JobPrototype.class);
+
+ when(tail.getName()).thenReturn("tail");
+ when(head.getName()).thenReturn("head");
+
+ NodePath nodePath = new NodePath(tail);
+ nodePath.close(head);
+
+ assertEquals(
+ Lists.newArrayList("\"head@" + head.hashCode() + "@" + jobPrototype.hashCode() + "\" -> \"tail@"
+ + tail.hashCode() + "@" + jobPrototype.hashCode() + "\";"),
+ dotfileWriter.formatNodePath(nodePath, jobPrototype));
+ }
+
+ @Test
+ public void testGetTaskGraphAttributes_Map() {
+ assertEquals("label = Map; color = blue;", dotfileWriter.getTaskGraphAttributes(MRTaskType.MAP));
+ }
+
+ @Test
+ public void testGetTaskGraphAttributes_Reduce() {
+ assertEquals("label = Reduce; color = red;", dotfileWriter.getTaskGraphAttributes(MRTaskType.REDUCE));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
new file mode 100644
index 0000000..7963c83
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class JobNameBuilderTest {
+
+ @Test
+ public void testBuild() {
+ final String pipelineName = "PipelineName";
+ final String nodeName = "outputNode";
+ DoNode doNode = DoNode.createOutputNode(nodeName, Writables.strings());
+ JobNameBuilder jobNameBuilder = new JobNameBuilder(pipelineName);
+ jobNameBuilder.visit(Lists.newArrayList(doNode));
+ String jobName = jobNameBuilder.build();
+
+ assertEquals(String.format("%s: %s", pipelineName, nodeName), jobName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java b/crunch-core/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java
new file mode 100644
index 0000000..467da15
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class SequentialFileNamingSchemeTest {
+
+ // The partition id used for testing. This partition id should be ignored by
+ // the SequentialFileNamingScheme.
+ private static final int PARTITION_ID = 42;
+
+ private SequentialFileNamingScheme namingScheme;
+ private Configuration configuration;
+
+ @Rule
+ public TemporaryFolder tmpOutputDir = new TemporaryFolder();
+
+ @Before
+ public void setUp() throws IOException {
+ configuration = new Configuration();
+ namingScheme = new SequentialFileNamingScheme();
+ }
+
+ @Test
+ public void testGetMapOutputName_EmptyDirectory() throws IOException {
+ assertEquals("part-m-00000",
+ namingScheme.getMapOutputName(configuration, new Path(tmpOutputDir.getRoot().getAbsolutePath())));
+ }
+
+ @Test
+ public void testGetMapOutputName_NonEmptyDirectory() throws IOException {
+ File outputDirectory = tmpOutputDir.getRoot();
+
+ new File(outputDirectory, "existing-1").createNewFile();
+ new File(outputDirectory, "existing-2").createNewFile();
+
+ assertEquals("part-m-00002",
+ namingScheme.getMapOutputName(configuration, new Path(outputDirectory.getAbsolutePath())));
+ }
+
+ @Test
+ public void testGetReduceOutputName_EmptyDirectory() throws IOException {
+ assertEquals("part-r-00000", namingScheme.getReduceOutputName(configuration, new Path(tmpOutputDir.getRoot()
+ .getAbsolutePath()), PARTITION_ID));
+ }
+
+ @Test
+ public void testGetReduceOutputName_NonEmptyDirectory() throws IOException {
+ File outputDirectory = tmpOutputDir.getRoot();
+
+ new File(outputDirectory, "existing-1").createNewFile();
+ new File(outputDirectory, "existing-2").createNewFile();
+
+ assertEquals("part-r-00002",
+ namingScheme.getReduceOutputName(configuration, new Path(outputDirectory.getAbsolutePath()), PARTITION_ID));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java b/crunch-core/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java
new file mode 100644
index 0000000..5b0ea55
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+public class SourceTargetHelperTest {
+
+ @Test
+ public void testGetNonexistentPathSize() throws Exception {
+ File tmp = File.createTempFile("pathsize", "");
+ Path tmpPath = new Path(tmp.getAbsolutePath());
+ tmp.delete();
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ assertEquals(-1L, SourceTargetHelper.getPathSize(fs, tmpPath));
+ }
+
+ @Test
+ public void testGetNonExistentPathSize_NonExistantPath() throws IOException {
+ FileSystem mockFs = new MockFileSystem();
+ assertEquals(-1L, SourceTargetHelper.getPathSize(mockFs, new Path("does/not/exist")));
+ }
+
+ /**
+ * Mock FileSystem that returns null for {@link FileSystem#listStatus(Path)}.
+ */
+ static class MockFileSystem extends LocalFileSystem {
+
+ @Override
+ public FileStatus[] listStatus(Path f) throws IOException {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java b/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
new file mode 100644
index 0000000..62085f8
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class AvroFileReaderFactoryTest {
+
+ private File avroFile;
+
+ @Before
+ public void setUp() throws IOException {
+ avroFile = File.createTempFile("test", ".av");
+ }
+
+ @After
+ public void tearDown() {
+ avroFile.delete();
+ }
+
+ private void populateGenericFile(List<GenericRecord> genericRecords, Schema outputSchema) throws IOException {
+ FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+ GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(outputSchema);
+
+ DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
+ dataFileWriter.create(outputSchema, outputStream);
+
+ for (GenericRecord record : genericRecords) {
+ dataFileWriter.append(record);
+ }
+
+ dataFileWriter.close();
+ outputStream.close();
+
+ }
+
+ private <T> AvroFileReaderFactory<T> createFileReaderFactory(AvroType<T> avroType) {
+ return new AvroFileReaderFactory<T>(avroType);
+ }
+
+ @Test
+ public void testRead_GenericReader() throws IOException {
+ GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+ savedRecord.put("name", "John Doe");
+ savedRecord.put("age", 42);
+ savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+ populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+ AvroFileReaderFactory<GenericData.Record> genericReader = createFileReaderFactory(Avros.generics(Person.SCHEMA$));
+ Iterator<GenericData.Record> recordIterator = genericReader.read(FileSystem.getLocal(new Configuration()),
+ new Path(this.avroFile.getAbsolutePath()));
+
+ GenericRecord genericRecord = recordIterator.next();
+ assertEquals(savedRecord, genericRecord);
+ assertFalse(recordIterator.hasNext());
+ }
+
+ @Test
+ public void testRead_SpecificReader() throws IOException {
+ GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+ savedRecord.put("name", "John Doe");
+ savedRecord.put("age", 42);
+ savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+ populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+ AvroFileReaderFactory<Person> genericReader = createFileReaderFactory(Avros.records(Person.class));
+ Iterator<Person> recordIterator = genericReader.read(FileSystem.getLocal(new Configuration()), new Path(
+ this.avroFile.getAbsolutePath()));
+
+ Person expectedPerson = new Person();
+ expectedPerson.age = 42;
+ expectedPerson.name = "John Doe";
+ List<CharSequence> siblingNames = Lists.newArrayList();
+ siblingNames.add("Jimmy");
+ siblingNames.add("Jane");
+ expectedPerson.siblingnames = siblingNames;
+
+ Person person = recordIterator.next();
+
+ assertEquals(expectedPerson, person);
+ assertFalse(recordIterator.hasNext());
+ }
+
+ @Test
+ public void testRead_ReflectReader() throws IOException {
+ Schema reflectSchema = ReflectData.get().getSchema(StringWrapper.class);
+ GenericRecord savedRecord = new GenericData.Record(reflectSchema);
+ savedRecord.put("value", "stringvalue");
+ populateGenericFile(Lists.newArrayList(savedRecord), reflectSchema);
+
+ AvroFileReaderFactory<StringWrapper> genericReader = createFileReaderFactory(Avros.reflects(StringWrapper.class));
+ Iterator<StringWrapper> recordIterator = genericReader.read(FileSystem.getLocal(new Configuration()), new Path(
+ this.avroFile.getAbsolutePath()));
+
+ StringWrapper stringWrapper = recordIterator.next();
+
+ assertEquals("stringvalue", stringWrapper.getValue());
+ assertFalse(recordIterator.hasNext());
+ }
+
+ @Test
+ public void testCreateDatumReader_Generic() {
+ DatumReader<Record> datumReader = AvroFileReaderFactory.createDatumReader(Avros.generics(Person.SCHEMA$));
+ assertEquals(GenericDatumReader.class, datumReader.getClass());
+ }
+
+ @Test
+ public void testCreateDatumReader_Reflect() {
+ DatumReader<StringWrapper> datumReader = AvroFileReaderFactory.createDatumReader(Avros
+ .reflects(StringWrapper.class));
+ assertEquals(ReflectDatumReader.class, datumReader.getClass());
+ }
+
+ @Test
+ public void testCreateDatumReader_Specific() {
+ DatumReader<Person> datumReader = AvroFileReaderFactory.createDatumReader(Avros.records(Person.class));
+ assertEquals(SpecificDatumReader.class, datumReader.getClass());
+ }
+
+ @Test
+ public void testCreateDatumReader_ReflectAndSpecific() {
+ Assume.assumeTrue(Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS);
+
+ DatumReader<Pair<Person, StringWrapper>> datumReader = AvroFileReaderFactory.createDatumReader(Avros.pairs(
+ Avros.records(Person.class), Avros.reflects(StringWrapper.class)));
+ assertEquals(ReflectDatumReader.class, datumReader.getClass());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testCreateDatumReader_ReflectAndSpecific_NotSupported() {
+ Assume.assumeTrue(!Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS);
+ AvroFileReaderFactory.createDatumReader(Avros.pairs(Avros.records(Person.class),
+ Avros.reflects(StringWrapper.class)));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java b/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
new file mode 100644
index 0000000..ceef2b2
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AvroFileSourceTest {
+
+ private Job job;
+ File tempFile;
+
+ @Before
+ public void setUp() throws IOException {
+ job = new Job();
+ tempFile = File.createTempFile("test", ".avr");
+ }
+
+ @After
+ public void tearDown() {
+ tempFile.delete();
+ }
+
+ @Test
+ public void testConfigureJob_SpecificData() throws IOException {
+ AvroType<Person> avroSpecificType = Avros.records(Person.class);
+ AvroFileSource<Person> personFileSource = new AvroFileSource<Person>(new Path(tempFile.getAbsolutePath()),
+ avroSpecificType);
+
+ personFileSource.configureSource(job, -1);
+
+ assertFalse(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true));
+ assertEquals(Person.SCHEMA$.toString(), job.getConfiguration().get(AvroJob.INPUT_SCHEMA));
+ }
+
+ @Test
+ public void testConfigureJob_GenericData() throws IOException {
+ AvroType<Record> avroGenericType = Avros.generics(Person.SCHEMA$);
+ AvroFileSource<Record> personFileSource = new AvroFileSource<Record>(new Path(tempFile.getAbsolutePath()),
+ avroGenericType);
+
+ personFileSource.configureSource(job, -1);
+
+ assertFalse(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true));
+
+ }
+
+ @Test
+ public void testConfigureJob_ReflectData() throws IOException {
+ AvroType<StringWrapper> avroReflectType = Avros.reflects(StringWrapper.class);
+ AvroFileSource<StringWrapper> personFileSource = new AvroFileSource<StringWrapper>(new Path(
+ tempFile.getAbsolutePath()), avroReflectType);
+
+ personFileSource.configureSource(job, -1);
+
+ assertTrue(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, false));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
new file mode 100644
index 0000000..0dfed32
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.crunch.lib.join.JoinUtils.AvroIndexedRecordPartitioner;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AvroIndexedRecordPartitionerTest {
+
+ private AvroIndexedRecordPartitioner avroPartitioner;
+
+ @Before
+ public void setUp() {
+ avroPartitioner = new AvroIndexedRecordPartitioner();
+ }
+
+ @Test
+ public void testGetPartition() {
+ IndexedRecord indexedRecord = new MockIndexedRecord(3);
+ AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord);
+
+ assertEquals(3, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 5));
+ assertEquals(1, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 2));
+ }
+
+ @Test
+ public void testGetPartition_NegativeHashValue() {
+ IndexedRecord indexedRecord = new MockIndexedRecord(-3);
+ AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord);
+
+ assertEquals(3, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 5));
+ assertEquals(1, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 2));
+ }
+
+ @Test
+ public void testGetPartition_IntegerMinValue() {
+ IndexedRecord indexedRecord = new MockIndexedRecord(Integer.MIN_VALUE);
+ AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord);
+
+ assertEquals(0, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), Integer.MAX_VALUE));
+ }
+
+ /**
+ * Mock implementation of IndexedRecord to give us control over the hashCode.
+ */
+ static class MockIndexedRecord implements IndexedRecord {
+
+ private Integer value;
+
+ public MockIndexedRecord(Integer value) {
+ this.value = value;
+ }
+
+ @Override
+ public int hashCode() {
+ return value.hashCode();
+ }
+
+ @Override
+ public Schema getSchema() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object get(int arg0) {
+ return this.value;
+ }
+
+ @Override
+ public void put(int arg0, Object arg1) {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/CartesianTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/CartesianTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/CartesianTest.java
new file mode 100644
index 0000000..b19097c
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/CartesianTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class CartesianTest {
+
+ @Test
+ public void testCartesianCollection_SingleValues() {
+
+ PCollection<String> letters = MemPipeline.typedCollectionOf(Writables.strings(), "a", "b");
+ PCollection<Integer> ints = MemPipeline.typedCollectionOf(Writables.ints(), 1, 2);
+
+ PCollection<Pair<String, Integer>> cartesianProduct = Cartesian.cross(letters, ints);
+
+ @SuppressWarnings("unchecked")
+ List<Pair<String, Integer>> expectedResults = Lists.newArrayList(Pair.of("a", 1), Pair.of("a", 2), Pair.of("b", 1),
+ Pair.of("b", 2));
+ List<Pair<String, Integer>> actualResults = Lists.newArrayList(cartesianProduct.materialize());
+ Collections.sort(actualResults);
+
+ assertEquals(expectedResults, actualResults);
+ }
+
+ @Test
+ public void testCartesianCollection_Tables() {
+
+ PTable<String, Integer> leftTable = MemPipeline.typedTableOf(
+ Writables.tableOf(Writables.strings(), Writables.ints()), "a", 1, "b", 2);
+ PTable<String, Float> rightTable = MemPipeline.typedTableOf(
+ Writables.tableOf(Writables.strings(), Writables.floats()), "A", 1.0f, "B", 2.0f);
+
+ PTable<Pair<String, String>, Pair<Integer, Float>> cartesianProduct = Cartesian.cross(leftTable, rightTable);
+
+ List<Pair<Pair<String, String>, Pair<Integer, Float>>> expectedResults = Lists.newArrayList();
+ expectedResults.add(Pair.of(Pair.of("a", "A"), Pair.of(1, 1.0f)));
+ expectedResults.add(Pair.of(Pair.of("a", "B"), Pair.of(1, 2.0f)));
+ expectedResults.add(Pair.of(Pair.of("b", "A"), Pair.of(2, 1.0f)));
+ expectedResults.add(Pair.of(Pair.of("b", "B"), Pair.of(2, 2.0f)));
+
+ List<Pair<Pair<String, String>, Pair<Integer, Float>>> actualResults = Lists.newArrayList(cartesianProduct
+ .materialize());
+ Collections.sort(actualResults);
+
+ assertEquals(expectedResults, actualResults);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/DistinctTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/DistinctTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/DistinctTest.java
new file mode 100644
index 0000000..8c0b3bf
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/DistinctTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+public class DistinctTest {
+ private static final List<Integer> DATA = Arrays.asList(
+ 17, 29, 17, 29, 17, 29, 36, 45, 17, 45, 36, 29
+ );
+
+ @Test
+ public void testDistinct() {
+ PCollection<Integer> input = MemPipeline.typedCollectionOf(Avros.ints(), DATA);
+ Iterable<Integer> unique = Distinct.distinct(input).materialize();
+
+ assertEquals(ImmutableSet.copyOf(DATA), ImmutableSet.copyOf(unique));
+ }
+
+ @Test
+ public void testDistinctFlush() {
+ PCollection<Integer> input = MemPipeline.typedCollectionOf(Avros.ints(), DATA);
+ Iterable<Integer> unique = Distinct.distinct(input, 2).materialize();
+
+ assertEquals(ImmutableSet.copyOf(DATA), ImmutableSet.copyOf(unique));
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/SampleTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/SampleTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/SampleTest.java
new file mode 100644
index 0000000..bd6fd81
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/SampleTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+public class SampleTest {
+ private PCollection<Pair<String, Double>> values = MemPipeline.typedCollectionOf(
+ Writables.pairs(Writables.strings(), Writables.doubles()),
+ ImmutableList.of(
+ Pair.of("foo", 200.0),
+ Pair.of("bar", 400.0),
+ Pair.of("baz", 100.0),
+ Pair.of("biz", 100.0)));
+
+ @Test
+ public void testWRS() throws Exception {
+ Map<String, Integer> histogram = Maps.newHashMap();
+
+ for (int i = 0; i < 100; i++) {
+ PCollection<String> sample = Sample.weightedReservoirSample(values, 1, 1729L + i);
+ for (String s : sample.materialize()) {
+ if (!histogram.containsKey(s)) {
+ histogram.put(s, 1);
+ } else {
+ histogram.put(s, 1 + histogram.get(s));
+ }
+ }
+ }
+
+ Map<String, Integer> expected = ImmutableMap.of(
+ "foo", 24, "bar", 51, "baz", 13, "biz", 12);
+ assertEquals(expected, histogram);
+ }
+
+ @Test
+ public void testSample() {
+ PCollection<Integer> pcollect = MemPipeline.collectionOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+ Iterable<Integer> sample = Sample.sample(pcollect, 123998L, 0.2).materialize();
+ List<Integer> sampleValues = ImmutableList.copyOf(sample);
+ assertEquals(ImmutableList.of(6, 7), sampleValues);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/SecondarySortTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/SecondarySortTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/SecondarySortTest.java
new file mode 100644
index 0000000..933b986
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/SecondarySortTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+
+public class SecondarySortTest {
+ @Test
+ public void testInMemory() throws Exception {
+ PTable<Long, Pair<Long, String>> input = MemPipeline.typedTableOf(tableOf(longs(), pairs(longs(), strings())),
+ 1729L, Pair.of(17L, "a"), 100L, Pair.of(29L, "b"), 1729L, Pair.of(29L, "c"));
+ PCollection<String> letters = SecondarySort.sortAndApply(input, new StringifyFn(), strings());
+ assertEquals(ImmutableList.of("b", "ac"), letters.materialize());
+ }
+
+ private static class StringifyFn extends DoFn<Pair<Long, Iterable<Pair<Long, String>>>, String> {
+ @Override
+ public void process(Pair<Long, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) {
+ StringBuilder sb = new StringBuilder();
+ for (Pair<Long, String> p : input.second()) {
+ sb.append(p.second());
+ }
+ emitter.emit(sb.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
new file mode 100644
index 0000000..35ccc11
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.lib.join.JoinUtils.TupleWritablePartitioner;
+import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TupleWritablePartitionerTest {
+
+ private TupleWritablePartitioner tupleWritableParitioner;
+
+ @Before
+ public void setUp() {
+ tupleWritableParitioner = new TupleWritablePartitioner();
+ }
+
+ @Test
+ public void testGetPartition() {
+ IntWritable intWritable = new IntWritable(3);
+ TupleWritable key = new TupleWritable(new Writable[] { intWritable });
+ assertEquals(3, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
+ assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
+ }
+
+ @Test
+ public void testGetPartition_NegativeHashValue() {
+ IntWritable intWritable = new IntWritable(-3);
+ // Sanity check, if this doesn't work then the premise of this test is wrong
+ assertEquals(-3, intWritable.hashCode());
+
+ TupleWritable key = new TupleWritable(new Writable[] { intWritable });
+ assertEquals(3, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
+ assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
+ }
+
+ @Test
+ public void testGetPartition_IntegerMinValue() {
+ IntWritable intWritable = new IntWritable(Integer.MIN_VALUE);
+ // Sanity check, if this doesn't work then the premise of this test is wrong
+ assertEquals(Integer.MIN_VALUE, intWritable.hashCode());
+
+ TupleWritable key = new TupleWritable(new Writable[] { intWritable });
+ assertEquals(0, tupleWritableParitioner.getPartition(key, NullWritable.get(), Integer.MAX_VALUE));
+ }
+
+}