You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:23 UTC

[21/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/PObjectsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PObjectsIT.java b/crunch/src/it/java/org/apache/crunch/PObjectsIT.java
deleted file mode 100644
index 6ee849f..0000000
--- a/crunch/src/it/java/org/apache/crunch/PObjectsIT.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.lang.Integer;
-import java.lang.Iterable;
-import java.lang.String;
-import java.util.Iterator;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.materialize.pobject.PObjectImpl;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.Rule;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class PObjectsIT {
-
-  private static final Integer LINES_IN_SHAKES = 3667;
-
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  /**
-   * A mock PObject that should map PCollections of strings to an integer count of the number of
-   * elements in the underlying PCollection.
-   */
-  public static class MockPObjectImpl extends PObjectImpl<String, Integer> {
-    private int numProcessCalls;
-
-    public MockPObjectImpl(PCollection<String> collect) {
-      super(collect);
-      numProcessCalls = 0;
-    }
-
-    @Override
-    public Integer process(Iterable<String> input) {
-      numProcessCalls++;
-      int i = 0;
-      Iterator<String> itr = input.iterator();
-      while (itr.hasNext()) {
-        i++;
-        itr.next();
-      }
-      return i;
-    }
-
-    public int getNumProcessCalls() {
-      return numProcessCalls;
-    }
-  }
-
-  @Test
-  public void testMRPipeline() throws IOException {
-    run(new MRPipeline(PObjectsIT.class, tmpDir.getDefaultConfiguration()));
-  }
-
-  @Test
-  public void testInMemoryPipeline() throws IOException {
-    run(MemPipeline.getInstance());
-  }
-
-  public void run(Pipeline pipeline) throws IOException {
-    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
-    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
-    MockPObjectImpl lineCount = new MockPObjectImpl(shakespeare);
-    // Get the line count once and verify its correctness.
-    assertEquals("Incorrect number of lines counted from PCollection.", LINES_IN_SHAKES,
-        lineCount.getValue());
-    // And do it again.
-    assertEquals("Incorrect number of lines counted from PCollection.", LINES_IN_SHAKES,
-        lineCount.getValue());
-    // Make sure process was called only once because the PObject's value was cached after the
-    // first call.
-    assertEquals("Process on PObject not called exactly 1 times.", 1,
-        lineCount.getNumProcessCalls());
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java b/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java
deleted file mode 100644
index d56e122..0000000
--- a/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-
-import junit.framework.Assert;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.common.collect.Lists;
-
-@RunWith(value = Parameterized.class)
-public class PTableKeyValueIT implements Serializable {
-
-  private static final long serialVersionUID = 4374227704751746689L;
-
-  private transient PTypeFamily typeFamily;
-  private transient MRPipeline pipeline;
-  private transient String inputFile;
-  @Rule
-  public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Before
-  public void setUp() throws IOException {
-    pipeline = new MRPipeline(PTableKeyValueIT.class, tmpDir.getDefaultConfiguration());
-    inputFile = tmpDir.copyResourceFileName("set1.txt");
-  }
-
-  @After
-  public void tearDown() {
-    pipeline.done();
-  }
-
-  public PTableKeyValueIT(PTypeFamily typeFamily) {
-    this.typeFamily = typeFamily;
-  }
-
-  @Parameters
-  public static Collection<Object[]> data() {
-    Object[][] data = new Object[][] { { WritableTypeFamily.getInstance() }, { AvroTypeFamily.getInstance() } };
-    return Arrays.asList(data);
-  }
-
-  @Test
-  public void testKeysAndValues() throws Exception {
-
-    PCollection<String> collection = pipeline.read(At.textFile(inputFile, typeFamily.strings()));
-
-    PTable<String, String> table = collection.parallelDo(new DoFn<String, Pair<String, String>>() {
-
-      @Override
-      public void process(String input, Emitter<Pair<String, String>> emitter) {
-        emitter.emit(Pair.of(input.toUpperCase(), input));
-
-      }
-    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
-
-    PCollection<String> keys = table.keys();
-    PCollection<String> values = table.values();
-
-    ArrayList<String> keyList = Lists.newArrayList(keys.materialize().iterator());
-    ArrayList<String> valueList = Lists.newArrayList(values.materialize().iterator());
-
-    Assert.assertEquals(keyList.size(), valueList.size());
-    for (int i = 0; i < keyList.size(); i++) {
-      Assert.assertEquals(keyList.get(i), valueList.get(i).toUpperCase());
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/PageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PageRankIT.java b/crunch/src/it/java/org/apache/crunch/PageRankIT.java
deleted file mode 100644
index 6291ef8..0000000
--- a/crunch/src/it/java/org/apache/crunch/PageRankIT.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.PTypes;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-public class PageRankIT {
-
-  public static class PageRankData {
-    public float score;
-    public float lastScore;
-    public List<String> urls;
-
-    public PageRankData() {
-    }
-
-    public PageRankData(float score, float lastScore, Iterable<String> urls) {
-      this.score = score;
-      this.lastScore = lastScore;
-      this.urls = Lists.newArrayList(urls);
-    }
-
-    public PageRankData next(float newScore) {
-      return new PageRankData(newScore, score, urls);
-    }
-
-    public float propagatedScore() {
-      return score / urls.size();
-    }
-
-    @Override
-    public String toString() {
-      return score + " " + lastScore + " " + urls;
-    }
-  }
-
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testAvroReflect() throws Exception {
-    PTypeFamily tf = AvroTypeFamily.getInstance();
-    PType<PageRankData> prType = Avros.reflects(PageRankData.class);
-    String urlInput = tmpDir.copyResourceFileName("urls.txt");
-    run(new MRPipeline(PageRankIT.class, tmpDir.getDefaultConfiguration()),
-        urlInput, prType, tf);
-  }
-
-  @Test
-  public void testAvroMReflectInMemory() throws Exception {
-    PTypeFamily tf = AvroTypeFamily.getInstance();
-    PType<PageRankData> prType = Avros.reflects(PageRankData.class);
-    String urlInput = tmpDir.copyResourceFileName("urls.txt");
-    run(MemPipeline.getInstance(), urlInput, prType, tf);
-  }
-
-  @Test
-  public void testAvroJSON() throws Exception {
-    PTypeFamily tf = AvroTypeFamily.getInstance();
-    PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
-    String urlInput = tmpDir.copyResourceFileName("urls.txt");
-    run(new MRPipeline(PageRankIT.class, tmpDir.getDefaultConfiguration()),
-        urlInput, prType, tf);
-  }
-
-  @Test
-  public void testWritablesJSON() throws Exception {
-    PTypeFamily tf = WritableTypeFamily.getInstance();
-    PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
-    String urlInput = tmpDir.copyResourceFileName("urls.txt");
-    run(new MRPipeline(PageRankIT.class, tmpDir.getDefaultConfiguration()),
-        urlInput, prType, tf);
-  }
-
-  public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {
-    PTypeFamily ptf = input.getTypeFamily();
-    PTable<String, Float> outbound = input.parallelDo(new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
-      @Override
-      public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float>> emitter) {
-        PageRankData prd = input.second();
-        for (String link : prd.urls) {
-          emitter.emit(Pair.of(link, prd.propagatedScore()));
-        }
-      }
-    }, ptf.tableOf(ptf.strings(), ptf.floats()));
-
-    return input.cogroup(outbound).parallelDo(
-        new MapFn<Pair<String, Pair<Collection<PageRankData>, Collection<Float>>>, Pair<String, PageRankData>>() {
-          @Override
-          public Pair<String, PageRankData> map(Pair<String, Pair<Collection<PageRankData>, Collection<Float>>> input) {
-            PageRankData prd = Iterables.getOnlyElement(input.second().first());
-            Collection<Float> propagatedScores = input.second().second();
-            float sum = 0.0f;
-            for (Float s : propagatedScores) {
-              sum += s;
-            }
-            return Pair.of(input.first(), prd.next(d + (1.0f - d) * sum));
-          }
-        }, input.getPTableType());
-  }
-
-  public static void run(Pipeline pipeline, String urlInput,
-      PType<PageRankData> prType, PTypeFamily ptf) throws Exception {
-    PTable<String, PageRankData> scores = pipeline.readTextFile(urlInput)
-        .parallelDo(new MapFn<String, Pair<String, String>>() {
-          @Override
-          public Pair<String, String> map(String input) {
-            String[] urls = input.split("\\t");
-            return Pair.of(urls[0], urls[1]);
-          }
-        }, ptf.tableOf(ptf.strings(), ptf.strings())).groupByKey()
-        .parallelDo(new MapFn<Pair<String, Iterable<String>>, Pair<String, PageRankData>>() {
-          @Override
-          public Pair<String, PageRankData> map(Pair<String, Iterable<String>> input) {
-            return Pair.of(input.first(), new PageRankData(1.0f, 0.0f, input.second()));
-          }
-        }, ptf.tableOf(ptf.strings(), prType));
-
-    Float delta = 1.0f;
-    while (delta > 0.01) {
-      scores = pageRank(scores, 0.5f);
-      scores.materialize().iterator(); // force the write
-      delta = Aggregate.max(scores.parallelDo(new MapFn<Pair<String, PageRankData>, Float>() {
-        @Override
-        public Float map(Pair<String, PageRankData> input) {
-          PageRankData prd = input.second();
-          return Math.abs(prd.score - prd.lastScore);
-        }
-      }, ptf.floats())).getValue();
-    }
-    assertEquals(0.0048, delta, 0.001);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/StageResultsCountersIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/StageResultsCountersIT.java b/crunch/src/it/java/org/apache/crunch/StageResultsCountersIT.java
deleted file mode 100644
index 19fc302..0000000
--- a/crunch/src/it/java/org/apache/crunch/StageResultsCountersIT.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.crunch.PipelineResult.StageResult;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.From;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.apache.hadoop.mapreduce.Counter;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-public class StageResultsCountersIT {
-
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  public static HashSet<String> SPECIAL_KEYWORDS = Sets.newHashSet("AND", "OR", "NOT");
-
-  public static String KEYWORDS_COUNTER_GROUP = "KEYWORDS_COUNTER_GROUP";
-
-  @After
-  public void after() {
-    MemPipeline.clearCounters();
-  }
-  
-  @Test
-  public void testStageResultsCountersMRWritables() throws Exception {
-    testSpecialKeywordCount(new MRPipeline(StageResultsCountersIT.class, tmpDir.getDefaultConfiguration()),
-        WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testStageResultsCountersMRAvro() throws Exception {
-    testSpecialKeywordCount(new MRPipeline(StageResultsCountersIT.class, tmpDir.getDefaultConfiguration()),
-        AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testStageResultsCountersMemWritables() throws Exception {
-    testSpecialKeywordCount(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testStageResultsCountersMemAvro() throws Exception {
-    testSpecialKeywordCount(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
-  }
-
-  public void testSpecialKeywordCount(Pipeline pipeline, PTypeFamily tf) throws Exception {
-
-    String rowsInputPath = tmpDir.copyResourceFileName("shakes.txt");
-
-    PipelineResult result = coutSpecialKeywords(pipeline, rowsInputPath, tf);
-
-    assertTrue(result.succeeded());
-
-    Map<String, Long> keywordsMap = countersToMap(result.getStageResults(), KEYWORDS_COUNTER_GROUP);
-
-    assertEquals(3, keywordsMap.size());
-
-    assertEquals("{NOT=157, AND=596, OR=81}", keywordsMap.toString());
-  }
-
-  private static PipelineResult coutSpecialKeywords(Pipeline pipeline, String inputFileName, PTypeFamily tf) {
-
-    pipeline.read(From.textFile(inputFileName)).parallelDo(new DoFn<String, Void>() {
-
-      @Override
-      public void process(String text, Emitter<Void> emitter) {
-
-        if (!StringUtils.isBlank(text)) {
-
-          String[] tokens = text.toUpperCase().split("\\s");
-
-          for (String token : tokens) {
-            if (SPECIAL_KEYWORDS.contains(token)) {
-              getCounter(KEYWORDS_COUNTER_GROUP, token).increment(1);
-            }
-          }
-        }
-      }
-    }, tf.nulls()).materialize(); // TODO can we avoid the materialize ?
-
-    return pipeline.done();
-  }
-
-  private static Map<String, Long> countersToMap(List<StageResult> stages, String counterGroupName) {
-
-    Map<String, Long> countersMap = Maps.newHashMap();
-
-    for (StageResult sr : stages) {
-      Iterator<Counter> iterator = sr.getCounters().getGroup(counterGroupName).iterator();
-      while (iterator.hasNext()) {
-        Counter counter = (Counter) iterator.next();
-        countersMap.put(counter.getDisplayName(), counter.getValue());
-      }
-    }
-
-    return countersMap;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java b/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java
deleted file mode 100644
index ca66aa8..0000000
--- a/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class TermFrequencyIT implements Serializable {
-  @Rule
-  public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testTermFrequencyWithNoTransform() throws IOException {
-    run(new MRPipeline(TermFrequencyIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), false);
-  }
-
-  @Test
-  public void testTermFrequencyWithTransform() throws IOException {
-    run(new MRPipeline(TermFrequencyIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), true);
-  }
-
-  @Test
-  public void testTermFrequencyNoTransformInMemory() throws IOException {
-    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance(), false);
-  }
-
-  @Test
-  public void testTermFrequencyWithTransformInMemory() throws IOException {
-    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance(), true);
-  }
-
-  public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean transformTF) throws IOException {
-    String input = tmpDir.copyResourceFileName("docs.txt");
-
-    File transformedOutput = tmpDir.getFile("transformed-output");
-    File tfOutput = tmpDir.getFile("tf-output");
-
-    PCollection<String> docs = pipeline.readTextFile(input);
-
-    PTypeFamily ptf = docs.getTypeFamily();
-
-    /*
-     * Input: String, one document per line as "title<TAB>text".
-     * 
-     * Output: PTable<Pair<String, String>, Long>, i.e.
-     * Pair<Pair<word, title>, count in title>.
-     */
-    PTable<Pair<String, String>, Long> tf = Aggregate.count(docs.parallelDo("term document frequency",
-        new DoFn<String, Pair<String, String>>() {
-          @Override
-          public void process(String doc, Emitter<Pair<String, String>> emitter) {
-            String[] kv = doc.split("\t");
-            String title = kv[0];
-            String text = kv[1];
-            for (String word : text.split("\\W+")) {
-              if (word.length() > 0) {
-                Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
-                emitter.emit(pair);
-              }
-            }
-          }
-        }, ptf.pairs(ptf.strings(), ptf.strings())));
-
-    if (transformTF) {
-      /*
-       * Input: Pair<Pair<String, String>, Long>, i.e.
-       * Pair<Pair<word, title>, count in title>.
-       * 
-       * Output: PTable<String, Pair<String, Long>>, i.e.
-       * PTable<word, Pair<title, count in title>>.
-       */
-      PTable<String, Pair<String, Long>> wordDocumentCountPair = tf.parallelDo("transform wordDocumentPairCount",
-          new MapFn<Pair<Pair<String, String>, Long>, Pair<String, Pair<String, Long>>>() {
-            @Override
-            public Pair<String, Pair<String, Long>> map(Pair<Pair<String, String>, Long> input) {
-              Pair<String, String> wordDocumentPair = input.first();
-              return Pair.of(wordDocumentPair.first(), Pair.of(wordDocumentPair.second(), input.second()));
-            }
-          }, ptf.tableOf(ptf.strings(), ptf.pairs(ptf.strings(), ptf.longs())));
-
-      pipeline.writeTextFile(wordDocumentCountPair, transformedOutput.getAbsolutePath());
-    }
-
-    SourceTarget<String> st = At.textFile(tfOutput.getAbsolutePath());
-    pipeline.write(tf, st);
-
-    pipeline.run();
-
-    // test the case we should see
-    Iterable<String> lines = ((ReadableSourceTarget<String>) st).read(pipeline.getConfiguration());
-    boolean passed = false;
-    for (String line : lines) {
-      if ("[well,A]\t0".equals(line)) {
-        fail("Found " + line + " but well is in Document A 1 time");
-      }
-      if ("[well,A]\t1".equals(line)) {
-        passed = true;
-      }
-    }
-    assertTrue(passed);
-    pipeline.done();
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/TextPairIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TextPairIT.java b/crunch/src/it/java/org/apache/crunch/TextPairIT.java
deleted file mode 100644
index 55d9af9..0000000
--- a/crunch/src/it/java/org/apache/crunch/TextPairIT.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.From;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class TextPairIT {
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testWritables() throws IOException {
-    run(new MRPipeline(TextPairIT.class, tmpDir.getDefaultConfiguration()));
-  }
-
-  private static final String CANARY = "Writables.STRING_TO_TEXT";
-
-  public static PCollection<Pair<String, String>> wordDuplicate(PCollection<String> words) {
-    return words.parallelDo("my word duplicator", new DoFn<String, Pair<String, String>>() {
-      public void process(String line, Emitter<Pair<String, String>> emitter) {
-        for (String word : line.split("\\W+")) {
-          if (word.length() > 0) {
-            Pair<String, String> pair = Pair.of(CANARY, word);
-            emitter.emit(pair);
-          }
-        }
-      }
-    }, Writables.pairs(Writables.strings(), Writables.strings()));
-  }
-
-  public void run(Pipeline pipeline) throws IOException {
-    String input = tmpDir.copyResourceFileName("shakes.txt");
-
-    PCollection<String> shakespeare = pipeline.read(From.textFile(input));
-    Iterable<Pair<String, String>> lines = pipeline.materialize(wordDuplicate(shakespeare));
-    boolean passed = false;
-    for (Pair<String, String> line : lines) {
-      if (line.first().contains(CANARY)) {
-        passed = true;
-        break;
-      }
-    }
-
-    pipeline.done();
-    assertTrue(passed);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/TfIdfIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TfIdfIT.java b/crunch/src/it/java/org/apache/crunch/TfIdfIT.java
deleted file mode 100644
index 218f538..0000000
--- a/crunch/src/it/java/org/apache/crunch/TfIdfIT.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.crunch.fn.MapKeysFn;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.seq.SeqFileSourceTarget;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.lib.Join;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.apache.hadoop.fs.Path;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
-@SuppressWarnings("serial")
-public class TfIdfIT implements Serializable {
-  @Rule
-  public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
-  // total number of documents, should calculate
-  protected static final double N = 2;
-
-  @Test
-  public void testWritablesSingleRun() throws IOException {
-    run(new MRPipeline(TfIdfIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), true);
-  }
-
-  @Test
-  public void testWritablesMultiRun() throws IOException {
-    run(new MRPipeline(TfIdfIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), false);
-  }
-
-  /**
-   * This method should generate a TF-IDF score for the input.
-   */
-  public PTable<String, Collection<Pair<String, Double>>> generateTFIDF(PCollection<String> docs, Path termFreqPath,
-      PTypeFamily ptf) throws IOException {
-
-    /*
-     * Input: String Input title text
-     * 
-     * Output: PTable<Pair<String, String>, Long> Pair<Pair<word, title>, count
-     * in title>
-     */
-    PTable<Pair<String, String>, Long> tf = Aggregate.count(docs.parallelDo("term document frequency",
-        new DoFn<String, Pair<String, String>>() {
-          @Override
-          public void process(String doc, Emitter<Pair<String, String>> emitter) {
-            String[] kv = doc.split("\t");
-            String title = kv[0];
-            String text = kv[1];
-            for (String word : text.split("\\W+")) {
-              if (word.length() > 0) {
-                Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
-                emitter.emit(pair);
-              }
-            }
-          }
-        }, ptf.pairs(ptf.strings(), ptf.strings())));
-
-    tf.write(new SeqFileSourceTarget<Pair<Pair<String, String>, Long>>(termFreqPath, tf.getPType()));
-
-    /*
-     * Input: Pair<Pair<String, String>, Long> Pair<Pair<word, title>, count in
-     * title>
-     * 
-     * Output: PTable<String, Long> PTable<word, # of docs containing word>
-     */
-    PTable<String, Long> n = Aggregate.count(tf.parallelDo("little n (# of docs contain word)",
-        new DoFn<Pair<Pair<String, String>, Long>, String>() {
-          @Override
-          public void process(Pair<Pair<String, String>, Long> input, Emitter<String> emitter) {
-            emitter.emit(input.first().first());
-          }
-        }, ptf.strings()));
-
-    /*
-     * Input: Pair<Pair<String, String>, Long> Pair<Pair<word, title>, count in
-     * title>
-     * 
-     * Output: PTable<String, Pair<String, Long>> PTable<word, Pair<title, count
-     * in title>>
-     */
-    PTable<String, Collection<Pair<String, Long>>> wordDocumentCountPair = tf.parallelDo(
-        "transform wordDocumentPairCount",
-        new DoFn<Pair<Pair<String, String>, Long>, Pair<String, Collection<Pair<String, Long>>>>() {
-          Collection<Pair<String, Long>> buffer;
-          String key;
-
-          @Override
-          public void process(Pair<Pair<String, String>, Long> input,
-              Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
-            Pair<String, String> wordDocumentPair = input.first();
-            if (!wordDocumentPair.first().equals(key)) {
-              flush(emitter);
-              key = wordDocumentPair.first();
-              buffer = Lists.newArrayList();
-            }
-            buffer.add(Pair.of(wordDocumentPair.second(), input.second()));
-          }
-
-          protected void flush(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
-            if (buffer != null) {
-              emitter.emit(Pair.of(key, buffer));
-              buffer = null;
-            }
-          }
-
-          @Override
-          public void cleanup(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
-            flush(emitter);
-          }
-        }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.longs()))));
-
-    PTable<String, Pair<Long, Collection<Pair<String, Long>>>> joinedResults = Join.join(n, wordDocumentCountPair);
-
-    /*
-     * Input: Pair<String, Pair<Long, Collection<Pair<String, Long>>> Pair<word,
-     * Pair<# of docs containing word, Collection<Pair<title, term frequency>>>
-     * 
-     * Output: Pair<String, Collection<Pair<String, Double>>> Pair<word,
-     * Collection<Pair<title, tfidf>>>
-     */
-    return joinedResults
-        .parallelDo(
-            "calculate tfidf",
-            new MapFn<Pair<String, Pair<Long, Collection<Pair<String, Long>>>>, Pair<String, Collection<Pair<String, Double>>>>() {
-              @Override
-              public Pair<String, Collection<Pair<String, Double>>> map(
-                  Pair<String, Pair<Long, Collection<Pair<String, Long>>>> input) {
-                Collection<Pair<String, Double>> tfidfs = Lists.newArrayList();
-                String word = input.first();
-                double n = input.second().first();
-                double idf = Math.log(N / n);
-                for (Pair<String, Long> tf : input.second().second()) {
-                  double tfidf = tf.second() * idf;
-                  tfidfs.add(Pair.of(tf.first(), tfidf));
-                }
-                return Pair.of(word, tfidfs);
-              }
-
-            }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles()))));
-  }
-
-  public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean singleRun) throws IOException {
-    String inputFile = tmpDir.copyResourceFileName("docs.txt");
-    String outputPath1 = tmpDir.getFileName("output1");
-    String outputPath2 = tmpDir.getFileName("output2");
-
-    Path tfPath = tmpDir.getPath("termfreq");
-
-    PCollection<String> docs = pipeline.readTextFile(inputFile);
-
-    PTable<String, Collection<Pair<String, Double>>> results = generateTFIDF(docs, tfPath, typeFamily);
-    pipeline.writeTextFile(results, outputPath1);
-    if (!singleRun) {
-      pipeline.run();
-    }
-
-    PTable<String, Collection<Pair<String, Double>>> uppercased = results.parallelDo(
-        new MapKeysFn<String, String, Collection<Pair<String, Double>>>() {
-          @Override
-          public String map(String k1) {
-            return k1.toUpperCase();
-          }
-        }, results.getPTableType());
-    pipeline.writeTextFile(uppercased, outputPath2);
-    pipeline.done();
-
-    // Check the lowercase version...
-    File outputFile = new File(outputPath1, "part-r-00000");
-    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
-    boolean passed = false;
-    for (String line : lines) {
-      if (line.startsWith("[the") && line.contains("B,0.6931471805599453")) {
-        passed = true;
-        break;
-      }
-    }
-    assertTrue(passed);
-
-    // ...and the uppercase version
-    outputFile = new File(outputPath2, "part-r-00000");
-    lines = Files.readLines(outputFile, Charset.defaultCharset());
-    passed = false;
-    for (String line : lines) {
-      if (line.startsWith("[THE") && line.contains("B,0.6931471805599453")) {
-        passed = true;
-        break;
-      }
-    }
-    assertTrue(passed);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java b/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
deleted file mode 100644
index e49f4d5..0000000
--- a/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.io.Files;
-
-
-public class TupleNClassCastBugIT {
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  public static PCollection<TupleN> mapGroupDo(PCollection<String> lines, PTypeFamily ptf) {
-    PTable<String, TupleN> mapped = lines.parallelDo(new MapFn<String, Pair<String, TupleN>>() {
-
-      @Override
-      public Pair<String, TupleN> map(String line) {
-        String[] columns = line.split("\\t");
-        String docId = columns[0];
-        String docLine = columns[1];
-        return Pair.of(docId, new TupleN(docId, docLine));
-      }
-    }, ptf.tableOf(ptf.strings(), ptf.tuples(ptf.strings(), ptf.strings())));
-    return mapped.groupByKey().parallelDo(new DoFn<Pair<String, Iterable<TupleN>>, TupleN>() {
-      @Override
-      public void process(Pair<String, Iterable<TupleN>> input, Emitter<TupleN> tupleNEmitter) {
-        for (TupleN tuple : input.second()) {
-          tupleNEmitter.emit(tuple);
-        }
-      }
-    }, ptf.tuples(ptf.strings(), ptf.strings()));
-  }
-
-  @Test
-  public void testWritables() throws IOException {
-    run(new MRPipeline(TupleNClassCastBugIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testAvro() throws IOException {
-    run(new MRPipeline(TupleNClassCastBugIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
-  }
-
-  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-    String inputPath = tmpDir.copyResourceFileName("docs.txt");
-    String outputPath = tmpDir.getFileName("output");
-
-    PCollection<String> docLines = pipeline.readTextFile(inputPath);
-    pipeline.writeTextFile(mapGroupDo(docLines, typeFamily), outputPath);
-    pipeline.done();
-
-    // *** We are not directly testing the output, we are looking for a
-    // ClassCastException
-    // *** which is thrown in a different thread during the reduce phase. If all
-    // is well
-    // *** the file will exist and have six lines. Otherwise the bug is present.
-    File outputFile = new File(outputPath, "part-r-00000");
-    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
-    int lineCount = 0;
-    for (String line : lines) {
-      lineCount++;
-    }
-    assertEquals(6, lineCount);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java b/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java
deleted file mode 100644
index 501a944..0000000
--- a/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Collection of tests re-using the same PCollection in various unions.
- */
-public class UnionFromSameSourceIT {
-
-  private static final int NUM_ELEMENTS = 4;
-
-  @Rule
-  public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
-  private Pipeline pipeline;
-  private PType<String> elementType = Writables.strings();
-  private PTableType<String, String> tableType = Writables.tableOf(Writables.strings(),
-    Writables.strings());
-
-  @Before
-  public void setUp() {
-    pipeline = new MRPipeline(UnionFromSameSourceIT.class, tmpDir.getDefaultConfiguration());
-  }
-
-  @Test
-  public void testUnion_SingleRead() throws IOException {
-    PCollection<String> strings = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
-    PCollection<String> union = strings.union(strings.parallelDo(IdentityFn.<String> getInstance(),
-      strings.getPType()));
-
-    assertEquals(NUM_ELEMENTS * 2, getCount(union));
-  }
-
-  @Test
-  public void testUnion_TwoReads() throws IOException {
-    PCollection<String> stringsA = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
-    PCollection<String> stringsB = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
-
-    PCollection<String> union = stringsA.union(stringsB);
-
-    assertEquals(NUM_ELEMENTS * 2, getCount(union));
-  }
-
-  @Test
-  public void testDoubleUnion_EndingWithGBK() throws IOException {
-    runDoubleUnionPipeline(true);
-  }
-
-  @Test
-  public void testDoubleUnion_EndingWithoutGBK() throws IOException {
-    runDoubleUnionPipeline(false);
-  }
-
-  private void runDoubleUnionPipeline(boolean endWithGBK) throws IOException {
-    PCollection<String> strings = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
-    PTable<String, String> tableA = strings.parallelDo("to table A", new ToTableFn(), tableType);
-    PTable<String, String> tableB = strings.parallelDo("to table B", new ToTableFn(), tableType);
-
-    PGroupedTable<String, String> groupedTable = tableA.union(tableB).groupByKey();
-    PCollection<String> ungrouped = groupedTable.parallelDo("ungroup before union",
-      new FromGroupedTableFn(), elementType).union(
-      strings.parallelDo("fake id", IdentityFn.<String> getInstance(), elementType));
-
-    PTable<String, String> table = ungrouped.parallelDo("union back to table", new ToTableFn(),
-      tableType);
-
-    if (endWithGBK) {
-      table = table.groupByKey().ungroup();
-    }
-
-    assertEquals(3 * NUM_ELEMENTS, getCount(table));
-  }
-
-  private int getCount(PCollection<?> pcollection) {
-    int cnt = 0;
-    for (Object v : pcollection.materialize()) {
-      cnt++;
-    }
-    return cnt;
-  }
-
-  private static class ToTableFn extends MapFn<String, Pair<String, String>> {
-
-    @Override
-    public Pair<String, String> map(String input) {
-      return Pair.of(input, input);
-    }
-
-  }
-
-  private static class FromGroupedTableFn extends DoFn<Pair<String, Iterable<String>>, String> {
-
-    @Override
-    public void process(Pair<String, Iterable<String>> input, Emitter<String> emitter) {
-      for (String value : input.second()) {
-        emitter.emit(value);
-      }
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/UnionIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/UnionIT.java b/crunch/src/it/java/org/apache/crunch/UnionIT.java
deleted file mode 100644
index 1c60a1b..0000000
--- a/crunch/src/it/java/org/apache/crunch/UnionIT.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.crunch.fn.Aggregators;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.test.Tests;
-import org.apache.crunch.types.avro.Avros;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultiset;
-
-
-public class UnionIT {
-
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-  private MRPipeline pipeline;
-  private PCollection<String> words1;
-  private PCollection<String> words2;
-
-  @Before
-  public void setUp() throws IOException {
-    pipeline = new MRPipeline(UnionIT.class, tmpDir.getDefaultConfiguration());
-    words1 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt")));
-    words2 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt")));
-  }
-
-  @After
-  public void tearDown() {
-    pipeline.done();
-  }
-
-  @Test
-  public void testUnion() throws Exception {
-    IdentityFn<String> identity = IdentityFn.getInstance();
-    words1 = words1.parallelDo(identity, Avros.strings());
-    words2 = words2.parallelDo(identity, Avros.strings());
-
-    PCollection<String> union = words1.union(words2);
-
-    ImmutableMultiset<String> actual = ImmutableMultiset.copyOf(union.materialize());
-    assertThat(actual.elementSet().size(), is(3));
-    assertThat(actual.count("a1"), is(4));
-    assertThat(actual.count("b2"), is(2));
-    assertThat(actual.count("c3"), is(2));
-  }
-
-  @Test
-  public void testTableUnion() throws IOException {
-    PTable<String, String> words1ByFirstLetter = byFirstLetter(words1);
-    PTable<String, String> words2ByFirstLetter = byFirstLetter(words2);
-
-    PTable<String, String> union = words1ByFirstLetter.union(words2ByFirstLetter);
-
-    ImmutableMultiset<Pair<String, String>> actual = ImmutableMultiset.copyOf(union.materialize());
-
-    assertThat(actual.elementSet().size(), is(3));
-    assertThat(actual.count(Pair.of("a", "1")), is(4));
-    assertThat(actual.count(Pair.of("b", "2")), is(2));
-    assertThat(actual.count(Pair.of("c", "3")), is(2));
-  }
-
-  @Test
-  public void testUnionThenGroupByKey() throws IOException {
-    PCollection<String> union = words1.union(words2);
-
-    PGroupedTable<String, String> grouped = byFirstLetter(union).groupByKey();
-
-    Map<String, String> actual = grouped.combineValues(Aggregators.STRING_CONCAT("", true))
-        .materializeToMap();
-
-    Map<String, String> expected = ImmutableMap.of("a", "1111", "b", "22", "c", "33");
-    assertThat(actual, is(expected));
-  }
-
-  @Test
-  public void testTableUnionThenGroupByKey() throws IOException {
-    PTable<String, String> words1ByFirstLetter = byFirstLetter(words1);
-    PTable<String, String> words2ByFirstLetter = byFirstLetter(words2);
-
-    PTable<String, String> union = words1ByFirstLetter.union(words2ByFirstLetter);
-
-    PGroupedTable<String, String> grouped = union.groupByKey();
-
-    Map<String, String> actual = grouped.combineValues(Aggregators.STRING_CONCAT("", true))
-        .materializeToMap();
-
-    Map<String, String> expected = ImmutableMap.of("a", "1111", "b", "22", "c", "33");
-    assertThat(actual, is(expected));
-  }
-
-
-  private static PTable<String, String> byFirstLetter(PCollection<String> values) {
-    return values.parallelDo("byFirstLetter", new FirstLetterKeyFn(),
-        Avros.tableOf(Avros.strings(), Avros.strings()));
-  }
-
-  private static class FirstLetterKeyFn extends DoFn<String, Pair<String, String>> {
-    @Override
-    public void process(String input, Emitter<Pair<String, String>> emitter) {
-      if (input.length() > 1) {
-        emitter.emit(Pair.of(input.substring(0, 1), input.substring(1)));
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java b/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java
deleted file mode 100644
index df0511a..0000000
--- a/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.test.CrunchTestSupport;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-public class UnionResultsIT extends CrunchTestSupport implements Serializable {
-
-  static class StringLengthMapFn extends MapFn<String, Pair<String, Long>> {
-
-    @Override
-    public Pair<String, Long> map(String input) {
-      return new Pair<String, Long>(input, 10L);
-    }
-  }
-
-
-  /**
-   * Tests combining a GBK output with a map-only job output into a single
-   * unioned collection.
-   */
-  @Test
-  public void testUnionOfGroupedOutputAndNonGroupedOutput() throws IOException {
-    String inputPath = tempDir.copyResourceFileName("set1.txt");
-    String inputPath2 = tempDir.copyResourceFileName("set2.txt");
-
-    Pipeline pipeline = new MRPipeline(UnionResultsIT.class);
-
-    PCollection<String> set1Lines = pipeline.read(At.textFile(inputPath, Writables.strings()));
-    PCollection<Pair<String, Long>> set1Lengths = set1Lines.parallelDo(new StringLengthMapFn(),
-        Writables.pairs(Writables.strings(), Writables.longs()));
-    PCollection<Pair<String, Long>> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count();
-
-    PCollection<Pair<String, Long>> union = set1Lengths.union(set2Counts);
-
-    List<Pair<String, Long>> unionValues = Lists.newArrayList(union.materialize());
-    assertEquals(7, unionValues.size());
-
-    Set<Pair<String, Long>> expectedPairs = Sets.newHashSet();
-    expectedPairs.add(Pair.of("b", 10L));
-    expectedPairs.add(Pair.of("c", 10L));
-    expectedPairs.add(Pair.of("a", 10L));
-    expectedPairs.add(Pair.of("e", 10L));
-    expectedPairs.add(Pair.of("a", 1L));
-    expectedPairs.add(Pair.of("c", 1L));
-    expectedPairs.add(Pair.of("d", 1L));
-
-    assertEquals(expectedPairs, Sets.newHashSet(unionValues));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/WordCountIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/WordCountIT.java b/crunch/src/it/java/org/apache/crunch/WordCountIT.java
deleted file mode 100644
index c646663..0000000
--- a/crunch/src/it/java/org/apache/crunch/WordCountIT.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import org.apache.crunch.fn.Aggregators;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.io.To;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
-public class WordCountIT {
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  enum WordCountStats {
-    ANDS
-  };
-
-  public static PTable<String, Long> wordCount(PCollection<String> words, PTypeFamily typeFamily) {
-    return Aggregate.count(words.parallelDo(new DoFn<String, String>() {
-
-      @Override
-      public void process(String line, Emitter<String> emitter) {
-        for (String word : line.split("\\s+")) {
-          emitter.emit(word);
-          if ("and".equals(word)) {
-            increment(WordCountStats.ANDS);
-          }
-        }
-      }
-    }, typeFamily.strings()));
-  }
-
-  public static PTable<String, Long> substr(PTable<String, Long> ptable) {
-    return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
-
-      public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) {
-        if (input.first().length() > 0) {
-          emitter.emit(Pair.of(input.first().substring(0, 1), input.second()));
-        }
-      }
-    }, ptable.getPTableType());
-  }
-
-  private boolean runSecond = false;
-  private boolean useToOutput = false;
-
-  @Test
-  public void testWritables() throws IOException {
-    run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testWritablesWithSecond() throws IOException {
-    runSecond = true;
-    run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testWritablesWithSecondUseToOutput() throws IOException {
-    runSecond = true;
-    useToOutput = true;
-    run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testAvro() throws IOException {
-    run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testAvroWithSecond() throws IOException {
-    runSecond = true;
-    run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testWithTopWritable() throws IOException {
-    runWithTop(WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testWithTopAvro() throws IOException {
-    runWithTop(AvroTypeFamily.getInstance());
-  }
-
-  public void runWithTop(PTypeFamily tf) throws IOException {
-    Pipeline pipeline = new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration());
-    String inputPath = tmpDir.copyResourceFileName("shakes.txt");
-
-    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, tf.strings()));
-    PTable<String, Long> wordCount = wordCount(shakespeare, tf);
-    List<Pair<String, Long>> top5 = Lists.newArrayList(Aggregate.top(wordCount, 5, true).materialize());
-    assertEquals(
-        ImmutableList.of(Pair.of("", 1470L), Pair.of("the", 620L), Pair.of("and", 427L), Pair.of("of", 396L),
-            Pair.of("to", 367L)), top5);
-  }
-
-  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-    String inputPath = tmpDir.copyResourceFileName("shakes.txt");
-    String outputPath = tmpDir.getFileName("output");
-
-    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, typeFamily.strings()));
-    PTable<String, Long> wordCount = wordCount(shakespeare, typeFamily);
-    if (useToOutput) {
-      wordCount.write(To.textFile(outputPath));
-    } else {
-      pipeline.writeTextFile(wordCount, outputPath);
-    }
-
-    if (runSecond) {
-      String substrPath = tmpDir.getFileName("substr");
-      PTable<String, Long> we = substr(wordCount).groupByKey().combineValues(Aggregators.SUM_LONGS());
-      pipeline.writeTextFile(we, substrPath);
-    }
-    PipelineResult res = pipeline.done();
-    assertTrue(res.succeeded());
-    List<PipelineResult.StageResult> stageResults = res.getStageResults();
-    if (runSecond) {
-      assertEquals(2, stageResults.size());
-    } else {
-      assertEquals(1, stageResults.size());
-      assertEquals(427, stageResults.get(0).getCounterValue(WordCountStats.ANDS));
-    }
-
-    File outputFile = new File(outputPath, "part-r-00000");
-    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
-    boolean passed = false;
-    for (String line : lines) {
-      if (line.startsWith("Macbeth\t28") || line.startsWith("[Macbeth,28]")) {
-        passed = true;
-        break;
-      }
-    }
-    assertTrue(passed);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/fn/AggregatorsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/fn/AggregatorsIT.java b/crunch/src/it/java/org/apache/crunch/fn/AggregatorsIT.java
deleted file mode 100644
index c9584a1..0000000
--- a/crunch/src/it/java/org/apache/crunch/fn/AggregatorsIT.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import static org.apache.crunch.fn.Aggregators.SUM_INTS;
-import static org.apache.crunch.fn.Aggregators.pairAggregator;
-import static org.apache.crunch.types.writable.Writables.ints;
-import static org.apache.crunch.types.writable.Writables.pairs;
-import static org.apache.crunch.types.writable.Writables.strings;
-import static org.apache.crunch.types.writable.Writables.tableOf;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.test.Tests;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-
-@RunWith(Parameterized.class)
-public class AggregatorsIT {
-  private Pipeline pipeline;
-
-  @Parameters
-  public static Collection<Object[]> params() {
-    return Tests.pipelinesParams(AggregatorsIT.class);
-  }
-
-  public AggregatorsIT(Pipeline pipeline) {
-    this.pipeline = pipeline;
-  }
-
-  @Test
-  public void testPairAggregator() {
-    PCollection<String> lines = pipeline.readTextFile(Tests.pathTo(this, "ints.txt"));
-
-    PTable<String, Pair<Integer, Integer>> table = lines.parallelDo(new SplitLine(),
-        tableOf(strings(), pairs(ints(), ints())));
-
-    PTable<String, Pair<Integer, Integer>> combinedTable = table.groupByKey().combineValues(
-        pairAggregator(SUM_INTS(), SUM_INTS()));
-
-    Map<String, Pair<Integer, Integer>> result = combinedTable.asMap().getValue();
-
-    assertThat(result.size(), is(2));
-    assertThat(result.get("a"), is(Pair.of(9,  12)));
-    assertThat(result.get("b"), is(Pair.of(11,  13)));
-  }
-
-  private static final class SplitLine extends MapFn<String, Pair<String, Pair<Integer, Integer>>> {
-    @Override
-    public Pair<String, Pair<Integer, Integer>> map(String input) {
-      String[] split = input.split("\t");
-      return Pair.of(split[0],
-          Pair.of(Integer.parseInt(split[1]), Integer.parseInt(split[2])));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java b/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
deleted file mode 100644
index 976a43e..0000000
--- a/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mem;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.List;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.common.io.Files;
-
-public class MemPipelineFileWritingIT {
-  @Rule
-  public TemporaryPath baseTmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testMemPipelineFileWriter() throws Exception {
-    File tmpDir = baseTmpDir.getFile("mempipe");
-    Pipeline p = MemPipeline.getInstance();
-    PCollection<String> lines = MemPipeline.collectionOf("hello", "world");
-    p.writeTextFile(lines, tmpDir.toString());
-    p.done();
-    assertTrue(tmpDir.exists());
-    File[] files = tmpDir.listFiles();
-    assertTrue(files != null && files.length > 0);
-    for (File f : files) {
-      if (!f.getName().startsWith(".")) {
-        List<String> txt = Files.readLines(f, Charsets.UTF_8);
-        assertEquals(ImmutableList.of("hello", "world"), txt);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java b/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
deleted file mode 100644
index f9f73b2..0000000
--- a/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTableKeyValueIT;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.io.To;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.common.collect.Lists;
-
-@RunWith(value = Parameterized.class)
-public class UnionCollectionIT {
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  private static final Log LOG = LogFactory.getLog(UnionCollectionIT.class);
-
-  private PTypeFamily typeFamily;
-  private Pipeline pipeline;
-  private PCollection<String> union;
-
-  private ArrayList<String> EXPECTED = Lists.newArrayList("a", "a", "b", "c", "c", "d", "e");
-
-  private Class pipelineClass;
-
-  @Before
-  @SuppressWarnings("unchecked")
-  public void setUp() throws IOException {
-    String inputFile1 = tmpDir.copyResourceFileName("set1.txt");
-    String inputFile2 = tmpDir.copyResourceFileName("set2.txt");
-    if (pipelineClass == null) {
-      pipeline = MemPipeline.getInstance();
-    } else {
-      pipeline = new MRPipeline(pipelineClass, tmpDir.getDefaultConfiguration());
-    }
-    PCollection<String> firstCollection = pipeline.read(At.textFile(inputFile1, typeFamily.strings()));
-    PCollection<String> secondCollection = pipeline.read(At.textFile(inputFile2, typeFamily.strings()));
-
-    LOG.info("Test fixture: [" + pipeline.getClass().getSimpleName() + " : " + typeFamily.getClass().getSimpleName()
-        + "]  First: " + Lists.newArrayList(firstCollection.materialize().iterator()) + ", Second: "
-        + Lists.newArrayList(secondCollection.materialize().iterator()));
-
-    union = secondCollection.union(firstCollection);
-  }
-
-  @Parameters
-  public static Collection<Object[]> data() throws IOException {
-    Object[][] data = new Object[][] { { WritableTypeFamily.getInstance(), PTableKeyValueIT.class },
-        { WritableTypeFamily.getInstance(), null }, { AvroTypeFamily.getInstance(), PTableKeyValueIT.class },
-        { AvroTypeFamily.getInstance(), null } };
-    return Arrays.asList(data);
-  }
-
-  public UnionCollectionIT(PTypeFamily typeFamily, Class pipelineClass) {
-    this.typeFamily = typeFamily;
-    this.pipelineClass = pipelineClass;
-  }
-
-  @Test
-  public void unionMaterializeShouldNotThrowNPE() throws Exception {
-    checkMaterialized(union.materialize());
-    checkMaterialized(pipeline.materialize(union));
-  }
-
-  private void checkMaterialized(Iterable<String> materialized) {
-    List<String> materializedValues = Lists.newArrayList(materialized.iterator());
-    Collections.sort(materializedValues);
-    LOG.info("Materialized union: " + materializedValues);
-    assertEquals(EXPECTED, materializedValues);
-  }
-
-  @Test
-  public void unionWriteShouldNotThrowNPE() throws IOException {
-    String outputPath1 = tmpDir.getFileName("output1");
-    String outputPath2 = tmpDir.getFileName("output2");
-    String outputPath3 = tmpDir.getFileName("output3");
-
-    if (typeFamily == AvroTypeFamily.getInstance()) {
-      union.write(To.avroFile(outputPath1));
-      pipeline.write(union, To.avroFile(outputPath2));
-
-      pipeline.run();
-
-      checkFileContents(outputPath1);
-      checkFileContents(outputPath2);
-
-    } else {
-
-      union.write(To.textFile(outputPath1));
-      pipeline.write(union, To.textFile(outputPath2));
-      pipeline.writeTextFile(union, outputPath3);
-
-      pipeline.run();
-
-      checkFileContents(outputPath1);
-      checkFileContents(outputPath2);
-      checkFileContents(outputPath3);
-    }
-  }
-
-  private void checkFileContents(String filePath) throws IOException {
-
-    List<String> fileContentValues = (typeFamily != AvroTypeFamily.getInstance() || !(pipeline instanceof MRPipeline)) ? Lists
-        .newArrayList(pipeline.read(At.textFile(filePath, typeFamily.strings())).materialize().iterator()) : Lists
-        .newArrayList(pipeline.read(At.avroFile(filePath, Avros.strings())).materialize().iterator());
-
-    Collections.sort(fileContentValues);
-
-    LOG.info("Saved Union: " + fileContentValues);
-    assertEquals(EXPECTED, fileContentValues);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java b/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
deleted file mode 100644
index 08d226d..0000000
--- a/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.crunch.io.text.TextFileReaderFactory;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.writable.Writables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class CompositePathIterableIT {
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testCreate_FilePresent() throws IOException {
-    String inputFilePath = tmpDir.copyResourceFileName("set1.txt");
-    Configuration conf = new Configuration();
-    LocalFileSystem local = FileSystem.getLocal(conf);
-
-    Iterable<String> iterable = CompositePathIterable.create(local, new Path(inputFilePath),
-        new TextFileReaderFactory<String>(Writables.strings()));
-
-    assertEquals(Lists.newArrayList("b", "c", "a", "e"), Lists.newArrayList(iterable));
-
-  }
-
-  @Test
-  public void testCreate_DirectoryPresentButNoFiles() throws IOException {
-    Path emptyInputDir = tmpDir.getRootPath();
-
-    Configuration conf = new Configuration();
-    LocalFileSystem local = FileSystem.getLocal(conf);
-
-    Iterable<String> iterable = CompositePathIterable.create(local, emptyInputDir,
-        new TextFileReaderFactory<String>(Writables.strings()));
-
-    assertTrue(Lists.newArrayList(iterable).isEmpty());
-  }
-
-  @Test(expected = IOException.class)
-  public void testCreate_DirectoryNotPresent() throws IOException {
-    File nonExistentDir = tmpDir.getFile("not-there");
-
-    // Sanity check
-    assertFalse(nonExistentDir.exists());
-
-    Configuration conf = new Configuration();
-    LocalFileSystem local = FileSystem.getLocal(conf);
-
-    CompositePathIterable.create(local, new Path(nonExistentDir.getAbsolutePath()), new TextFileReaderFactory<String>(
-        Writables.strings()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java b/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java
deleted file mode 100644
index 52b8ff5..0000000
--- a/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.text.NLineFileSource;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.writable.Writables;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class NLineInputIT {
-
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-  
-  @Test
-  public void testNLine() throws Exception {
-    String urlsInputPath = tmpDir.copyResourceFileName("urls.txt");
-    Configuration conf = new Configuration(tmpDir.getDefaultConfiguration());
-    conf.setInt("io.sort.mb", 10);
-    Pipeline pipeline = new MRPipeline(NLineInputIT.class, conf);
-    PCollection<String> urls = pipeline.read(new NLineFileSource<String>(urlsInputPath,
-        Writables.strings(), 2));
-    assertEquals(new Integer(2),
-        urls.parallelDo(new LineCountFn(), Avros.ints()).max().getValue());
-  }
-  
-  private static class LineCountFn extends DoFn<String, Integer> {
-
-    private int lineCount = 0;
-    
-    @Override
-    public void initialize() {
-      this.lineCount = 0;
-    }
-    
-    @Override
-    public void process(String input, Emitter<Integer> emitter) {
-      lineCount++;
-    }
-    
-    @Override
-    public void cleanup(Emitter<Integer> emitter) {
-      emitter.emit(lineCount);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java b/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java
deleted file mode 100644
index bddc0b5..0000000
--- a/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import static org.apache.crunch.types.writable.Writables.*;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Set;
-
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.text.TextFileTableSource;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableSet;
-
-/**
- *
- */
-public class TextFileTableIT {
-
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-  
-  @Test
-  public void testTextFileTable() throws Exception {
-    String urlsFile = tmpDir.copyResourceFileName("urls.txt");
-    Pipeline pipeline = new MRPipeline(TextFileTableIT.class, tmpDir.getDefaultConfiguration());
-    PTable<String, String> urls = pipeline.read(
-        new TextFileTableSource<String, String>(urlsFile, tableOf(strings(), strings())));
-    Set<Pair<String, Long>> cnts = ImmutableSet.copyOf(urls.keys().count().materialize());
-    assertEquals(ImmutableSet.of(Pair.of("www.A.com", 4L), Pair.of("www.B.com", 2L),
-        Pair.of("www.C.com", 1L), Pair.of("www.D.com", 1L), Pair.of("www.E.com", 1L),
-        Pair.of("www.F.com", 2L)), cnts);
-  }
-}