You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:44 UTC

[42/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/MapsIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MapsIT.java b/crunch-core/src/it/java/org/apache/crunch/MapsIT.java
new file mode 100644
index 0000000..5b3187b
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/MapsIT.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.Map;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+/**
+ * Integration test for tables whose values are maps, run against both the Writable and the
+ * Avro type families on an {@link MRPipeline}.
+ */
+public class MapsIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testWritables() throws Exception {
+    run(WritableTypeFamily.getInstance(), tmpDir);
+  }
+
+  @Test
+  public void testAvros() throws Exception {
+    run(AvroTypeFamily.getInstance(), tmpDir);
+  }
+
+  /**
+   * Builds, for every line of {@code shakes.txt}, pairs of (first letter of the previous word,
+   * {first letter of the current word: 1}), sums the per-letter counts for each key in a
+   * combiner, and checks that the materialized output contains key {@code "k"} with a count of
+   * 8 for {@code "n"}.
+   *
+   * @param typeFamily the type family used to build the table type of the intermediate output
+   * @param tmpDir supplies the pipeline configuration and the copy of the input resource
+   * @throws Exception if copying the input or running the pipeline fails
+   */
+  public static void run(PTypeFamily typeFamily, TemporaryPath tmpDir) throws Exception {
+    Pipeline pipeline = new MRPipeline(MapsIT.class, tmpDir.getDefaultConfiguration());
+    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
+    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
+    Iterable<Pair<String, Map<String, Long>>> output = shakespeare
+        .parallelDo(new DoFn<String, Pair<String, Map<String, Long>>>() {
+          @Override
+          public void process(String input, Emitter<Pair<String, Map<String, Long>>> emitter) {
+            // 'last' is local to this call, so letter pairs never span two input lines.
+            String last = null;
+            for (String word : input.toLowerCase().split("\\W+")) {
+              if (!word.isEmpty()) {
+                String firstChar = word.substring(0, 1);
+                if (last != null) {
+                  Map<String, Long> cc = ImmutableMap.of(firstChar, 1L);
+                  emitter.emit(Pair.of(last, cc));
+                }
+                last = firstChar;
+              }
+            }
+          }
+        }, typeFamily.tableOf(typeFamily.strings(), typeFamily.maps(typeFamily.longs()))).groupByKey()
+        .combineValues(new CombineFn<String, Map<String, Long>>() {
+          @Override
+          public void process(Pair<String, Iterable<Map<String, Long>>> input,
+              Emitter<Pair<String, Map<String, Long>>> emitter) {
+            // Merge all maps seen for this key by summing the counts of equal map keys.
+            Map<String, Long> agg = Maps.newHashMap();
+            for (Map<String, Long> in : input.second()) {
+              for (Map.Entry<String, Long> e : in.entrySet()) {
+                if (!agg.containsKey(e.getKey())) {
+                  agg.put(e.getKey(), e.getValue());
+                } else {
+                  agg.put(e.getKey(), e.getValue() + agg.get(e.getKey()));
+                }
+              }
+            }
+            emitter.emit(Pair.of(input.first(), agg));
+          }
+        }).materialize();
+
+    boolean passed = false;
+    for (Pair<String, Map<String, Long>> v : output) {
+      // Compare with equals(): get("n") yields a boxed Long that is null when the key is
+      // absent, and unboxing it for '==' would throw a NullPointerException (skipping
+      // pipeline.done()) instead of letting the assertion below report the failure.
+      if ("k".equals(v.first()) && Long.valueOf(8L).equals(v.second().get("n"))) {
+        passed = true;
+        break;
+      }
+    }
+    pipeline.done();
+
+    assertThat(passed, is(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
new file mode 100644
index 0000000..d064993
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.crunch.fn.FilterFns;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration tests for {@code PCollection.materialize()} on MapReduce and in-memory
+ * pipelines, covering direct inputs, empty intermediate collections and an Avro pair of a
+ * reflect-based and a record-based type.
+ */
+public class MaterializeIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testMaterializeInput_Writables() throws IOException {
+    runMaterializeInput(new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
+        WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMaterializeInput_Avro() throws IOException {
+    runMaterializeInput(new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
+        AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMaterializeInput_InMemoryWritables() throws IOException {
+    runMaterializeInput(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMaterializeInput_InMemoryAvro() throws IOException {
+    runMaterializeInput(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMaterializeEmptyIntermediate_Writables() throws IOException {
+    runMaterializeEmptyIntermediate(
+        new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
+        WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMaterializeEmptyIntermediate_Avro() throws IOException {
+    runMaterializeEmptyIntermediate(
+        new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
+        AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMaterializeEmptyIntermediate_InMemoryWritables() throws IOException {
+    runMaterializeEmptyIntermediate(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMaterializeEmptyIntermediate_InMemoryAvro() throws IOException {
+    runMaterializeEmptyIntermediate(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+  }
+
+  /**
+   * Reads {@code set1.txt} as text and asserts that materializing it yields the lines
+   * "b", "c", "a", "e" in that order.
+   *
+   * @param pipeline the pipeline to read and materialize with; it is finished via
+   *          {@code done()} after the assertion
+   * @param typeFamily currently unused by this method
+   * @throws IOException if the input resource cannot be copied
+   */
+  public void runMaterializeInput(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    List<String> expectedContent = Lists.newArrayList("b", "c", "a", "e");
+    String inputPath = tmpDir.copyResourceFileName("set1.txt");
+
+    PCollection<String> lines = pipeline.readTextFile(inputPath);
+    assertEquals(expectedContent, Lists.newArrayList(lines.materialize()));
+    pipeline.done();
+  }
+
+  /**
+   * Filters every line out of {@code set1.txt} and asserts that materializing the resulting
+   * collection yields no elements.
+   *
+   * @param pipeline the pipeline to read and materialize with; it is finished via
+   *          {@code done()} after the assertion
+   * @param typeFamily currently unused by this method
+   * @throws IOException if the input resource cannot be copied
+   */
+  public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily)
+      throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("set1.txt");
+    PCollection<String> empty = pipeline.readTextFile(inputPath).filter(FilterFns.<String>REJECT_ALL());
+
+    assertTrue(Lists.newArrayList(empty.materialize()).isEmpty());
+    pipeline.done();
+  }
+
+  /**
+   * Maps an input string to a pair of a {@link StringWrapper} around that string and a
+   * {@link Person} named after it, with age 42 and no sibling names.
+   */
+  static class StringToStringWrapperPersonPairMapFn extends MapFn<String, Pair<StringWrapper, Person>> {
+
+    @Override
+    public Pair<StringWrapper, Person> map(String input) {
+      Person person = new Person();
+      person.name = input;
+      person.age = 42;
+      person.siblingnames = Lists.<CharSequence> newArrayList();
+      return Pair.of(new StringWrapper(input), person);
+    }
+
+  }
+
+  @Test
+  public void testMaterializeAvroPersonAndReflectsPair_GroupedTable() throws IOException {
+    Assume.assumeTrue(Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS);
+    // Use the temporary-path configuration like every other MR test in this class, so the
+    // pipeline is set up the same way as its siblings.
+    Pipeline pipeline = new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration());
+    List<Pair<StringWrapper, Person>> pairList = Lists.newArrayList(pipeline
+        .readTextFile(tmpDir.copyResourceFileName("set1.txt"))
+        .parallelDo(new StringToStringWrapperPersonPairMapFn(),
+            Avros.pairs(Avros.reflects(StringWrapper.class), Avros.records(Person.class)))
+        .materialize());
+
+    // We just need to make sure this doesn't crash
+    assertEquals(4, pairList.size());
+    // Finish the pipeline, as the other tests in this class do.
+    pipeline.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/MaterializeToMapIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MaterializeToMapIT.java b/crunch-core/src/it/java/org/apache/crunch/MaterializeToMapIT.java
new file mode 100644
index 0000000..7fef30e
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/MaterializeToMapIT.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Integration tests for {@code PTable.materializeToMap()} on an in-memory table and on a
+ * table computed by an {@link MRPipeline}.
+ */
+public class MaterializeToMapIT {
+
+  /** Expected entries; the key of each pair equals its index in this list. */
+  static final ImmutableList<Pair<Integer, String>> kvPairs = ImmutableList.of(Pair.of(0, "a"), Pair.of(1, "b"),
+      Pair.of(2, "c"), Pair.of(3, "e"));
+
+  /**
+   * Asserts that the given map holds exactly the entries of {@link #kvPairs}.
+   *
+   * @param m the materialized map to verify
+   */
+  public void assertMatches(Map<Integer, String> m) {
+    // Checking only the keys that are present would let an empty or partial map pass
+    // vacuously, so pin the number of entries first.
+    assertEquals(kvPairs.size(), m.size());
+    for (Map.Entry<Integer, String> entry : m.entrySet()) {
+      assertEquals(kvPairs.get(entry.getKey()).second(), entry.getValue());
+    }
+  }
+
+  @Test
+  public void testMemMaterializeToMap() {
+    assertMatches(MemPipeline.tableOf(kvPairs).materializeToMap());
+  }
+
+  /**
+   * Maps the strings "a", "b", "c" and "e" to the keys 0 to 3 respectively; any other input
+   * is paired with the key -1.
+   */
+  private static class Set1Mapper extends MapFn<String, Pair<Integer, String>> {
+    @Override
+    public Pair<Integer, String> map(String input) {
+
+      int k = -1;
+      if (input.equals("a")) {
+        k = 0;
+      } else if (input.equals("b")) {
+        k = 1;
+      } else if (input.equals("c")) {
+        k = 2;
+      } else if (input.equals("e")) {
+        k = 3;
+      }
+      return Pair.of(k, input);
+    }
+  }
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testMRMaterializeToMap() throws IOException {
+    Pipeline p = new MRPipeline(MaterializeToMapIT.class, tmpDir.getDefaultConfiguration());
+    String inputFile = tmpDir.copyResourceFileName("set1.txt");
+    PCollection<String> c = p.readTextFile(inputFile);
+    PTypeFamily tf = c.getTypeFamily();
+    PTable<Integer, String> t = c.parallelDo(new Set1Mapper(), tf.tableOf(tf.ints(), tf.strings()));
+    Map<Integer, String> m = t.materializeToMap();
+    assertMatches(m);
+    // Finish the pipeline once the materialized map has been verified.
+    p.done();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java b/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java
new file mode 100644
index 0000000..1a85b6a
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+/**
+ * Integration tests for pipelines that write several outputs derived from one input,
+ * including checks that the parallel branches are fused into a single stage.
+ */
+public class MultipleOutputIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  /**
+   * Keeps only the words with an even number of characters.
+   *
+   * @param words the words to filter
+   * @param typeFamily the type family providing the string type of the result
+   * @return the filtered collection, produced by a stage named "even"
+   */
+  public static PCollection<String> evenCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
+    return words.parallelDo("even", new FilterFn<String>() {
+
+      @Override
+      public boolean accept(String input) {
+        return input.length() % 2 == 0;
+      }
+    }, typeFamily.strings());
+  }
+
+  /**
+   * Keeps only the words with an odd number of characters.
+   *
+   * @param words the words to filter
+   * @param typeFamily the type family providing the string type of the result
+   * @return the filtered collection, produced by a stage named "odd"
+   */
+  public static PCollection<String> oddCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
+    return words.parallelDo("odd", new FilterFn<String>() {
+
+      @Override
+      public boolean accept(String input) {
+        return input.length() % 2 != 0;
+      }
+    }, typeFamily.strings());
+
+  }
+
+  /**
+   * Replaces every non-empty key of the table by its first character and drops entries whose
+   * key is the empty string.
+   *
+   * @param ptable the table to transform
+   * @return a table of the same type keyed by the first character of the original keys
+   */
+  public static PTable<String, Long> substr(PTable<String, Long> ptable) {
+    return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
+      @Override
+      public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) {
+        if (input.first().length() > 0) {
+          emitter.emit(Pair.of(input.first().substring(0, 1), input.second()));
+        }
+      }
+    }, ptable.getPTableType());
+  }
+
+  @Test
+  public void testWritables() throws IOException {
+    run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testAvro() throws IOException {
+    run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testParallelDosFused() throws IOException {
+
+    PipelineResult result = run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()),
+        WritableTypeFamily.getInstance());
+
+    // Ensure our multiple outputs were fused into a single job.
+    assertEquals("parallel Dos not fused into a single job", 1, result.getStageResults().size());
+  }
+
+  /**
+   * Splits the words of {@code letters.txt} into even-length and odd-length words, writes
+   * both collections as text files and verifies the content of each output.
+   *
+   * @param pipeline the pipeline to run; it is finished via {@code done()}
+   * @param typeFamily the type family used for the input and the filtered collections
+   * @return the result reported by {@code pipeline.done()}
+   * @throws IOException if the input cannot be copied or an output file cannot be read
+   */
+  public PipelineResult run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("letters.txt");
+    String outputPathEven = tmpDir.getFileName("even");
+    String outputPathOdd = tmpDir.getFileName("odd");
+
+    PCollection<String> words = pipeline.read(At.textFile(inputPath, typeFamily.strings()));
+
+    PCollection<String> evenCountWords = evenCountLetters(words, typeFamily);
+    PCollection<String> oddCountWords = oddCountLetters(words, typeFamily);
+    pipeline.writeTextFile(evenCountWords, outputPathEven);
+    pipeline.writeTextFile(oddCountWords, outputPathOdd);
+
+    PipelineResult result = pipeline.done();
+
+    checkFileContents(outputPathEven, Arrays.asList("bb"));
+    checkFileContents(outputPathOdd, Arrays.asList("a"));
+
+    return result;
+  }
+
+  /**
+   * Mutates the state of an input and then emits the mutated object.
+   */
+  static class AppendFn extends DoFn<StringWrapper, StringWrapper> {
+
+    private String value;
+
+    public AppendFn(String value) {
+      this.value = value;
+    }
+
+    @Override
+    public void process(StringWrapper input, Emitter<StringWrapper> emitter) {
+      input.setValue(input.getValue() + value);
+      emitter.emit(input);
+    }
+
+  }
+
+  /**
+   * Fusing multiple pipelines has a risk of running into object reuse bugs.
+   * This test verifies that mutating the state of an object that is passed
+   * through multiple streams of a pipeline doesn't allow one stream to affect
+   * another.
+   */
+  @Test
+  public void testFusedMappersObjectReuseBug() throws IOException {
+    Pipeline pipeline = new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<StringWrapper> stringWrappers = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
+        .parallelDo(new StringWrapper.StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class));
+
+    PCollection<String> stringsA = stringWrappers.parallelDo(new AppendFn("A"), stringWrappers.getPType())
+        .parallelDo(new StringWrapper.StringWrapperToStringMapFn(), Writables.strings());
+    PCollection<String> stringsB = stringWrappers.parallelDo(new AppendFn("B"), stringWrappers.getPType())
+        .parallelDo(new StringWrapper.StringWrapperToStringMapFn(), Writables.strings());
+
+    String outputA = tmpDir.getFileName("stringsA");
+    String outputB = tmpDir.getFileName("stringsB");
+
+    pipeline.writeTextFile(stringsA, outputA);
+    pipeline.writeTextFile(stringsB, outputB);
+    PipelineResult pipelineResult = pipeline.done();
+
+    // Make sure fusing did actually occur
+    assertEquals(1, pipelineResult.getStageResults().size());
+
+    checkFileContents(outputA, Lists.newArrayList("cA", "dA", "aA"));
+    checkFileContents(outputB, Lists.newArrayList("cB", "dB", "aB"));
+
+  }
+
+  /**
+   * Asserts that the file {@code part-m-00000} inside the given output directory consists of
+   * exactly the expected lines, in order.
+   *
+   * @param filePath the output directory written by the pipeline
+   * @param expected the expected lines of the part file
+   * @throws IOException if the part file cannot be read
+   */
+  private void checkFileContents(String filePath, List<String> expected) throws IOException {
+    File outputFile = new File(filePath, "part-m-00000");
+    // Decode with a fixed charset so the result does not depend on the platform default of
+    // the machine running the test.
+    // NOTE(review): assumes the pipeline writes its text output as UTF-8 - confirm.
+    List<String> lines = Files.readLines(outputFile, Charset.forName("UTF-8"));
+    assertEquals(expected, lines);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java b/crunch-core/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
new file mode 100644
index 0000000..44eb897
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static org.apache.crunch.io.At.sequenceFile;
+import static org.apache.crunch.io.At.textFile;
+import static org.apache.crunch.types.writable.Writables.strings;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+
+import org.apache.crunch.fn.FilterFns;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Integration tests for {@code PCollection.getSize()} and {@code materialize()} on empty
+ * inputs, empty intermediate collections and non-existing files, for both MapReduce and
+ * in-memory pipelines.
+ */
+public class PCollectionGetSizeIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  private String emptyInputPath;
+  private String nonEmptyInputPath;
+  private String outputPath;
+
+  /** Copies the input resources into the temporary directory and picks an output location. */
+  @Before
+  public void setUp() throws IOException {
+    emptyInputPath = tmpDir.copyResourceFileName("emptyTextFile.txt");
+    nonEmptyInputPath = tmpDir.copyResourceFileName("set1.txt");
+    outputPath = tmpDir.getFileName("output");
+  }
+
+  /** Creates a MapReduce pipeline configured from the temporary directory. */
+  private Pipeline newMRPipeline() {
+    return new MRPipeline(getClass(), tmpDir.getDefaultConfiguration());
+  }
+
+  @Test
+  public void testGetSizeOfEmptyInput_MRPipeline() throws IOException {
+    testCollectionGetSizeOfEmptyInput(newMRPipeline());
+  }
+
+  @Test
+  public void testGetSizeOfEmptyInput_MemPipeline() throws IOException {
+    testCollectionGetSizeOfEmptyInput(MemPipeline.getInstance());
+  }
+
+  /** Asserts that reading the empty text file reports a size of zero. */
+  private void testCollectionGetSizeOfEmptyInput(Pipeline pipeline) throws IOException {
+    PCollection<String> emptyInput = pipeline.read(textFile(emptyInputPath));
+    assertThat(emptyInput.getSize(), is(0L));
+  }
+
+  @Test
+  public void testMaterializeEmptyInput_MRPipeline() throws IOException {
+    testMaterializeEmptyInput(newMRPipeline());
+  }
+
+  @Test
+  public void testMaterializeEmptyImput_MemPipeline() throws IOException {
+    testMaterializeEmptyInput(MemPipeline.getInstance());
+  }
+
+  /** Asserts that materializing the empty text file yields no elements. */
+  private void testMaterializeEmptyInput(Pipeline pipeline) throws IOException {
+    Iterable<String> materialized = pipeline.readTextFile(emptyInputPath).materialize();
+    assertThat(newArrayList(materialized.iterator()).size(), is(0));
+  }
+
+  @Test
+  public void testGetSizeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
+    PCollection<String> persistedEmpty = createPersistentEmptyIntermediate(newMRPipeline());
+    assertThat(persistedEmpty.getSize(), is(0L));
+  }
+
+  @Test
+  @Ignore("GetSize of a DoCollection is only an estimate based on scale factor, so we can't count on it being reported as 0")
+  public void testGetSizeOfEmptyIntermediatePCollection_NoSave_MRPipeline() throws IOException {
+    PCollection<String> allLines = newMRPipeline().readTextFile(nonEmptyInputPath);
+    PCollection<String> noLines = allLines.filter(FilterFns.<String>REJECT_ALL());
+    assertThat(noLines.getSize(), is(0L));
+  }
+
+  @Test
+  public void testGetSizeOfEmptyIntermediatePCollection_MemPipeline() {
+    PCollection<String> persistedEmpty = createPersistentEmptyIntermediate(MemPipeline.getInstance());
+    assertThat(persistedEmpty.getSize(), is(0L));
+  }
+
+  @Test
+  public void testMaterializeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
+    PCollection<String> persistedEmpty = createPersistentEmptyIntermediate(newMRPipeline());
+    assertThat(newArrayList(persistedEmpty.materialize()).size(), is(0));
+  }
+
+  @Test
+  public void testMaterializeOfEmptyIntermediatePCollection_MemPipeline() {
+    PCollection<String> persistedEmpty = createPersistentEmptyIntermediate(MemPipeline.getInstance());
+    assertThat(newArrayList(persistedEmpty.materialize()).size(), is(0));
+  }
+
+  /**
+   * Filters every line out of the non-empty input, writes the resulting collection as a
+   * sequence file, runs the pipeline and reads that sequence file back.
+   *
+   * @param pipeline the pipeline used for writing and re-reading
+   * @return the collection read back from the written sequence file
+   */
+  private PCollection<String> createPersistentEmptyIntermediate(Pipeline pipeline) {
+    PCollection<String> noLines = pipeline.readTextFile(nonEmptyInputPath)
+        .filter(FilterFns.<String>REJECT_ALL());
+    noLines.write(sequenceFile(outputPath, strings()));
+    pipeline.run();
+    return pipeline.read(sequenceFile(outputPath, strings()));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testExpectExceptionForGettingSizeOfNonExistingFile_MRPipeline() throws IOException {
+    PCollection<String> missing = newMRPipeline().readTextFile("non_existing.file");
+    missing.getSize();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testExpectExceptionForGettingSizeOfNonExistingFile_MemPipeline() {
+    PCollection<String> missing = MemPipeline.getInstance().readTextFile("non_existing.file");
+    missing.getSize();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/PObjectsIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PObjectsIT.java b/crunch-core/src/it/java/org/apache/crunch/PObjectsIT.java
new file mode 100644
index 0000000..6ee849f
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/PObjectsIT.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.lang.Integer;
+import java.lang.Iterable;
+import java.lang.String;
+import java.util.Iterator;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.materialize.pobject.PObjectImpl;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Rule;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class PObjectsIT {
+
+  private static final Integer LINES_IN_SHAKES = 3667;
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  /**
+   * A mock PObject whose value is the number of elements in the underlying PCollection of
+   * strings, and which records how many times {@link #process(Iterable)} has been invoked.
+   */
+  public static class MockPObjectImpl extends PObjectImpl<String, Integer> {
+    private int numProcessCalls = 0;
+
+    public MockPObjectImpl(PCollection<String> collect) {
+      super(collect);
+    }
+
+    /**
+     * Counts the elements of the input and records the invocation.
+     *
+     * @param input the elements to count
+     * @return the number of elements the input's iterator produced
+     */
+    @Override
+    public Integer process(Iterable<String> input) {
+      numProcessCalls++;
+      int count = 0;
+      for (Iterator<String> elements = input.iterator(); elements.hasNext(); elements.next()) {
+        count++;
+      }
+      return count;
+    }
+
+    /** Returns how many times {@link #process(Iterable)} has been called so far. */
+    public int getNumProcessCalls() {
+      return numProcessCalls;
+    }
+  }
+
+  @Test
+  public void testMRPipeline() throws IOException {
+    run(new MRPipeline(PObjectsIT.class, tmpDir.getDefaultConfiguration()));
+  }
+
+  @Test
+  public void testInMemoryPipeline() throws IOException {
+    run(MemPipeline.getInstance());
+  }
+
+  /**
+   * Reads {@code shakes.txt} with the given pipeline, fetches the mock PObject's value twice
+   * and verifies both the line count and that the value was computed only once.
+   *
+   * @param pipeline the pipeline used to read the input
+   * @throws IOException if the input resource cannot be copied
+   */
+  public void run(Pipeline pipeline) throws IOException {
+    PCollection<String> shakespeare = pipeline.readTextFile(tmpDir.copyResourceFileName("shakes.txt"));
+    MockPObjectImpl lineCount = new MockPObjectImpl(shakespeare);
+
+    // The first read of the value must report the expected number of lines.
+    Integer firstValue = lineCount.getValue();
+    assertEquals("Incorrect number of lines counted from PCollection.", LINES_IN_SHAKES, firstValue);
+
+    // A second read must report the same number.
+    Integer secondValue = lineCount.getValue();
+    assertEquals("Incorrect number of lines counted from PCollection.", LINES_IN_SHAKES, secondValue);
+
+    // The value is expected to be cached after the first read, so process ran exactly once.
+    int processCalls = lineCount.getNumProcessCalls();
+    assertEquals("Process on PObject not called exactly 1 times.", 1, processCalls);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java b/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java
new file mode 100644
index 0000000..d56e122
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+import junit.framework.Assert;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+@RunWith(value = Parameterized.class)
+public class PTableKeyValueIT implements Serializable {
+
+  private static final long serialVersionUID = 4374227704751746689L;
+
+  private transient PTypeFamily typeFamily;
+  private transient MRPipeline pipeline;
+  private transient String inputFile;
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  /**
+   * Creates a fresh {@link MRPipeline} from the temporary directory's default
+   * configuration and obtains the input file name for the {@code set1.txt}
+   * resource.
+   */
+  @Before
+  public void setUp() throws IOException {
+    pipeline = new MRPipeline(PTableKeyValueIT.class, tmpDir.getDefaultConfiguration());
+    inputFile = tmpDir.copyResourceFileName("set1.txt");
+  }
+
+  /** Calls {@code done()} on the pipeline created in {@link #setUp()} after each test. */
+  @After
+  public void tearDown() {
+    pipeline.done();
+  }
+
+  /**
+   * Receives the type family for one parameterized run; the candidate values
+   * are supplied by {@link #data()}.
+   *
+   * @param typeFamily the type family used to build the PTypes in the test
+   */
+  public PTableKeyValueIT(PTypeFamily typeFamily) {
+    this.typeFamily = typeFamily;
+  }
+
+  /**
+   * Supplies the constructor arguments for the parameterized runner: one run
+   * with the Writable type family and one with the Avro type family.
+   *
+   * @return one single-element argument array per run
+   */
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[] writableRun = { WritableTypeFamily.getInstance() };
+    Object[] avroRun = { AvroTypeFamily.getInstance() };
+    return Arrays.asList(writableRun, avroRun);
+  }
+
+  /**
+   * Builds a table whose key is the upper-cased input line and whose value is
+   * the line itself, then checks that the materialized keys and values line up
+   * position by position.
+   */
+  @Test
+  public void testKeysAndValues() throws Exception {
+    PCollection<String> lines = pipeline.read(At.textFile(inputFile, typeFamily.strings()));
+
+    DoFn<String, Pair<String, String>> upperCaseKeyFn = new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String line, Emitter<Pair<String, String>> emitter) {
+        emitter.emit(Pair.of(line.toUpperCase(), line));
+      }
+    };
+    PTable<String, String> table = lines.parallelDo(upperCaseKeyFn,
+        typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
+
+    // Derive both projections before materializing either one, as the
+    // original test does.
+    PCollection<String> keys = table.keys();
+    PCollection<String> values = table.values();
+
+    ArrayList<String> keyList = Lists.newArrayList(keys.materialize().iterator());
+    ArrayList<String> valueList = Lists.newArrayList(values.materialize().iterator());
+
+    Assert.assertEquals(keyList.size(), valueList.size());
+    int position = 0;
+    for (String key : keyList) {
+      Assert.assertEquals(key, valueList.get(position++).toUpperCase());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
new file mode 100644
index 0000000..6291ef8
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.PTypes;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+public class PageRankIT {
+
+  /**
+   * Per-page state carried from one PageRank iteration to the next: the current
+   * score, the score of the previous iteration, and the URLs the page's score
+   * is propagated to.
+   * NOTE(review): the public fields and the no-arg constructor are presumably
+   * required by the reflection- and JSON-based PTypes used in the tests of this
+   * class - confirm before tightening visibility.
+   */
+  public static class PageRankData {
+    public float score;
+    public float lastScore;
+    public List<String> urls;
+
+    /** Leaves every field at its default value. */
+    public PageRankData() {
+    }
+
+    /**
+     * @param score the current score
+     * @param lastScore the score of the previous iteration
+     * @param urls the URLs to keep; copied into a new list
+     */
+    public PageRankData(float score, float lastScore, Iterable<String> urls) {
+      this.score = score;
+      this.lastScore = lastScore;
+      this.urls = Lists.newArrayList(urls);
+    }
+
+    /**
+     * Produces the state for the next iteration: the current score becomes the
+     * last score and {@code newScore} becomes the current one.
+     */
+    public PageRankData next(float newScore) {
+      return new PageRankData(newScore, score, urls);
+    }
+
+    /**
+     * @return the current score divided by the number of URLs; this is float
+     *         division, so an empty list yields a non-finite value rather than
+     *         an exception
+     */
+    public float propagatedScore() {
+      return score / urls.size();
+    }
+
+    @Override
+    public String toString() {
+      return score + " " + lastScore + " " + urls;
+    }
+  }
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  /**
+   * Runs PageRank on an {@link MRPipeline} with the Avro type family and a
+   * PType obtained from {@code Avros.reflects(PageRankData.class)}.
+   */
+  @Test
+  public void testAvroReflect() throws Exception {
+    PTypeFamily tf = AvroTypeFamily.getInstance();
+    PType<PageRankData> prType = Avros.reflects(PageRankData.class);
+    String urlInput = tmpDir.copyResourceFileName("urls.txt");
+    run(new MRPipeline(PageRankIT.class, tmpDir.getDefaultConfiguration()),
+        urlInput, prType, tf);
+  }
+
+  /**
+   * Same setup as {@code testAvroReflect}, but executed on the instance
+   * returned by {@code MemPipeline.getInstance()}.
+   */
+  @Test
+  public void testAvroMReflectInMemory() throws Exception {
+    PTypeFamily tf = AvroTypeFamily.getInstance();
+    PType<PageRankData> prType = Avros.reflects(PageRankData.class);
+    String urlInput = tmpDir.copyResourceFileName("urls.txt");
+    run(MemPipeline.getInstance(), urlInput, prType, tf);
+  }
+
+  /**
+   * Runs PageRank on an {@link MRPipeline} with the Avro type family and a
+   * PType obtained from {@code PTypes.jsonString}.
+   */
+  @Test
+  public void testAvroJSON() throws Exception {
+    PTypeFamily tf = AvroTypeFamily.getInstance();
+    PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
+    String urlInput = tmpDir.copyResourceFileName("urls.txt");
+    run(new MRPipeline(PageRankIT.class, tmpDir.getDefaultConfiguration()),
+        urlInput, prType, tf);
+  }
+
+  /**
+   * Runs PageRank on an {@link MRPipeline} with the Writable type family and a
+   * PType obtained from {@code PTypes.jsonString}.
+   */
+  @Test
+  public void testWritablesJSON() throws Exception {
+    PTypeFamily tf = WritableTypeFamily.getInstance();
+    PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
+    String urlInput = tmpDir.copyResourceFileName("urls.txt");
+    run(new MRPipeline(PageRankIT.class, tmpDir.getDefaultConfiguration()),
+        urlInput, prType, tf);
+  }
+
+  /**
+   * Performs one PageRank iteration. Every page sends its propagated score to
+   * each of its URLs; the contributions are then co-grouped with the page table
+   * and each page's new score becomes {@code d + (1 - d) * sum(contributions)}.
+   *
+   * @param input the current page table, keyed by URL
+   * @param d the constant term of the score formula
+   * @return a table of the same PTable type holding the next iteration's state
+   */
+  public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {
+    PTypeFamily ptf = input.getTypeFamily();
+
+    // Step 1: fan each page's propagated score out to the URLs it lists.
+    DoFn<Pair<String, PageRankData>, Pair<String, Float>> spreadScoreFn =
+        new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
+          @Override
+          public void process(Pair<String, PageRankData> page, Emitter<Pair<String, Float>> emitter) {
+            PageRankData prd = page.second();
+            for (String link : prd.urls) {
+              emitter.emit(Pair.of(link, prd.propagatedScore()));
+            }
+          }
+        };
+    PTable<String, Float> outbound = input.parallelDo(spreadScoreFn, ptf.tableOf(ptf.strings(), ptf.floats()));
+
+    // Step 2: bring each page together with the scores sent to it and fold
+    // them into the page's next state.
+    PTable<String, Pair<Collection<PageRankData>, Collection<Float>>> grouped = input.cogroup(outbound);
+    return grouped.parallelDo(
+        new MapFn<Pair<String, Pair<Collection<PageRankData>, Collection<Float>>>, Pair<String, PageRankData>>() {
+          @Override
+          public Pair<String, PageRankData> map(Pair<String, Pair<Collection<PageRankData>, Collection<Float>>> entry) {
+            Pair<Collection<PageRankData>, Collection<Float>> sides = entry.second();
+            // Exactly one PageRankData is expected per key.
+            PageRankData prd = Iterables.getOnlyElement(sides.first());
+            float received = 0.0f;
+            for (Float contribution : sides.second()) {
+              received += contribution;
+            }
+            return Pair.of(entry.first(), prd.next(d + (1.0f - d) * received));
+          }
+        }, input.getPTableType());
+  }
+
+  /**
+   * Builds the initial page table from a tab-separated text file, iterates
+   * {@link #pageRank} until the largest per-page score change is no longer above
+   * 0.01, and checks the final change against the expected value.
+   *
+   * @param pipeline the pipeline to read from and run on
+   * @param urlInput path of the input file; each line is split on a tab and its
+   *        first two fields are used
+   * @param prType the PType used for the {@code PageRankData} values
+   * @param ptf the type family used for all other PTypes
+   */
+  public static void run(Pipeline pipeline, String urlInput,
+      PType<PageRankData> prType, PTypeFamily ptf) throws Exception {
+    // (first field, second field) pairs, grouped by first field; every key
+    // starts with score 1.0 and last score 0.0 over its grouped values.
+    PTable<String, PageRankData> scores = pipeline.readTextFile(urlInput)
+        .parallelDo(new MapFn<String, Pair<String, String>>() {
+          @Override
+          public Pair<String, String> map(String input) {
+            String[] urls = input.split("\\t");
+            return Pair.of(urls[0], urls[1]);
+          }
+        }, ptf.tableOf(ptf.strings(), ptf.strings())).groupByKey()
+        .parallelDo(new MapFn<Pair<String, Iterable<String>>, Pair<String, PageRankData>>() {
+          @Override
+          public Pair<String, PageRankData> map(Pair<String, Iterable<String>> input) {
+            return Pair.of(input.first(), new PageRankData(1.0f, 0.0f, input.second()));
+          }
+        }, ptf.tableOf(ptf.strings(), prType));
+
+    // delta is the maximum |score - lastScore| over all pages of the latest
+    // iteration; the initial 1.0 guarantees at least one pass.
+    Float delta = 1.0f;
+    while (delta > 0.01) {
+      scores = pageRank(scores, 0.5f);
+      scores.materialize().iterator(); // force the write
+      delta = Aggregate.max(scores.parallelDo(new MapFn<Pair<String, PageRankData>, Float>() {
+        @Override
+        public Float map(Pair<String, PageRankData> input) {
+          PageRankData prd = input.second();
+          return Math.abs(prd.score - prd.lastScore);
+        }
+      }, ptf.floats())).getValue();
+    }
+    // Expected final change for this input, within a tolerance of 0.001.
+    assertEquals(0.0048, delta, 0.001);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java b/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java
new file mode 100644
index 0000000..19fc302
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.crunch.PipelineResult.StageResult;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.mapreduce.Counter;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class StageResultsCountersIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  public static HashSet<String> SPECIAL_KEYWORDS = Sets.newHashSet("AND", "OR", "NOT");
+
+  public static String KEYWORDS_COUNTER_GROUP = "KEYWORDS_COUNTER_GROUP";
+
+  /**
+   * Calls {@code MemPipeline.clearCounters()} after every test.
+   * NOTE(review): presumably needed because the in-memory pipeline keeps its
+   * counters across tests - confirm against MemPipeline.
+   */
+  @After
+  public void after() {
+    MemPipeline.clearCounters();
+  }
+  
+  /** Keyword-counter check on an {@link MRPipeline} with the Writable type family. */
+  @Test
+  public void testStageResultsCountersMRWritables() throws Exception {
+    testSpecialKeywordCount(new MRPipeline(StageResultsCountersIT.class, tmpDir.getDefaultConfiguration()),
+        WritableTypeFamily.getInstance());
+  }
+
+  /** Keyword-counter check on an {@link MRPipeline} with the Avro type family. */
+  @Test
+  public void testStageResultsCountersMRAvro() throws Exception {
+    testSpecialKeywordCount(new MRPipeline(StageResultsCountersIT.class, tmpDir.getDefaultConfiguration()),
+        AvroTypeFamily.getInstance());
+  }
+
+  /** Keyword-counter check on {@code MemPipeline.getInstance()} with the Writable type family. */
+  @Test
+  public void testStageResultsCountersMemWritables() throws Exception {
+    testSpecialKeywordCount(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+  }
+
+  /** Keyword-counter check on {@code MemPipeline.getInstance()} with the Avro type family. */
+  @Test
+  public void testStageResultsCountersMemAvro() throws Exception {
+    testSpecialKeywordCount(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+  }
+
+  /**
+   * Counts the special keywords in shakes.txt through pipeline counters and
+   * checks the totals reported by the stage results.
+   *
+   * @param pipeline the pipeline to execute the count on
+   * @param tf the type family used for the output PType of the counting DoFn
+   */
+  public void testSpecialKeywordCount(Pipeline pipeline, PTypeFamily tf) throws Exception {
+
+    String rowsInputPath = tmpDir.copyResourceFileName("shakes.txt");
+
+    PipelineResult result = coutSpecialKeywords(pipeline, rowsInputPath, tf);
+
+    assertTrue(result.succeeded());
+
+    Map<String, Long> keywordsMap = countersToMap(result.getStageResults(), KEYWORDS_COUNTER_GROUP);
+
+    assertEquals(3, keywordsMap.size());
+
+    // Compare map to map instead of against HashMap.toString(): HashMap does
+    // not guarantee any iteration order, so the rendered string may differ
+    // between JVM versions even when the counts are right.
+    Map<String, Long> expectedCounts = Maps.newHashMap();
+    expectedCounts.put("NOT", 157L);
+    expectedCounts.put("AND", 596L);
+    expectedCounts.put("OR", 81L);
+
+    assertEquals(expectedCounts, keywordsMap);
+  }
+
+  /**
+   * Reads the text file and, for every whitespace-separated token that
+   * upper-cases to one of {@code SPECIAL_KEYWORDS}, increments the counter named
+   * after that token in {@code KEYWORDS_COUNTER_GROUP}. The DoFn emits nothing;
+   * the counters are its only output.
+   *
+   * @param pipeline the pipeline to read from and finish
+   * @param inputFileName path of the text file to scan
+   * @param tf supplies the {@code nulls()} PType for the DoFn output
+   * @return the result of {@code pipeline.done()}
+   */
+  private static PipelineResult coutSpecialKeywords(Pipeline pipeline, String inputFileName, PTypeFamily tf) {
+
+    pipeline.read(From.textFile(inputFileName)).parallelDo(new DoFn<String, Void>() {
+
+      @Override
+      public void process(String text, Emitter<Void> emitter) {
+
+        // Blank lines carry no tokens worth inspecting.
+        if (!StringUtils.isBlank(text)) {
+
+          // Upper-case first so the match against the keyword set is
+          // case-insensitive.
+          String[] tokens = text.toUpperCase().split("\\s");
+
+          for (String token : tokens) {
+            if (SPECIAL_KEYWORDS.contains(token)) {
+              getCounter(KEYWORDS_COUNTER_GROUP, token).increment(1);
+            }
+          }
+        }
+      }
+    }, tf.nulls()).materialize(); // TODO can we avoid the materialize ?
+
+    return pipeline.done();
+  }
+
+  /**
+   * Flattens the named counter group of every stage into one map from counter
+   * display name to counter value. When the same display name occurs in several
+   * stages, the value from the later stage overwrites the earlier one.
+   *
+   * @param stages the stage results to read counters from
+   * @param counterGroupName the counter group to extract
+   * @return display name to value for every counter found
+   */
+  private static Map<String, Long> countersToMap(List<StageResult> stages, String counterGroupName) {
+    Map<String, Long> valuesByName = Maps.newHashMap();
+    for (StageResult stage : stages) {
+      for (Counter counter : stage.getCounters().getGroup(counterGroupName)) {
+        valuesByName.put(counter.getDisplayName(), counter.getValue());
+      }
+    }
+    return valuesByName;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/TermFrequencyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/TermFrequencyIT.java b/crunch-core/src/it/java/org/apache/crunch/TermFrequencyIT.java
new file mode 100644
index 0000000..ca66aa8
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/TermFrequencyIT.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class TermFrequencyIT implements Serializable {
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  /** Term-frequency run on an {@link MRPipeline} without the extra transform step. */
+  @Test
+  public void testTermFrequencyWithNoTransform() throws IOException {
+    run(new MRPipeline(TermFrequencyIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), false);
+  }
+
+  /** Term-frequency run on an {@link MRPipeline} including the extra transform step. */
+  @Test
+  public void testTermFrequencyWithTransform() throws IOException {
+    run(new MRPipeline(TermFrequencyIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), true);
+  }
+
+  /** Term-frequency run on {@code MemPipeline.getInstance()} without the extra transform step. */
+  @Test
+  public void testTermFrequencyNoTransformInMemory() throws IOException {
+    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance(), false);
+  }
+
+  /** Term-frequency run on {@code MemPipeline.getInstance()} including the extra transform step. */
+  @Test
+  public void testTermFrequencyWithTransformInMemory() throws IOException {
+    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance(), true);
+  }
+
+  /**
+   * Computes per-document term frequencies for docs.txt, writes them as text,
+   * and checks that the word "well" is reported exactly once for document A.
+   *
+   * @param pipeline the pipeline to run on
+   * @param typeFamily NOTE(review): not read by this method - the PTypes are
+   *        taken from {@code docs.getTypeFamily()} instead; confirm whether that
+   *        is intended
+   * @param transformTF when true, additionally writes the counts re-keyed by
+   *        word to a second text output
+   * @throws IOException propagated from the calls made here
+   */
+  public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean transformTF) throws IOException {
+    String input = tmpDir.copyResourceFileName("docs.txt");
+
+    File transformedOutput = tmpDir.getFile("transformed-output");
+    File tfOutput = tmpDir.getFile("tf-output");
+
+    PCollection<String> docs = pipeline.readTextFile(input);
+
+    PTypeFamily ptf = docs.getTypeFamily();
+
+    /*
+     * Input: String, one document per line as title<TAB>text
+     *
+     * Output: PTable<Pair<String, String>, Long>, i.e.
+     * Pair<Pair<word, title>, count in title>
+     */
+    PTable<Pair<String, String>, Long> tf = Aggregate.count(docs.parallelDo("term document frequency",
+        new DoFn<String, Pair<String, String>>() {
+          @Override
+          public void process(String doc, Emitter<Pair<String, String>> emitter) {
+            String[] kv = doc.split("\t");
+            String title = kv[0];
+            String text = kv[1];
+            // Split on runs of non-word characters; a leading separator
+            // produces an empty first token, which is skipped.
+            for (String word : text.split("\\W+")) {
+              if (word.length() > 0) {
+                Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
+                emitter.emit(pair);
+              }
+            }
+          }
+        }, ptf.pairs(ptf.strings(), ptf.strings())));
+
+    if (transformTF) {
+      /*
+       * Input: Pair<Pair<String, String>, Long>, i.e.
+       * Pair<Pair<word, title>, count in title>
+       *
+       * Output: PTable<String, Pair<String, Long>>, i.e.
+       * PTable<word, Pair<title, count in title>>
+       */
+      PTable<String, Pair<String, Long>> wordDocumentCountPair = tf.parallelDo("transform wordDocumentPairCount",
+          new MapFn<Pair<Pair<String, String>, Long>, Pair<String, Pair<String, Long>>>() {
+            @Override
+            public Pair<String, Pair<String, Long>> map(Pair<Pair<String, String>, Long> input) {
+              Pair<String, String> wordDocumentPair = input.first();
+              return Pair.of(wordDocumentPair.first(), Pair.of(wordDocumentPair.second(), input.second()));
+            }
+          }, ptf.tableOf(ptf.strings(), ptf.pairs(ptf.strings(), ptf.longs())));
+
+      pipeline.writeTextFile(wordDocumentCountPair, transformedOutput.getAbsolutePath());
+    }
+
+    SourceTarget<String> st = At.textFile(tfOutput.getAbsolutePath());
+    pipeline.write(tf, st);
+
+    pipeline.run();
+
+    // Read the term-frequency output back through the same target; the cast
+    // relies on At.textFile returning a readable source target.
+    Iterable<String> lines = ((ReadableSourceTarget<String>) st).read(pipeline.getConfiguration());
+    boolean passed = false;
+    for (String line : lines) {
+      // A zero count for (well, A) is a wrong result, so it fails at once.
+      if ("[well,A]\t0".equals(line)) {
+        fail("Found " + line + " but well is in Document A 1 time");
+      }
+      if ("[well,A]\t1".equals(line)) {
+        passed = true;
+      }
+    }
+    assertTrue(passed);
+    pipeline.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/TextPairIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/TextPairIT.java b/crunch-core/src/it/java/org/apache/crunch/TextPairIT.java
new file mode 100644
index 0000000..55d9af9
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/TextPairIT.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class TextPairIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  /**
+   * Runs the canary check on an {@link MRPipeline} built with the temporary
+   * directory's default configuration.
+   */
+  @Test
+  public void testWritables() throws IOException {
+    run(new MRPipeline(TextPairIT.class, tmpDir.getDefaultConfiguration()));
+  }
+
+  private static final String CANARY = "Writables.STRING_TO_TEXT";
+
+  /**
+   * Splits every line on runs of non-word characters and emits, for each
+   * non-empty word, a pair whose first element is the {@code CANARY} marker and
+   * whose second element is the word.
+   *
+   * @param words the lines to split
+   * @return the (CANARY, word) pairs, typed as a Writable pair of strings
+   */
+  public static PCollection<Pair<String, String>> wordDuplicate(PCollection<String> words) {
+    return words.parallelDo("my word duplicator", new DoFn<String, Pair<String, String>>() {
+      // Marked @Override like the other DoFn implementations in these tests,
+      // so a signature mismatch is caught at compile time.
+      @Override
+      public void process(String line, Emitter<Pair<String, String>> emitter) {
+        for (String word : line.split("\\W+")) {
+          // A leading separator yields an empty first token; skip it.
+          if (word.length() > 0) {
+            Pair<String, String> pair = Pair.of(CANARY, word);
+            emitter.emit(pair);
+          }
+        }
+      }
+    }, Writables.pairs(Writables.strings(), Writables.strings()));
+  }
+
+  /**
+   * Materializes the output of {@link #wordDuplicate} over shakes.txt and
+   * passes once a pair whose first element contains the {@code CANARY} marker
+   * is seen.
+   *
+   * @param pipeline the pipeline to read, materialize and finish
+   * @throws IOException propagated from the calls made here
+   */
+  public void run(Pipeline pipeline) throws IOException {
+    PCollection<String> shakespeare = pipeline.read(From.textFile(tmpDir.copyResourceFileName("shakes.txt")));
+
+    boolean canarySeen = false;
+    for (Pair<String, String> duplicated : pipeline.materialize(wordDuplicate(shakespeare))) {
+      if (!duplicated.first().contains(CANARY)) {
+        continue;
+      }
+      // One hit is enough; stop scanning.
+      canarySeen = true;
+      break;
+    }
+
+    pipeline.done();
+    assertTrue(canarySeen);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java b/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
new file mode 100644
index 0000000..218f538
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.crunch.fn.MapKeysFn;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.seq.SeqFileSourceTarget;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.lib.Join;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+@SuppressWarnings("serial")
+public class TfIdfIT implements Serializable {
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  // total number of documents, should calculate
+  protected static final double N = 2;
+
+  /**
+   * TF-IDF on an {@link MRPipeline} with {@code singleRun = true}: both outputs
+   * are registered before the pipeline is finished, with no explicit
+   * {@code run()} in between.
+   */
+  @Test
+  public void testWritablesSingleRun() throws IOException {
+    run(new MRPipeline(TfIdfIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), true);
+  }
+
+  /**
+   * TF-IDF on an {@link MRPipeline} with {@code singleRun = false}: the pipeline
+   * is run explicitly after the first output is registered and before the
+   * second one is added.
+   */
+  @Test
+  public void testWritablesMultiRun() throws IOException {
+    run(new MRPipeline(TfIdfIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), false);
+  }
+
+  /**
+   * Generates a TF-IDF score for every (word, document) combination of the
+   * input. As a side output, the raw term frequencies are written to
+   * {@code termFreqPath} as a sequence file.
+   *
+   * @param docs one document per element, formatted as title<TAB>text
+   * @param termFreqPath where the intermediate term-frequency table is written
+   * @param ptf the type family used to build every PType in the computation
+   * @return word mapped to a collection of (title, tf-idf) pairs
+   */
+  public PTable<String, Collection<Pair<String, Double>>> generateTFIDF(PCollection<String> docs, Path termFreqPath,
+      PTypeFamily ptf) throws IOException {
+
+    /*
+     * Input: String, one document per line as title<TAB>text
+     *
+     * Output: PTable<Pair<String, String>, Long>, i.e.
+     * Pair<Pair<word, title>, count in title>
+     */
+    PTable<Pair<String, String>, Long> tf = Aggregate.count(docs.parallelDo("term document frequency",
+        new DoFn<String, Pair<String, String>>() {
+          @Override
+          public void process(String doc, Emitter<Pair<String, String>> emitter) {
+            String[] kv = doc.split("\t");
+            String title = kv[0];
+            String text = kv[1];
+            // A leading separator yields an empty first token; skip it.
+            for (String word : text.split("\\W+")) {
+              if (word.length() > 0) {
+                Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
+                emitter.emit(pair);
+              }
+            }
+          }
+        }, ptf.pairs(ptf.strings(), ptf.strings())));
+
+    tf.write(new SeqFileSourceTarget<Pair<Pair<String, String>, Long>>(termFreqPath, tf.getPType()));
+
+    /*
+     * Input: Pair<Pair<String, String>, Long>, i.e.
+     * Pair<Pair<word, title>, count in title>
+     *
+     * Output: PTable<String, Long>, i.e. PTable<word, # of docs containing word>
+     *
+     * tf holds one entry per (word, title), so counting the words alone gives
+     * the number of documents that contain each word.
+     */
+    PTable<String, Long> n = Aggregate.count(tf.parallelDo("little n (# of docs contain word)",
+        new DoFn<Pair<Pair<String, String>, Long>, String>() {
+          @Override
+          public void process(Pair<Pair<String, String>, Long> input, Emitter<String> emitter) {
+            emitter.emit(input.first().first());
+          }
+        }, ptf.strings()));
+
+    /*
+     * Input: Pair<Pair<String, String>, Long>, i.e.
+     * Pair<Pair<word, title>, count in title>
+     *
+     * Output: PTable<String, Collection<Pair<String, Long>>>, i.e.
+     * PTable<word, Collection<Pair<title, count in title>>>
+     *
+     * The DoFn buffers consecutive entries that share a word and emits the
+     * buffer when the word changes (and once more in cleanup).
+     * NOTE(review): this only yields one entry per word if all entries for a
+     * word arrive consecutively - presumably guaranteed by the ordering of the
+     * count output; confirm.
+     */
+    PTable<String, Collection<Pair<String, Long>>> wordDocumentCountPair = tf.parallelDo(
+        "transform wordDocumentPairCount",
+        new DoFn<Pair<Pair<String, String>, Long>, Pair<String, Collection<Pair<String, Long>>>>() {
+          // (title, count) pairs collected for the word currently in `key`;
+          // null when nothing is pending.
+          Collection<Pair<String, Long>> buffer;
+          // The word the buffer belongs to; null until the first input.
+          String key;
+
+          @Override
+          public void process(Pair<Pair<String, String>, Long> input,
+              Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
+            Pair<String, String> wordDocumentPair = input.first();
+            // New word: emit what was gathered for the previous one and start over.
+            if (!wordDocumentPair.first().equals(key)) {
+              flush(emitter);
+              key = wordDocumentPair.first();
+              buffer = Lists.newArrayList();
+            }
+            buffer.add(Pair.of(wordDocumentPair.second(), input.second()));
+          }
+
+          // Emits the pending buffer, if any, and marks it as consumed.
+          protected void flush(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
+            if (buffer != null) {
+              emitter.emit(Pair.of(key, buffer));
+              buffer = null;
+            }
+          }
+
+          // The last word has no successor to trigger its flush, so do it here.
+          @Override
+          public void cleanup(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
+            flush(emitter);
+          }
+        }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.longs()))));
+
+    PTable<String, Pair<Long, Collection<Pair<String, Long>>>> joinedResults = Join.join(n, wordDocumentCountPair);
+
+    /*
+     * Input: Pair<String, Pair<Long, Collection<Pair<String, Long>>>>, i.e.
+     * Pair<word, Pair<# of docs containing word, Collection<Pair<title, term
+     * frequency>>>>
+     *
+     * Output: Pair<String, Collection<Pair<String, Double>>>, i.e.
+     * Pair<word, Collection<Pair<title, tfidf>>>
+     */
+    return joinedResults
+        .parallelDo(
+            "calculate tfidf",
+            new MapFn<Pair<String, Pair<Long, Collection<Pair<String, Long>>>>, Pair<String, Collection<Pair<String, Double>>>>() {
+              @Override
+              public Pair<String, Collection<Pair<String, Double>>> map(
+                  Pair<String, Pair<Long, Collection<Pair<String, Long>>>> input) {
+                Collection<Pair<String, Double>> tfidfs = Lists.newArrayList();
+                String word = input.first();
+                // idf = ln(N / n): N is the class constant for the total number
+                // of documents, n the number of documents containing the word.
+                double n = input.second().first();
+                double idf = Math.log(N / n);
+                for (Pair<String, Long> tf : input.second().second()) {
+                  double tfidf = tf.second() * idf;
+                  tfidfs.add(Pair.of(tf.first(), tfidf));
+                }
+                return Pair.of(word, tfidfs);
+              }
+
+            }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles()))));
+  }
+
+  /**
+   * Computes TF-IDF for docs.txt, writes the result as text both as-is and with
+   * upper-cased keys, and checks each output for the expected score of the word
+   * "the" in document B.
+   *
+   * @param pipeline the pipeline to run on
+   * @param typeFamily the type family passed on to {@link #generateTFIDF}
+   * @param singleRun when false, the pipeline is run explicitly after the first
+   *        output is registered and before the second one is added
+   * @throws IOException propagated from the pipeline setup or from reading the
+   *         output files
+   */
+  public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean singleRun) throws IOException {
+    String inputFile = tmpDir.copyResourceFileName("docs.txt");
+    String outputPath1 = tmpDir.getFileName("output1");
+    String outputPath2 = tmpDir.getFileName("output2");
+
+    Path tfPath = tmpDir.getPath("termfreq");
+
+    PCollection<String> docs = pipeline.readTextFile(inputFile);
+
+    PTable<String, Collection<Pair<String, Double>>> results = generateTFIDF(docs, tfPath, typeFamily);
+    pipeline.writeTextFile(results, outputPath1);
+    if (!singleRun) {
+      pipeline.run();
+    }
+
+    PTable<String, Collection<Pair<String, Double>>> uppercased = results.parallelDo(
+        new MapKeysFn<String, String, Collection<Pair<String, Double>>>() {
+          @Override
+          public String map(String k1) {
+            return k1.toUpperCase();
+          }
+        }, results.getPTableType());
+    pipeline.writeTextFile(uppercased, outputPath2);
+    pipeline.done();
+
+    // Check the lowercase version...
+    assertTrue("Expected score line for [the not found under " + outputPath1,
+        hasExpectedScoreLine(new File(outputPath1, "part-r-00000"), "[the"));
+
+    // ...and the uppercase version
+    assertTrue("Expected score line for [THE not found under " + outputPath2,
+        hasExpectedScoreLine(new File(outputPath2, "part-r-00000"), "[THE"));
+  }
+
+  /**
+   * Reports whether the file has a line that starts with {@code linePrefix} and
+   * contains the expected document-B score.
+   */
+  private static boolean hasExpectedScoreLine(File outputFile, String linePrefix) throws IOException {
+    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
+    for (String line : lines) {
+      if (line.startsWith(linePrefix) && line.contains("B,0.6931471805599453")) {
+        return true;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java b/crunch-core/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
new file mode 100644
index 0000000..e49f4d5
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.io.Files;
+
+
+/** Integration test that looks for a ClassCastException when TupleN values pass through a reduce. */
+public class TupleNClassCastBugIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  /**
+   * Keys every tab-separated line by its first column, groups by that key and
+   * re-emits each grouped (first column, second column) tuple unchanged.
+   */
+  public static PCollection<TupleN> mapGroupDo(PCollection<String> lines, PTypeFamily ptf) {
+    PTable<String, TupleN> mapped = lines.parallelDo(new MapFn<String, Pair<String, TupleN>>() {
+      @Override
+      public Pair<String, TupleN> map(String line) {
+        String[] columns = line.split("\\t");
+        return Pair.of(columns[0], new TupleN(columns[0], columns[1]));
+      }
+    }, ptf.tableOf(ptf.strings(), ptf.tuples(ptf.strings(), ptf.strings())));
+    return mapped.groupByKey().parallelDo(new DoFn<Pair<String, Iterable<TupleN>>, TupleN>() {
+      @Override
+      public void process(Pair<String, Iterable<TupleN>> input, Emitter<TupleN> tupleNEmitter) {
+        for (TupleN tuple : input.second()) {
+          tupleNEmitter.emit(tuple);
+        }
+      }
+    }, ptf.tuples(ptf.strings(), ptf.strings()));
+  }
+
+  @Test
+  public void testWritables() throws IOException {
+    run(new MRPipeline(TupleNClassCastBugIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testAvro() throws IOException {
+    run(new MRPipeline(TupleNClassCastBugIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
+  }
+
+  /**
+   * Runs {@link #mapGroupDo} over docs.txt and checks the size of the output.
+   */
+  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("docs.txt");
+    String outputPath = tmpDir.getFileName("output");
+
+    PCollection<String> docLines = pipeline.readTextFile(inputPath);
+    pipeline.writeTextFile(mapGroupDo(docLines, typeFamily), outputPath);
+    pipeline.done();
+
+    // We are not directly testing the output; we are looking for a
+    // ClassCastException, which is thrown in a different thread during the
+    // reduce phase. If all is well the file will exist and have six lines;
+    // otherwise the bug is present.
+    File outputFile = new File(outputPath, "part-r-00000");
+    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
+    assertEquals(6, lines.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java b/crunch-core/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java
new file mode 100644
index 0000000..501a944
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Collection of tests re-using the same PCollection in various unions.
+ */
+public class UnionFromSameSourceIT {
+  private static final int NUM_ELEMENTS = 4;
+
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  private Pipeline pipeline;
+  private PType<String> elementType = Writables.strings();
+  private PTableType<String, String> tableType = Writables.tableOf(Writables.strings(), Writables.strings());
+
+  @Before
+  public void setUp() {
+    pipeline = new MRPipeline(UnionFromSameSourceIT.class, tmpDir.getDefaultConfiguration());
+  }
+
+  /** Unions a collection with an identity-mapped copy of itself. */
+  @Test
+  public void testUnion_SingleRead() throws IOException {
+    PCollection<String> source = readSet1();
+    PCollection<String> copy = source.parallelDo(IdentityFn.<String> getInstance(), source.getPType());
+    assertEquals(NUM_ELEMENTS * 2, getCount(source.union(copy)));
+  }
+
+  /** Unions two separate reads of the same input. */
+  @Test
+  public void testUnion_TwoReads() throws IOException {
+    PCollection<String> first = readSet1();
+    PCollection<String> second = readSet1();
+    assertEquals(NUM_ELEMENTS * 2, getCount(first.union(second)));
+  }
+
+  @Test
+  public void testDoubleUnion_EndingWithGBK() throws IOException {
+    runDoubleUnionPipeline(true);
+  }
+
+  @Test
+  public void testDoubleUnion_EndingWithoutGBK() throws IOException {
+    runDoubleUnionPipeline(false);
+  }
+
+  /**
+   * Unions two tables built from the same input, groups and ungroups them,
+   * unions the result with the input again and maps it back to a table.
+   * @param endWithGBK whether to finish with an extra groupByKey/ungroup
+   */
+  private void runDoubleUnionPipeline(boolean endWithGBK) throws IOException {
+    PCollection<String> source = readSet1();
+    PTable<String, String> left = source.parallelDo("to table A", new ToTableFn(), tableType);
+    PTable<String, String> right = source.parallelDo("to table B", new ToTableFn(), tableType);
+
+    PCollection<String> regrouped = left.union(right).groupByKey()
+      .parallelDo("ungroup before union", new FromGroupedTableFn(), elementType);
+    PCollection<String> passthrough = source.parallelDo("fake id", IdentityFn.<String> getInstance(), elementType);
+
+    PTable<String, String> table = regrouped.union(passthrough)
+      .parallelDo("union back to table", new ToTableFn(), tableType);
+    if (endWithGBK) {
+      table = table.groupByKey().ungroup();
+    }
+    assertEquals(3 * NUM_ELEMENTS, getCount(table));
+  }
+
+  /** Reads set1.txt through a file name obtained from tmpDir.copyResourceFileName. */
+  private PCollection<String> readSet1() throws IOException {
+    return pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
+  }
+
+  /** Materializes the collection and returns how many elements it yields. */
+  private int getCount(PCollection<?> pcollection) {
+    int total = 0;
+    for (Object ignored : pcollection.materialize()) {
+      total++;
+    }
+    return total;
+  }
+
+  /** Turns each string into a pair that uses it as both first and second element. */
+  private static class ToTableFn extends MapFn<String, Pair<String, String>> {
+    @Override
+    public Pair<String, String> map(String input) {
+      return Pair.of(input, input);
+    }
+  }
+
+  /** Emits every value of a grouped entry and drops the key. */
+  private static class FromGroupedTableFn extends DoFn<Pair<String, Iterable<String>>, String> {
+    @Override
+    public void process(Pair<String, Iterable<String>> input, Emitter<String> emitter) {
+      for (String value : input.second()) {
+        emitter.emit(value);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/UnionIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/UnionIT.java b/crunch-core/src/it/java/org/apache/crunch/UnionIT.java
new file mode 100644
index 0000000..1c60a1b
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/UnionIT.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.test.Tests;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultiset;
+
+
+public class UnionIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+  private MRPipeline pipeline;
+  private PCollection<String> words1;
+  private PCollection<String> words2;
+
+  /** Builds the pipeline and reads src1.txt and src2.txt as the two inputs. */
+  @Before
+  public void setUp() throws IOException {
+    pipeline = new MRPipeline(UnionIT.class, tmpDir.getDefaultConfiguration());
+    words1 = readWords("src1.txt");
+    words2 = readWords("src2.txt");
+  }
+
+  @After
+  public void tearDown() {
+    pipeline.done();
+  }
+
+  /** Unions the two inputs after an identity map typed as Avro strings. */
+  @Test
+  public void testUnion() throws Exception {
+    IdentityFn<String> identity = IdentityFn.getInstance();
+    PCollection<String> left = words1.parallelDo(identity, Avros.strings());
+    PCollection<String> right = words2.parallelDo(identity, Avros.strings());
+
+    ImmutableMultiset<String> actual = ImmutableMultiset.copyOf(left.union(right).materialize());
+    assertThat(actual.elementSet().size(), is(3));
+    assertThat(actual.count("a1"), is(4));
+    assertThat(actual.count("b2"), is(2));
+    assertThat(actual.count("c3"), is(2));
+  }
+
+  /** Unions the two inputs after each has been turned into a table. */
+  @Test
+  public void testTableUnion() throws IOException {
+    PTable<String, String> left = byFirstLetter(words1);
+    PTable<String, String> right = byFirstLetter(words2);
+
+    ImmutableMultiset<Pair<String, String>> actual = ImmutableMultiset.copyOf(left.union(right).materialize());
+    assertThat(actual.elementSet().size(), is(3));
+    assertThat(actual.count(Pair.of("a", "1")), is(4));
+    assertThat(actual.count(Pair.of("b", "2")), is(2));
+    assertThat(actual.count(Pair.of("c", "3")), is(2));
+  }
+
+  /** Unions the raw inputs first, then keys and groups the union. */
+  @Test
+  public void testUnionThenGroupByKey() throws IOException {
+    assertConcatenatedValues(byFirstLetter(words1.union(words2)));
+  }
+
+  /** Keys each input first, then unions and groups the two tables. */
+  @Test
+  public void testTableUnionThenGroupByKey() throws IOException {
+    PTable<String, String> left = byFirstLetter(words1);
+    PTable<String, String> right = byFirstLetter(words2);
+    assertConcatenatedValues(left.union(right));
+  }
+
+  /** Reads, as text lines, the resource that Tests.resource resolves for this test and name. */
+  private PCollection<String> readWords(String resourceName) throws IOException {
+    return pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, resourceName)));
+  }
+
+  /**
+   * Groups the table by key, combines the values of every key with STRING_CONCAT
+   * and checks the materialized map against the expected one.
+   */
+  private static void assertConcatenatedValues(PTable<String, String> table) {
+    Map<String, String> actual = table.groupByKey()
+        .combineValues(Aggregators.STRING_CONCAT("", true))
+        .materializeToMap();
+    Map<String, String> expected = ImmutableMap.of("a", "1111", "b", "22", "c", "33");
+    assertThat(actual, is(expected));
+  }
+
+  /** Applies {@link FirstLetterKeyFn} and types the result as an Avro string table. */
+  private static PTable<String, String> byFirstLetter(PCollection<String> values) {
+    return values.parallelDo("byFirstLetter", new FirstLetterKeyFn(),
+        Avros.tableOf(Avros.strings(), Avros.strings()));
+  }
+
+  /** Splits an input longer than one character into (first character, rest); emits nothing otherwise. */
+  private static class FirstLetterKeyFn extends DoFn<String, Pair<String, String>> {
+    @Override
+    public void process(String input, Emitter<Pair<String, String>> emitter) {
+      if (input.length() > 1) {
+        emitter.emit(Pair.of(input.substring(0, 1), input.substring(1)));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/UnionResultsIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/UnionResultsIT.java b/crunch-core/src/it/java/org/apache/crunch/UnionResultsIT.java
new file mode 100644
index 0000000..df0511a
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/UnionResultsIT.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class UnionResultsIT extends CrunchTestSupport implements Serializable {
+
+  /** Maps every input string to a pair of that string and the constant 10. */
+  static class StringLengthMapFn extends MapFn<String, Pair<String, Long>> {
+    @Override
+    public Pair<String, Long> map(String input) {
+      return new Pair<String, Long>(input, 10L);
+    }
+  }
+
+  /**
+   * Tests combining a GBK output with a map-only job output into a single
+   * unioned collection.
+   */
+  @Test
+  public void testUnionOfGroupedOutputAndNonGroupedOutput() throws IOException {
+    String inputPath = tempDir.copyResourceFileName("set1.txt");
+    String inputPath2 = tempDir.copyResourceFileName("set2.txt");
+
+    // Configure the pipeline from the temporary path rule, as the sibling
+    // integration tests do, instead of falling back to a default configuration.
+    Pipeline pipeline = new MRPipeline(UnionResultsIT.class, tempDir.getDefaultConfiguration());
+
+    PCollection<String> set1Lines = pipeline.read(At.textFile(inputPath, Writables.strings()));
+    PCollection<Pair<String, Long>> set1Lengths = set1Lines.parallelDo(new StringLengthMapFn(),
+        Writables.pairs(Writables.strings(), Writables.longs()));
+    PCollection<Pair<String, Long>> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count();
+    PCollection<Pair<String, Long>> union = set1Lengths.union(set2Counts);
+
+    // Copy the results into a list before finishing the pipeline with done().
+    List<Pair<String, Long>> unionValues = Lists.newArrayList(union.materialize());
+    pipeline.done();
+    assertEquals(7, unionValues.size());
+
+    Set<Pair<String, Long>> expectedPairs = Sets.newHashSet();
+    expectedPairs.add(Pair.of("b", 10L));
+    expectedPairs.add(Pair.of("c", 10L));
+    expectedPairs.add(Pair.of("a", 10L));
+    expectedPairs.add(Pair.of("e", 10L));
+    expectedPairs.add(Pair.of("a", 1L));
+    expectedPairs.add(Pair.of("c", 1L));
+    expectedPairs.add(Pair.of("d", 1L));
+    assertEquals(expectedPairs, Sets.newHashSet(unionValues));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
new file mode 100644
index 0000000..c646663
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.To;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+public class WordCountIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  enum WordCountStats {
+    ANDS
+  };
+
+  public static PTable<String, Long> wordCount(PCollection<String> words, PTypeFamily typeFamily) {
+    return Aggregate.count(words.parallelDo(new DoFn<String, String>() {
+
+      @Override
+      public void process(String line, Emitter<String> emitter) {
+        for (String word : line.split("\\s+")) {
+          emitter.emit(word);
+          if ("and".equals(word)) {
+            increment(WordCountStats.ANDS);
+          }
+        }
+      }
+    }, typeFamily.strings()));
+  }
+
+  public static PTable<String, Long> substr(PTable<String, Long> ptable) {
+    return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
+
+      public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) {
+        if (input.first().length() > 0) {
+          emitter.emit(Pair.of(input.first().substring(0, 1), input.second()));
+        }
+      }
+    }, ptable.getPTableType());
+  }
+
+  private boolean runSecond = false;
+  private boolean useToOutput = false;
+
+  @Test
+  public void testWritables() throws IOException {
+    run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testWritablesWithSecond() throws IOException {
+    runSecond = true;
+    run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testWritablesWithSecondUseToOutput() throws IOException {
+    runSecond = true;
+    useToOutput = true;
+    run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testAvro() throws IOException {
+    run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testAvroWithSecond() throws IOException {
+    runSecond = true;
+    run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testWithTopWritable() throws IOException {
+    runWithTop(WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testWithTopAvro() throws IOException {
+    runWithTop(AvroTypeFamily.getInstance());
+  }
+
+  public void runWithTop(PTypeFamily tf) throws IOException {
+    Pipeline pipeline = new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration());
+    String inputPath = tmpDir.copyResourceFileName("shakes.txt");
+
+    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, tf.strings()));
+    PTable<String, Long> wordCount = wordCount(shakespeare, tf);
+    List<Pair<String, Long>> top5 = Lists.newArrayList(Aggregate.top(wordCount, 5, true).materialize());
+    assertEquals(
+        ImmutableList.of(Pair.of("", 1470L), Pair.of("the", 620L), Pair.of("and", 427L), Pair.of("of", 396L),
+            Pair.of("to", 367L)), top5);
+  }
+
+  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("shakes.txt");
+    String outputPath = tmpDir.getFileName("output");
+
+    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, typeFamily.strings()));
+    PTable<String, Long> wordCount = wordCount(shakespeare, typeFamily);
+    if (useToOutput) {
+      wordCount.write(To.textFile(outputPath));
+    } else {
+      pipeline.writeTextFile(wordCount, outputPath);
+    }
+
+    if (runSecond) {
+      String substrPath = tmpDir.getFileName("substr");
+      PTable<String, Long> we = substr(wordCount).groupByKey().combineValues(Aggregators.SUM_LONGS());
+      pipeline.writeTextFile(we, substrPath);
+    }
+    PipelineResult res = pipeline.done();
+    assertTrue(res.succeeded());
+    List<PipelineResult.StageResult> stageResults = res.getStageResults();
+    if (runSecond) {
+      assertEquals(2, stageResults.size());
+    } else {
+      assertEquals(1, stageResults.size());
+      assertEquals(427, stageResults.get(0).getCounterValue(WordCountStats.ANDS));
+    }
+
+    File outputFile = new File(outputPath, "part-r-00000");
+    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
+    boolean passed = false;
+    for (String line : lines) {
+      if (line.startsWith("Macbeth\t28") || line.startsWith("[Macbeth,28]")) {
+        passed = true;
+        break;
+      }
+    }
+    assertTrue(passed);
+  }
+}