You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:24 UTC

[22/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
new file mode 100644
index 0000000..5396fba
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/** Checks that each Writables PType maps between Java values and their Writable forms. */
+public class WritablesTest {
+
+  /** Expects the null type to map between {@code null} and the NullWritable instance. */
+  @Test
+  public void testNulls() throws Exception {
+    Void javaValue = null;
+    testInputOutputFn(Writables.nulls(), javaValue, NullWritable.get());
+  }
+
+  /** Expects strings to map to and from {@link Text}. */
+  @Test
+  public void testStrings() throws Exception {
+    String value = "abc";
+    testInputOutputFn(Writables.strings(), value, new Text(value));
+  }
+
+  /** Expects ints to map to and from {@link IntWritable}. */
+  @Test
+  public void testInts() throws Exception {
+    int value = 55;
+    testInputOutputFn(Writables.ints(), value, new IntWritable(value));
+  }
+
+  /** Expects longs to map to and from {@link LongWritable}. */
+  @Test
+  public void testLongs() throws Exception {
+    long value = 55L;
+    testInputOutputFn(Writables.longs(), value, new LongWritable(value));
+  }
+
+  /** Expects floats to map to and from {@link FloatWritable}. */
+  @Test
+  public void testFloats() throws Exception {
+    float value = 55.5f;
+    testInputOutputFn(Writables.floats(), value, new FloatWritable(value));
+  }
+
+  /** Expects doubles to map to and from {@link DoubleWritable}. */
+  @Test
+  public void testDoubles() throws Exception {
+    double value = 55.5d;
+    testInputOutputFn(Writables.doubles(), value, new DoubleWritable(value));
+  }
+
+  /** Expects booleans to map to and from {@link BooleanWritable}. */
+  @Test
+  public void testBoolean() throws Exception {
+    boolean value = false;
+    testInputOutputFn(Writables.booleans(), value, new BooleanWritable(value));
+  }
+
+  /** Expects a ByteBuffer to map to and from a {@link BytesWritable} over the same bytes. */
+  @Test
+  public void testBytes() throws Exception {
+    byte[] raw = { 17, 26, -98 };
+    BytesWritable expected = new BytesWritable(raw);
+    testInputOutputFn(Writables.bytes(), ByteBuffer.wrap(raw), expected);
+  }
+
+  /** Expects a collection of strings to map to and from a GenericArrayWritable of Text. */
+  @Test
+  public void testCollections() throws Exception {
+    String element = "abc";
+    Collection<String> javaValues = Lists.newArrayList(element);
+    GenericArrayWritable<Text> expected = new GenericArrayWritable<Text>(Text.class);
+    expected.set(new Text[] { new Text(element) });
+    testInputOutputFn(Writables.collections(Writables.strings()), javaValues, expected);
+  }
+
+  /** Expects a Pair of strings to map to and from a two-element, fully written TupleWritable. */
+  @Test
+  public void testPairs() throws Exception {
+    TupleWritable expected = new TupleWritable(new Text[] { new Text("a"), new Text("b"), });
+    expected.setWritten(0);
+    expected.setWritten(1);
+    testInputOutputFn(Writables.pairs(Writables.strings(), Writables.strings()), Pair.of("a", "b"), expected);
+  }
+
+  /** A table type must be accepted as the key type of another table type. */
+  @Test
+  public void testNestedTables() throws Exception {
+    PTableType<Long, Long> inner = Writables.tableOf(Writables.longs(), Writables.longs());
+    PTableType<Pair<Long, Long>, String> outer = Writables.tableOf(inner, Writables.strings());
+    assertNotNull(outer);
+  }
+
+  /** Independently built but identical pair types must be equal and share a hash code. */
+  @Test
+  public void testPairEquals() throws Exception {
+    PType<Pair<Long, ByteBuffer>> first = Writables.pairs(Writables.longs(), Writables.bytes());
+    PType<Pair<Long, ByteBuffer>> second = Writables.pairs(Writables.longs(), Writables.bytes());
+    assertEquals(first, second);
+    assertEquals(first.hashCode(), second.hashCode());
+  }
+
+  /** Expects a Tuple3 of strings to map to and from a three-element, fully written TupleWritable. */
+  @Test
+  public void testTriples() throws Exception {
+    Tuple3<String, String, String> javaValue = Tuple3.of("a", "b", "c");
+    TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), new Text("c"), });
+    w.setWritten(0);
+    w.setWritten(1);
+    w.setWritten(2);
+    WritableType<?, ?> wt = Writables.triples(Writables.strings(), Writables.strings(), Writables.strings());
+    testInputOutputFn(wt, javaValue, w);
+  }
+
+  /** Expects a Tuple4 of strings to map to and from a four-element, fully written TupleWritable. */
+  @Test
+  public void testQuads() throws Exception {
+    Tuple4<String, String, String, String> javaValue = Tuple4.of("a", "b", "c", "d");
+    TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"), });
+    w.setWritten(0);
+    w.setWritten(1);
+    w.setWritten(2);
+    w.setWritten(3);
+    WritableType<?, ?> wt = Writables.quads(Writables.strings(), Writables.strings(), Writables.strings(),
+        Writables.strings());
+    testInputOutputFn(wt, javaValue, w);
+  }
+
+  /** Expects a five-element TupleN of strings to map to and from a fully written TupleWritable. */
+  @Test
+  public void testTupleN() throws Exception {
+    TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"),
+        new Text("e"), });
+    w.setWritten(0);
+    w.setWritten(1);
+    w.setWritten(2);
+    w.setWritten(3);
+    w.setWritten(4);
+    WritableType<?, ?> wt = Writables.tuples(Writables.strings(), Writables.strings(), Writables.strings(),
+        Writables.strings(), Writables.strings());
+    testInputOutputFn(wt, new TupleN("a", "b", "c", "d", "e"), w);
+  }
+
+  /** Minimal Writable with one String and one int field, used as the record type under test. */
+  protected static class TestWritable implements Writable {
+    String left;
+    int right;
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(left);
+      out.writeInt(right);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      left = in.readUTF();
+      right = in.readInt();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+      TestWritable other = (TestWritable) obj;
+      return right == other.right && (left == null ? other.left == null : left.equals(other.left));
+    }
+
+    /** Kept in step with equals(): instances with equal fields report equal hash codes. */
+    @Override
+    public int hashCode() {
+      return 31 * (left == null ? 0 : left.hashCode()) + right;
+    }
+  }
+
+  /** Expects a Writable record type to map an instance to an equal instance in both directions. */
+  @Test
+  public void testRecords() throws Exception {
+    TestWritable javaValue = new TestWritable();
+    javaValue.left = "a";
+    javaValue.right = 1;
+    TestWritable expected = new TestWritable();
+    expected.left = "a";
+    expected.right = 1;
+    testInputOutputFn(Writables.records(TestWritable.class), javaValue, expected);
+  }
+
+  /** Expects a table of strings to map a Pair of strings to and from a Pair of Text. */
+  @Test
+  public void testTableOf() throws Exception {
+    Pair<Text, Text> expected = Pair.of(new Text("a"), new Text("b"));
+    WritableTableType<String, String> tableType = Writables.tableOf(Writables.strings(), Writables.strings());
+    testInputOutputFn(tableType, Pair.of("a", "b"), expected);
+  }
+
+  /** After registering a type for a class, records() must hand back that same instance. */
+  @Test
+  public void testRegister() throws Exception {
+    WritableType<TestWritable, TestWritable> registered = Writables.writables(TestWritable.class);
+    Writables.register(TestWritable.class, registered);
+    assertSame(registered, Writables.records(TestWritable.class));
+  }
+
+  /** Initializes both map fns of ptype, then checks writable -> java and java -> writable. */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  protected static void testInputOutputFn(PType ptype, Object java, Object writable) {
+    ptype.getInputMapFn().initialize();
+    ptype.getOutputMapFn().initialize();
+    assertEquals(java, ptype.getInputMapFn().map(writable));
+    assertEquals(writable, ptype.getOutputMapFn().map(java));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/util/DistCacheTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/util/DistCacheTest.java b/crunch-core/src/test/java/org/apache/crunch/util/DistCacheTest.java
new file mode 100644
index 0000000..6784f14
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/util/DistCacheTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Tests for adding jars and jar directories to a configuration through DistCache. */
+public class DistCacheTest {
+  // Scratch directory that holds the files each test works with.
+  @Rule
+  public TemporaryFolder testFolder = new TemporaryFolder();
+
+  // Configuration under test, plus the plain and "file:"-prefixed paths of the test jars.
+  private Configuration testConf;
+  private String[] testFilePaths;
+  private String[] testFileQualifiedPaths;
+
+  /**
+   * Prepares the fixtures shared by every test:
+   * <ul>
+   * <li>a fresh Hadoop configuration,
+   * <li>a temporary directory holding three .jar files and one file that is
+   * not a jar,
+   * <li>the canonical paths of the jars, and the same paths prefixed with
+   * "file:".
+   * </ul>
+   */
+  @Before
+  public void setup() throws IOException {
+    testConf = new Configuration();
+    // Three jar files whose paths are recorded, then one non-jar file whose path is not.
+    testFilePaths = new String[] {
+        testFolder.newFile("jar1.jar").getCanonicalPath(),
+        testFolder.newFile("jar2.jar").getCanonicalPath(),
+        testFolder.newFile("jar3.jar").getCanonicalPath(),
+    };
+    testFolder.newFile("notJar.other");
+
+    // The values the tests look for in "tmpjars": each jar path prefixed with "file:".
+    testFileQualifiedPaths = new String[testFilePaths.length];
+    int index = 0;
+    for (String path : testFilePaths) {
+      testFileQualifiedPaths[index++] = "file:" + path;
+    }
+  }
+
+  /**
+   * Adds the jars one at a time and checks the "tmpjars" entry after each add.
+   *
+   * @throws IOException
+   *           if a jar cannot be added.
+   */
+  @Test
+  public void testAddJar() throws IOException {
+    // After the n-th add, "tmpjars" must hold the first n qualified paths,
+    // comma-separated and in the order they were added.
+    for (int added = 1; added <= testFilePaths.length; added++) {
+      DistCache.addJarToDistributedCache(testConf, testFilePaths[added - 1]);
+      String expected = StringUtils.join(testFileQualifiedPaths, ",", 0, added);
+      assertEquals("tmpjars configuration var does not contain expected value.",
+          expected, testConf.get("tmpjars"));
+    }
+  }
+
+  /**
+   * Adding the path of a jar that does not exist must be rejected with an
+   * exception.
+   *
+   * @throws IOException
+   *           expected, because the jar does not exist.
+   */
+  @Test(expected = IOException.class)
+  public void testAddJarThatDoesntExist() throws IOException {
+    DistCache.addJarToDistributedCache(testConf, "/garbage/doesntexist.jar");
+  }
+
+  /**
+   * Adding a whole directory must register every .jar file in it and leave
+   * out everything else (here, the single non-jar file created in setup).
+   * Each jar must be registered exactly once.
+   *
+   * @throws IOException
+   *           if the directory cannot be added to the configuration; not
+   *           expected in this test.
+   */
+  @Test
+  public void testAddJarDirectory() throws IOException {
+    DistCache.addJarDirToDistributedCache(testConf, testFolder.getRoot().getCanonicalPath());
+
+    // Collect the registered paths into a set so that duplicates collapse.
+    Set<String> registered = new HashSet<String>();
+    for (String entry : StringUtils.split(testConf.get("tmpjars"), ",")) {
+      registered.add(entry);
+    }
+    assertEquals("Incorrect number of jar paths added.", testFilePaths.length, registered.size());
+
+    // Every expected qualified path must be among the registered ones.
+    for (String expected : testFileQualifiedPaths) {
+      assertTrue("Expected jar path missing from jar paths added to tmpjars: " + expected,
+          registered.contains(expected));
+    }
+  }
+
+  /**
+   * Adding a jar directory that does not exist must be rejected with an
+   * exception.
+   *
+   * @throws IOException
+   *           expected, because the directory does not exist; the test
+   *           passes only when it is thrown.
+   */
+  @Test(expected = IOException.class)
+  public void testAddJarDirectoryThatDoesntExist() throws IOException {
+    DistCache.addJarDirToDistributedCache(testConf, "/garbage/doesntexist");
+  }
+
+  /**
+   * Passing a regular file where a jar directory is required must be rejected
+   * with an exception.
+   *
+   * @throws IOException
+   *           expected, because the path names a file, not a directory; the
+   *           test passes only when it is thrown.
+   */
+  @Test(expected = IOException.class)
+  public void testAddJarDirectoryNotDirectory() throws IOException {
+    DistCache.addJarDirToDistributedCache(testConf, testFilePaths[0]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-dist/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-dist/pom.xml b/crunch-dist/pom.xml
index 749a767..cdd4256 100644
--- a/crunch-dist/pom.xml
+++ b/crunch-dist/pom.xml
@@ -35,7 +35,7 @@ under the License.
   <dependencies>
     <dependency>
       <groupId>org.apache.crunch</groupId>
-      <artifactId>crunch</artifactId>
+      <artifactId>crunch-core</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.crunch</groupId>

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-examples/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-examples/pom.xml b/crunch-examples/pom.xml
index fd790c3..fcbe30c 100644
--- a/crunch-examples/pom.xml
+++ b/crunch-examples/pom.xml
@@ -36,7 +36,7 @@ under the License.
 
     <dependency>
       <groupId>org.apache.crunch</groupId>
-      <artifactId>crunch</artifactId>
+      <artifactId>crunch-core</artifactId>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-hbase/pom.xml b/crunch-hbase/pom.xml
index 656c6cc..df21ef8 100644
--- a/crunch-hbase/pom.xml
+++ b/crunch-hbase/pom.xml
@@ -31,7 +31,7 @@ under the License.
   <dependencies>
     <dependency>
       <groupId>org.apache.crunch</groupId>
-      <artifactId>crunch</artifactId>
+      <artifactId>crunch-core</artifactId>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-scrunch/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-scrunch/pom.xml b/crunch-scrunch/pom.xml
index 7db5ac7..b97766a 100644
--- a/crunch-scrunch/pom.xml
+++ b/crunch-scrunch/pom.xml
@@ -43,7 +43,7 @@ under the License.
     </dependency>
     <dependency>
       <groupId>org.apache.crunch</groupId>
-      <artifactId>crunch</artifactId>
+      <artifactId>crunch-core</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/pom.xml
----------------------------------------------------------------------
diff --git a/crunch/pom.xml b/crunch/pom.xml
deleted file mode 100644
index 2a38913..0000000
--- a/crunch/pom.xml
+++ /dev/null
@@ -1,182 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.crunch</groupId>
-    <artifactId>crunch-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>crunch</artifactId>
-  <name>Apache Crunch Core</name>
-
-  <dependencies>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-mapred</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.javassist</groupId>
-      <artifactId>javassist</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <!-- Override the slf4j dependency from Avro, which is incompatible with
-         Hadoop's. -->
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>commons-codec</groupId>
-      <artifactId>commons-codec</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.codehaus.jackson</groupId>
-      <artifactId>jackson-core-asl</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.codehaus.jackson</groupId>
-      <artifactId>jackson-mapper-asl</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    
-    <!-- Both Protobufs and Thrift are supported as
-         derived serialization types, and you can use
-         (almost) any version of them you like, Crunch
-         only relies on the stable public APIs, not the
-         structure of the files themselves.
-
-         Both dependencies are scoped as provided, in
-         order to not expand the size of the assembly jars
-         unnecessarily.
-    -->
-
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.thrift</groupId>
-      <artifactId>libthrift</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>commons-logging</groupId>
-      <artifactId>commons-logging</artifactId>
-      <scope>provided</scope>
-    </dependency>
-   
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <!-- Used by LocalJobRunner in integration tests -->
-    <dependency>
-      <groupId>commons-httpclient</groupId>
-      <artifactId>commons-httpclient</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.crunch</groupId>
-      <artifactId>crunch-test</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.hamcrest</groupId>
-      <artifactId>hamcrest-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-failsafe-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.avro</groupId>
-        <artifactId>avro-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>schemas</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>schema</goal>
-            </goals>
-            <configuration>
-              <testSourceDirectory>${project.basedir}/src/test/avro/</testSourceDirectory>
-              <testOutputDirectory>target/generated-test-sources/</testOutputDirectory>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java b/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
deleted file mode 100644
index ff01a2f..0000000
--- a/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.To;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- *
- */
-public class CancelJobsIT {
-
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testRun() throws Exception {
-    PipelineExecution pe = run();
-    pe.waitUntilDone();
-    PipelineResult pr = pe.getResult();
-    assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
-    assertEquals(2, pr.getStageResults().size());
-  }
-  
-  @Test
-  public void testKill() throws Exception {
-    PipelineExecution pe = run();
-    pe.kill();
-    pe.waitUntilDone();
-    assertEquals(PipelineExecution.Status.KILLED, pe.getStatus());
-  }
-
-  @Test
-  public void testKillMultipleTimes() throws Exception {
-    PipelineExecution pe = run();
-    for (int i = 0; i < 10; i++) {
-      pe.kill();
-    }
-    pe.waitUntilDone();
-    assertEquals(PipelineExecution.Status.KILLED, pe.getStatus());
-  }
-
-  @Test
-  public void testKillAfterDone() throws Exception {
-    PipelineExecution pe = run();
-    pe.waitUntilDone();
-    assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
-    pe.kill(); // expect no-op
-    assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
-  }
-  
-  public PipelineExecution run() throws IOException {
-    String shakes = tmpDir.copyResourceFileName("shakes.txt");
-    String out = tmpDir.getFileName("cancel");
-    Pipeline p = new MRPipeline(CancelJobsIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<String> words = p.readTextFile(shakes);
-    p.write(words.count().top(20), To.textFile(out));
-    return p.runAsync(); // need to hack to slow down job start up if this test becomes flaky.
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/CleanTextIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CleanTextIT.java b/crunch/src/it/java/org/apache/crunch/CleanTextIT.java
deleted file mode 100644
index 2f4004e..0000000
--- a/crunch/src/it/java/org/apache/crunch/CleanTextIT.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.To;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.avro.Avros;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.io.Files;
-
-/**
- *
- */
-public class CleanTextIT {
-
-  private static final int LINES_IN_SHAKES = 3667;
-  
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-  
-  static DoFn<String, String> CLEANER = new DoFn<String, String>() {
-    @Override
-    public void process(String input, Emitter<String> emitter) {
-      emitter.emit(input.toLowerCase());
-    }
-  };
-  
-  static DoFn<String, String> SPLIT = new DoFn<String, String>() {
-    @Override
-    public void process(String input, Emitter<String> emitter) {
-      for (String word : input.split("\\S+")) {
-        if (!word.isEmpty()) {
-          emitter.emit(word);
-        }
-      }
-    }
-  };
-  
-  @Test
-  public void testMapSideOutputs() throws Exception {
-    Pipeline pipeline = new MRPipeline(CleanTextIT.class, tmpDir.getDefaultConfiguration());
-    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
-    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
-    
-    PCollection<String> cleanShakes = shakespeare.parallelDo(CLEANER, Avros.strings());
-    File cso = tmpDir.getFile("cleanShakes");
-    cleanShakes.write(To.textFile(cso.getAbsolutePath()));
-    
-    File wc = tmpDir.getFile("wordCounts");
-    cleanShakes.parallelDo(SPLIT, Avros.strings()).count().write(To.textFile(wc.getAbsolutePath()));
-    pipeline.done();
-    
-    File cleanFile = new File(cso, "part-m-00000");
-    List<String> lines = Files.readLines(cleanFile, Charset.defaultCharset());
-    assertEquals(LINES_IN_SHAKES, lines.size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java b/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java
deleted file mode 100644
index 7e0c75c..0000000
--- a/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.lang.String;
-import java.util.Collection;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PObject;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.materialize.pobject.CollectionPObject;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.Rule;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class CollectionPObjectIT {
-
-  private static final int LINES_IN_SHAKES = 3667;
-
-  private static final String FIRST_SHAKESPEARE_LINE =
-      "***The Project Gutenberg's Etext of Shakespeare's First Folio***";
-
-  private static final String LAST_SHAKESPEARE_LINE =
-      "FINIS. THE TRAGEDIE OF MACBETH.";
-
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testPObjectMRPipeline() throws IOException {
-    runPObject(new MRPipeline(CollectionPObjectIT.class, tmpDir.getDefaultConfiguration()));
-  }
-
-  @Test
-  public void testAsCollectionMRPipeline() throws IOException {
-    runAsCollection(new MRPipeline(CollectionPObjectIT.class, tmpDir.getDefaultConfiguration()));
-  }
-
-  @Test
-  public void testPObjectMemPipeline() throws IOException {
-    runPObject(MemPipeline.getInstance());
-  }
-
-  @Test
-  public void testAsCollectionMemPipeline() throws IOException {
-    runAsCollection(MemPipeline.getInstance());
-  }
-
-  private PCollection<String> getPCollection(Pipeline pipeline) throws IOException {
-    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
-    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
-    return shakespeare;
-  }
-
-  private void verifyLines(String[] lines) {
-    assertEquals("Not enough lines in Shakespeare.", LINES_IN_SHAKES, lines.length);
-    assertEquals("First line in Shakespeare is wrong.", FIRST_SHAKESPEARE_LINE, lines[0]);
-    assertEquals("Last line in Shakespeare is wrong.", LAST_SHAKESPEARE_LINE,
-        lines[lines.length - 1]);
-  }
-
-  public void runPObject(Pipeline pipeline) throws IOException {
-    PCollection<String> shakespeare = getPCollection(pipeline);
-    PObject<Collection<String>> linesP = new CollectionPObject<String>(shakespeare);
-    String[] lines = new String[LINES_IN_SHAKES];
-    lines = linesP.getValue().toArray(lines);
-    verifyLines(lines);
-  }
-
-  public void runAsCollection(Pipeline pipeline) throws IOException {
-    PCollection<String> shakespeare = getPCollection(pipeline);
-    String[] lines = new String[LINES_IN_SHAKES];
-    lines = shakespeare.asCollection().getValue().toArray(lines);
-    verifyLines(lines);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
deleted file mode 100644
index 17d0cae..0000000
--- a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.crunch.fn.Aggregators.SimpleAggregator;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-@SuppressWarnings("serial")
-public class CollectionsIT {
-
-  private static class AggregateStringListFn extends SimpleAggregator<Collection<String>> {
-    private final Collection<String> rtn = Lists.newArrayList();
-
-    @Override
-    public void reset() {
-      rtn.clear();
-    }
-
-    @Override
-    public void update(Collection<String> values) {
-      rtn.addAll(values);
-    }
-
-    @Override
-    public Iterable<Collection<String>> results() {
-      return ImmutableList.of(rtn);
-    }
-  }
-
-  private static PTable<String, Collection<String>> listOfCharcters(PCollection<String> lines, PTypeFamily typeFamily) {
-
-    return lines.parallelDo(new DoFn<String, Pair<String, Collection<String>>>() {
-      @Override
-      public void process(String line, Emitter<Pair<String, Collection<String>>> emitter) {
-        for (String word : line.split("\\s+")) {
-          Collection<String> characters = Lists.newArrayList();
-          for (char c : word.toCharArray()) {
-            characters.add(String.valueOf(c));
-          }
-          emitter.emit(Pair.of(word, characters));
-        }
-      }
-    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.collections(typeFamily.strings())))
-        .groupByKey().combineValues(new AggregateStringListFn());
-  }
-
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testWritables() throws IOException {
-    run(new MRPipeline(CollectionsIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testAvro() throws IOException {
-    run(new MRPipeline(CollectionsIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testInMemoryWritables() throws IOException {
-    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testInMemoryAvro() throws IOException {
-    run(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
-  }
-
-  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
-
-    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
-    Iterable<Pair<String, Collection<String>>> lines = listOfCharcters(shakespeare, typeFamily).materialize();
-
-    boolean passed = false;
-    for (Pair<String, Collection<String>> line : lines) {
-      if (line.first().startsWith("yellow")) {
-        passed = true;
-        break;
-      }
-    }
-    pipeline.done();
-    assertTrue(passed);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java b/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
deleted file mode 100644
index 3a38b92..0000000
--- a/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.lang.Long;
-
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class CollectionsLengthIT {
-
-  public static final Long LINES_IN_SHAKESPEARE = 3667L;
-
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testWritables() throws IOException {
-    run(new MRPipeline(CollectionsIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testAvro() throws IOException {
-    run(new MRPipeline(CollectionsIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testInMemoryWritables() throws IOException {
-    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testInMemoryAvro() throws IOException {
-    run(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
-  }
-
-  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
-
-    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
-    Long length = shakespeare.length().getValue();
-    assertEquals("Incorrect length for shakespear PCollection.", LINES_IN_SHAKESPEARE, length);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java b/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java
deleted file mode 100644
index f1323ca..0000000
--- a/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.apache.crunch.types.avro.Avros.*;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PType;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Iterables;
-
-/**
- *
- */
-public class DeepCopyCustomTuplesIT {
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-  
-  public static class PID extends Pair<Integer, String> {
-    public PID(Integer first, String second) {
-      super(first, second);
-    }
-  }
-  
-  private static PType<PID> pids = tuples(PID.class, ints(), strings());
-  
-  @Test
-  public void testDeepCopyCustomTuple() throws Exception {
-    Pipeline p = new MRPipeline(DeepCopyCustomTuplesIT.class, tmpDir.getDefaultConfiguration());
-    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
-    PCollection<String> shakes = p.readTextFile(shakesInputPath);
-    Iterable<String> out = shakes
-        .parallelDo(new PreProcFn(), tableOf(ints(), pairs(ints(), pids)))
-        .groupByKey()
-        .parallelDo(new PostProcFn(), strings())
-        .materialize();
-    assertEquals(65, Iterables.size(out));
-    p.done();
-  }
-  
-  private static class PreProcFn extends MapFn<String, Pair<Integer, Pair<Integer, PID>>> {
-    private int counter = 0;
-    @Override
-    public Pair<Integer, Pair<Integer, PID>> map(String input) {
-      return Pair.of(counter++, Pair.of(counter++, new PID(input.length(), input)));
-    }
-  };
-  
-  private static class PostProcFn extends DoFn<Pair<Integer, Iterable<Pair<Integer, PID>>>, String> {
-    @Override
-    public void process(Pair<Integer, Iterable<Pair<Integer, PID>>> input, Emitter<String> emitter) {
-      for (Pair<Integer, PID> p : input.second()) {
-        if (p.second().first() > 0 && p.second().first() < 10) {
-          emitter.emit(p.second().second());
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/EnumPairIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/EnumPairIT.java b/crunch/src/it/java/org/apache/crunch/EnumPairIT.java
deleted file mode 100644
index 1d0974e..0000000
--- a/crunch/src/it/java/org/apache/crunch/EnumPairIT.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypes;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class EnumPairIT implements Serializable {
-  @Rule
-  public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
-  static enum etypes {
-    type1,
-  }
-
-  @Test
-  public void testEnumPTypes() throws IOException {
-    String inputFile1 = tmpDir.copyResourceFileName("set1.txt");
-    Pipeline pipeline = new MRPipeline(EnumPairIT.class);
-    PCollection<String> set1 = pipeline.readTextFile(inputFile1);
-    PTable<String, etypes> data = set1.parallelDo(new DoFn<String, Pair<String, etypes>>() {
-      @Override
-      public void process(String input, Emitter<Pair<String, etypes>> emitter) {
-        emitter.emit(new Pair<String, etypes>(input, etypes.type1));
-      }
-    }, Writables.tableOf(Writables.strings(), PTypes.enums(etypes.class, set1.getTypeFamily())));
-
-    Iterable<Pair<String, etypes>> materialized = data.materialize();
-    pipeline.run();
-    for (Pair<String, etypes> pair : materialized) {
-      assertEquals(etypes.type1, pair.second());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java b/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java
deleted file mode 100644
index d985e10..0000000
--- a/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.lang.String;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PObject;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.materialize.pobject.FirstElementPObject;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.Rule;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class FirstElementPObjectIT {
-
-  private static final String FIRST_SHAKESPEARE_LINE =
-      "***The Project Gutenberg's Etext of Shakespeare's First Folio***";
-
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testMRPipeline() throws IOException {
-    run(new MRPipeline(FirstElementPObjectIT.class, tmpDir.getDefaultConfiguration()));
-  }
-
-  @Test
-  public void testInMemoryPipeline() throws IOException {
-    run(MemPipeline.getInstance());
-  }
-
-  public void run(Pipeline pipeline) throws IOException {
-    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
-    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
-    PObject<String> firstLine = new FirstElementPObject<String>(shakespeare);
-    String first = firstLine.getValue();
-    assertEquals("First line in Shakespeare is wrong.", FIRST_SHAKESPEARE_LINE, first);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java b/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java
deleted file mode 100644
index da487eb..0000000
--- a/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * Verify that calling the iterator method on a Reducer-based Iterable 
- * is forcefully disallowed.
- */
-public class IterableReuseProtectionIT {
-
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-  
-  
-  public void checkIteratorReuse(Pipeline pipeline) throws IOException {
-    Iterable<String> values = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"))
-        .by(IdentityFn.<String>getInstance(), Writables.strings())
-        .groupByKey()
-        .combineValues(new TestIterableReuseFn())
-        .values().materialize();
-    
-    List<String> valueList = Lists.newArrayList(values);
-    Collections.sort(valueList);
-    assertEquals(Lists.newArrayList("a", "b", "c", "e"), valueList);
-  }
-  
-  @Test
-  public void testIteratorReuse_MRPipeline() throws IOException {
-    checkIteratorReuse(new MRPipeline(IterableReuseProtectionIT.class, tmpDir.getDefaultConfiguration()));
-  }
-  
-  @Test
-  public void testIteratorReuse_InMemoryPipeline() throws IOException {
-    checkIteratorReuse(MemPipeline.getInstance());
-  }
-  
-  static class TestIterableReuseFn extends CombineFn<String, String> {
-
-    @Override
-    public void process(Pair<String, Iterable<String>> input, Emitter<Pair<String, String>> emitter) {
-      StringBuilder combinedBuilder = new StringBuilder();
-      for (String v : input.second()) {
-        combinedBuilder.append(v);
-      }
-      
-      try {
-        input.second().iterator();
-        throw new RuntimeException("Second call to iterator should throw an exception");
-      } catch (IllegalStateException e) {
-        // Expected situation
-      }
-      emitter.emit(Pair.of(input.first(), combinedBuilder.toString()));
-    }
-    
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
deleted file mode 100644
index 7670e88..0000000
--- a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.crunch.fn.FilterFns;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.To;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class MRPipelineIT implements Serializable {
-  @Rule
-  public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void materializedColShouldBeWritten() throws Exception {
-    File textFile = tmpDir.copyResourceFile("shakes.txt");
-    Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<String> genericCollection = pipeline.readTextFile(textFile.getAbsolutePath());
-    pipeline.run();
-    PCollection<String> filter = genericCollection.filter("Filtering data", FilterFns.<String>ACCEPT_ALL());
-    filter.materialize();
-    pipeline.run();
-    File file = tmpDir.getFile("output.txt");
-    Target outFile = To.textFile(file.getAbsolutePath());
-    PCollection<String> write = filter.write(outFile);
-    write.materialize();
-    pipeline.run();
-  }
-  
-  
-  
-  @Test
-  public void testPGroupedTableToMultipleOutputs() throws IOException{
-    Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration());
-    PGroupedTable<String, String> groupedLineTable = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")).by(IdentityFn.<String>getInstance(), Writables.strings()).groupByKey();
-    
-    PTable<String, String> ungroupedTableA = groupedLineTable.ungroup();
-    PTable<String, String> ungroupedTableB = groupedLineTable.ungroup();
-    
-    File outputDirA = tmpDir.getFile("output_a");
-    File outputDirB = tmpDir.getFile("output_b");
-    
-    pipeline.writeTextFile(ungroupedTableA, outputDirA.getAbsolutePath());
-    pipeline.writeTextFile(ungroupedTableB, outputDirB.getAbsolutePath());
-    pipeline.done();
-
-    // Verify that output from a single PGroupedTable can be sent to multiple collections
-    assertTrue(new File(outputDirA, "part-r-00000").exists());
-    assertTrue(new File(outputDirB, "part-r-00000").exists());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/MapPObjectIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MapPObjectIT.java b/crunch/src/it/java/org/apache/crunch/MapPObjectIT.java
deleted file mode 100644
index c48284f..0000000
--- a/crunch/src/it/java/org/apache/crunch/MapPObjectIT.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static junit.framework.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.materialize.pobject.MapPObject;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-
-public class MapPObjectIT {
-
-  static final ImmutableList<Pair<Integer, String>> kvPairs = ImmutableList.of(Pair.of(0, "a"), Pair.of(1, "b"),
-      Pair.of(2, "c"), Pair.of(3, "e"));
-
-  public void assertMatches(Map<Integer, String> m) {
-    for (Integer k : m.keySet()) {
-      assertEquals(kvPairs.get(k).second(), m.get(k));
-    }
-  }
-
-  private static class Set1Mapper extends MapFn<String, Pair<Integer, String>> {
-    @Override
-    public Pair<Integer, String> map(String input) {
-
-      int k = -1;
-      if (input.equals("a"))
-        k = 0;
-      else if (input.equals("b"))
-        k = 1;
-      else if (input.equals("c"))
-        k = 2;
-      else if (input.equals("e"))
-        k = 3;
-      return Pair.of(k, input);
-    }
-  }
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testMemMapPObject() {
-    PTable<Integer, String> table = MemPipeline.tableOf(kvPairs);
-    PObject<Map<Integer, String>> map = new MapPObject<Integer, String>(table);
-    assertMatches(map.getValue());
-  }
-
-  @Test
-  public void testMemAsMap() {
-    PTable<Integer, String> table = MemPipeline.tableOf(kvPairs);
-    assertMatches(table.asMap().getValue());
-  }
-
-  private PTable<Integer, String> getMRPTable() throws IOException {
-    Pipeline p = new MRPipeline(MaterializeToMapIT.class, tmpDir.getDefaultConfiguration());
-    String inputFile = tmpDir.copyResourceFileName("set1.txt");
-    PCollection<String> c = p.readTextFile(inputFile);
-    PTypeFamily tf = c.getTypeFamily();
-    PTable<Integer, String> table = c.parallelDo(new Set1Mapper(), tf.tableOf(tf.ints(),
-        tf.strings()));
-    return table;
-  }
-
-  @Test
-  public void testMRMapPObject() throws IOException {
-    PTable<Integer, String> table = getMRPTable();
-    PObject<Map<Integer, String>> map = new MapPObject<Integer, String>(table);
-    assertMatches(map.getValue());
-  }
-
-  @Test
-  public void testMRAsMap() throws IOException {
-    PTable<Integer, String> table = getMRPTable();
-    assertMatches(table.asMap().getValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/MapsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MapsIT.java b/crunch/src/it/java/org/apache/crunch/MapsIT.java
deleted file mode 100644
index 5b3187b..0000000
--- a/crunch/src/it/java/org/apache/crunch/MapsIT.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import java.util.Map;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
-public class MapsIT {
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testWritables() throws Exception {
-    run(WritableTypeFamily.getInstance(), tmpDir);
-  }
-
-  @Test
-  public void testAvros() throws Exception {
-    run(AvroTypeFamily.getInstance(), tmpDir);
-  }
-
-  public static void run(PTypeFamily typeFamily, TemporaryPath tmpDir) throws Exception {
-    Pipeline pipeline = new MRPipeline(MapsIT.class, tmpDir.getDefaultConfiguration());
-    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
-    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
-    Iterable<Pair<String, Map<String, Long>>> output = shakespeare
-        .parallelDo(new DoFn<String, Pair<String, Map<String, Long>>>() {
-          @Override
-          public void process(String input, Emitter<Pair<String, Map<String, Long>>> emitter) {
-            String last = null;
-            for (String word : input.toLowerCase().split("\\W+")) {
-              if (!word.isEmpty()) {
-                String firstChar = word.substring(0, 1);
-                if (last != null) {
-                  Map<String, Long> cc = ImmutableMap.of(firstChar, 1L);
-                  emitter.emit(Pair.of(last, cc));
-                }
-                last = firstChar;
-              }
-            }
-          }
-        }, typeFamily.tableOf(typeFamily.strings(), typeFamily.maps(typeFamily.longs()))).groupByKey()
-        .combineValues(new CombineFn<String, Map<String, Long>>() {
-          @Override
-          public void process(Pair<String, Iterable<Map<String, Long>>> input,
-              Emitter<Pair<String, Map<String, Long>>> emitter) {
-            Map<String, Long> agg = Maps.newHashMap();
-            for (Map<String, Long> in : input.second()) {
-              for (Map.Entry<String, Long> e : in.entrySet()) {
-                if (!agg.containsKey(e.getKey())) {
-                  agg.put(e.getKey(), e.getValue());
-                } else {
-                  agg.put(e.getKey(), e.getValue() + agg.get(e.getKey()));
-                }
-              }
-            }
-            emitter.emit(Pair.of(input.first(), agg));
-          }
-        }).materialize();
-
-    boolean passed = false;
-    for (Pair<String, Map<String, Long>> v : output) {
-      if (v.first().equals("k") && v.second().get("n") == 8L) {
-        passed = true;
-        break;
-      }
-    }
-    pipeline.done();
-
-    assertThat(passed, is(true));
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
deleted file mode 100644
index d064993..0000000
--- a/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.crunch.fn.FilterFns;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.Person;
-import org.apache.crunch.test.StringWrapper;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Assume;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class MaterializeIT {
-
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testMaterializeInput_Writables() throws IOException {
-    runMaterializeInput(new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
-        WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testMaterializeInput_Avro() throws IOException {
-    runMaterializeInput(new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
-        AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testMaterializeInput_InMemoryWritables() throws IOException {
-    runMaterializeInput(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testMaterializeInput_InMemoryAvro() throws IOException {
-    runMaterializeInput(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testMaterializeEmptyIntermediate_Writables() throws IOException {
-    runMaterializeEmptyIntermediate(
-        new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
-        WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testMaterializeEmptyIntermediate_Avro() throws IOException {
-    runMaterializeEmptyIntermediate(
-        new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
-        AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testMaterializeEmptyIntermediate_InMemoryWritables() throws IOException {
-    runMaterializeEmptyIntermediate(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testMaterializeEmptyIntermediate_InMemoryAvro() throws IOException {
-    runMaterializeEmptyIntermediate(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
-  }
-
-  public void runMaterializeInput(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-    List<String> expectedContent = Lists.newArrayList("b", "c", "a", "e");
-    String inputPath = tmpDir.copyResourceFileName("set1.txt");
-
-    PCollection<String> lines = pipeline.readTextFile(inputPath);
-    assertEquals(expectedContent, Lists.newArrayList(lines.materialize()));
-    pipeline.done();
-  }
-
-  public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily)
-      throws IOException {
-    String inputPath = tmpDir.copyResourceFileName("set1.txt");
-    PCollection<String> empty = pipeline.readTextFile(inputPath).filter(FilterFns.<String>REJECT_ALL());
-
-    assertTrue(Lists.newArrayList(empty.materialize()).isEmpty());
-    pipeline.done();
-  }
-
-  static class StringToStringWrapperPersonPairMapFn extends MapFn<String, Pair<StringWrapper, Person>> {
-
-    @Override
-    public Pair<StringWrapper, Person> map(String input) {
-      Person person = new Person();
-      person.name = input;
-      person.age = 42;
-      person.siblingnames = Lists.<CharSequence> newArrayList();
-      return Pair.of(new StringWrapper(input), person);
-    }
-
-  }
-
-  @Test
-  public void testMaterializeAvroPersonAndReflectsPair_GroupedTable() throws IOException {
-    Assume.assumeTrue(Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS);
-    Pipeline pipeline = new MRPipeline(MaterializeIT.class);
-    List<Pair<StringWrapper, Person>> pairList = Lists.newArrayList(pipeline
-        .readTextFile(tmpDir.copyResourceFileName("set1.txt"))
-        .parallelDo(new StringToStringWrapperPersonPairMapFn(),
-            Avros.pairs(Avros.reflects(StringWrapper.class), Avros.records(Person.class)))
-        .materialize());
-    
-    // We just need to make sure this doesn't crash
-    assertEquals(4, pairList.size());
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java b/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java
deleted file mode 100644
index 7fef30e..0000000
--- a/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static junit.framework.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-
-public class MaterializeToMapIT {
-
-  static final ImmutableList<Pair<Integer, String>> kvPairs = ImmutableList.of(Pair.of(0, "a"), Pair.of(1, "b"),
-      Pair.of(2, "c"), Pair.of(3, "e"));
-
-  public void assertMatches(Map<Integer, String> m) {
-    for (Integer k : m.keySet()) {
-      assertEquals(kvPairs.get(k).second(), m.get(k));
-    }
-  }
-
-  @Test
-  public void testMemMaterializeToMap() {
-    assertMatches(MemPipeline.tableOf(kvPairs).materializeToMap());
-  }
-
-  private static class Set1Mapper extends MapFn<String, Pair<Integer, String>> {
-    @Override
-    public Pair<Integer, String> map(String input) {
-
-      int k = -1;
-      if (input.equals("a"))
-        k = 0;
-      else if (input.equals("b"))
-        k = 1;
-      else if (input.equals("c"))
-        k = 2;
-      else if (input.equals("e"))
-        k = 3;
-      return Pair.of(k, input);
-    }
-  }
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testMRMaterializeToMap() throws IOException {
-    Pipeline p = new MRPipeline(MaterializeToMapIT.class, tmpDir.getDefaultConfiguration());
-    String inputFile = tmpDir.copyResourceFileName("set1.txt");
-    PCollection<String> c = p.readTextFile(inputFile);
-    PTypeFamily tf = c.getTypeFamily();
-    PTable<Integer, String> t = c.parallelDo(new Set1Mapper(), tf.tableOf(tf.ints(), tf.strings()));
-    Map<Integer, String> m = t.materializeToMap();
-    assertMatches(m);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java b/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
deleted file mode 100644
index 1a85b6a..0000000
--- a/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.test.StringWrapper;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
-public class MultipleOutputIT {
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  public static PCollection<String> evenCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
-    return words.parallelDo("even", new FilterFn<String>() {
-
-      @Override
-      public boolean accept(String input) {
-        return input.length() % 2 == 0;
-      }
-    }, typeFamily.strings());
-  }
-
-  public static PCollection<String> oddCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
-    return words.parallelDo("odd", new FilterFn<String>() {
-
-      @Override
-      public boolean accept(String input) {
-        return input.length() % 2 != 0;
-      }
-    }, typeFamily.strings());
-
-  }
-
-  public static PTable<String, Long> substr(PTable<String, Long> ptable) {
-    return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
-      public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) {
-        if (input.first().length() > 0) {
-          emitter.emit(Pair.of(input.first().substring(0, 1), input.second()));
-        }
-      }
-    }, ptable.getPTableType());
-  }
-
-  @Test
-  public void testWritables() throws IOException {
-    run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testAvro() throws IOException {
-    run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testParallelDosFused() throws IOException {
-
-    PipelineResult result = run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()),
-        WritableTypeFamily.getInstance());
-
-    // Ensure our multiple outputs were fused into a single job.
-    assertEquals("parallel Dos not fused into a single job", 1, result.getStageResults().size());
-  }
-
-  public PipelineResult run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-    String inputPath = tmpDir.copyResourceFileName("letters.txt");
-    String outputPathEven = tmpDir.getFileName("even");
-    String outputPathOdd = tmpDir.getFileName("odd");
-
-    PCollection<String> words = pipeline.read(At.textFile(inputPath, typeFamily.strings()));
-
-    PCollection<String> evenCountWords = evenCountLetters(words, typeFamily);
-    PCollection<String> oddCountWords = oddCountLetters(words, typeFamily);
-    pipeline.writeTextFile(evenCountWords, outputPathEven);
-    pipeline.writeTextFile(oddCountWords, outputPathOdd);
-
-    PipelineResult result = pipeline.done();
-
-    checkFileContents(outputPathEven, Arrays.asList("bb"));
-    checkFileContents(outputPathOdd, Arrays.asList("a"));
-
-    return result;
-  }
-
-  /**
-   * Mutates the state of an input and then emits the mutated object.
-   */
-  static class AppendFn extends DoFn<StringWrapper, StringWrapper> {
-
-    private String value;
-
-    public AppendFn(String value) {
-      this.value = value;
-    }
-
-    @Override
-    public void process(StringWrapper input, Emitter<StringWrapper> emitter) {
-      input.setValue(input.getValue() + value);
-      emitter.emit(input);
-    }
-
-  }
-
-  /**
-   * Fusing multiple pipelines has a risk of running into object reuse bugs.
-   * This test verifies that mutating the state of an object that is passed
-   * through multiple streams of a pipeline doesn't allow one stream to affect
-   * another.
-   */
-  @Test
-  public void testFusedMappersObjectReuseBug() throws IOException {
-    Pipeline pipeline = new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<StringWrapper> stringWrappers = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
-        .parallelDo(new StringWrapper.StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class));
-
-    PCollection<String> stringsA = stringWrappers.parallelDo(new AppendFn("A"), stringWrappers.getPType())
-        .parallelDo(new StringWrapper.StringWrapperToStringMapFn(), Writables.strings());
-    PCollection<String> stringsB = stringWrappers.parallelDo(new AppendFn("B"), stringWrappers.getPType())
-        .parallelDo(new StringWrapper.StringWrapperToStringMapFn(), Writables.strings());
-
-    String outputA = tmpDir.getFileName("stringsA");
-    String outputB = tmpDir.getFileName("stringsB");
-
-    pipeline.writeTextFile(stringsA, outputA);
-    pipeline.writeTextFile(stringsB, outputB);
-    PipelineResult pipelineResult = pipeline.done();
-
-    // Make sure fusing did actually occur
-    assertEquals(1, pipelineResult.getStageResults().size());
-
-    checkFileContents(outputA, Lists.newArrayList("cA", "dA", "aA"));
-    checkFileContents(outputB, Lists.newArrayList("cB", "dB", "aB"));
-
-  }
-
-  private void checkFileContents(String filePath, List<String> expected) throws IOException {
-    File outputFile = new File(filePath, "part-m-00000");
-    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
-    assertEquals(expected, lines);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java b/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
deleted file mode 100644
index 44eb897..0000000
--- a/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static com.google.common.collect.Lists.newArrayList;
-import static org.apache.crunch.io.At.sequenceFile;
-import static org.apache.crunch.io.At.textFile;
-import static org.apache.crunch.types.writable.Writables.strings;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import java.io.IOException;
-
-import org.apache.crunch.fn.FilterFns;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class PCollectionGetSizeIT {
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  private String emptyInputPath;
-  private String nonEmptyInputPath;
-  private String outputPath;
-
-  @Before
-  public void setUp() throws IOException {
-    emptyInputPath = tmpDir.copyResourceFileName("emptyTextFile.txt");
-    nonEmptyInputPath = tmpDir.copyResourceFileName("set1.txt");
-    outputPath = tmpDir.getFileName("output");
-  }
-
-  @Test
-  public void testGetSizeOfEmptyInput_MRPipeline() throws IOException {
-    testCollectionGetSizeOfEmptyInput(new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration()));
-  }
-
-  @Test
-  public void testGetSizeOfEmptyInput_MemPipeline() throws IOException {
-    testCollectionGetSizeOfEmptyInput(MemPipeline.getInstance());
-  }
-
-  private void testCollectionGetSizeOfEmptyInput(Pipeline pipeline) throws IOException {
-
-    assertThat(pipeline.read(textFile(emptyInputPath)).getSize(), is(0L));
-  }
-
-  @Test
-  public void testMaterializeEmptyInput_MRPipeline() throws IOException {
-    testMaterializeEmptyInput(new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration()));
-  }
-
-  @Test
-  public void testMaterializeEmptyImput_MemPipeline() throws IOException {
-    testMaterializeEmptyInput(MemPipeline.getInstance());
-  }
-
-  private void testMaterializeEmptyInput(Pipeline pipeline) throws IOException {
-    assertThat(newArrayList(pipeline.readTextFile(emptyInputPath).materialize().iterator()).size(), is(0));
-  }
-
-  @Test
-  public void testGetSizeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
-
-    PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(
-        new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration()));
-
-    assertThat(emptyIntermediate.getSize(), is(0L));
-  }
-
-  @Test
-  @Ignore("GetSize of a DoCollection is only an estimate based on scale factor, so we can't count on it being reported as 0")
-  public void testGetSizeOfEmptyIntermediatePCollection_NoSave_MRPipeline() throws IOException {
-
-    PCollection<String> data = new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration())
-      .readTextFile(nonEmptyInputPath);
-
-    PCollection<String> emptyPCollection = data.filter(FilterFns.<String>REJECT_ALL());
-
-    assertThat(emptyPCollection.getSize(), is(0L));
-  }
-
-  @Test
-  public void testGetSizeOfEmptyIntermediatePCollection_MemPipeline() {
-
-    PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(MemPipeline.getInstance());
-
-    assertThat(emptyIntermediate.getSize(), is(0L));
-  }
-
-  @Test
-  public void testMaterializeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
-
-    PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(
-        new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration()));
-
-    assertThat(newArrayList(emptyIntermediate.materialize()).size(), is(0));
-  }
-
-  @Test
-  public void testMaterializeOfEmptyIntermediatePCollection_MemPipeline() {
-
-    PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(MemPipeline.getInstance());
-
-    assertThat(newArrayList(emptyIntermediate.materialize()).size(), is(0));
-  }
-
-  private PCollection<String> createPesistentEmptyIntermediate(Pipeline pipeline) {
-
-    PCollection<String> data = pipeline.readTextFile(nonEmptyInputPath);
-
-    PCollection<String> emptyPCollection = data.filter(FilterFns.<String>REJECT_ALL());
-
-    emptyPCollection.write(sequenceFile(outputPath, strings()));
-
-    pipeline.run();
-
-    return pipeline.read(sequenceFile(outputPath, strings()));
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testExpectExceptionForGettingSizeOfNonExistingFile_MRPipeline() throws IOException {
-    new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration()).readTextFile("non_existing.file").getSize();
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testExpectExceptionForGettingSizeOfNonExistingFile_MemPipeline() {
-    MemPipeline.getInstance().readTextFile("non_existing.file").getSize();
-  }
-}