Posted to commits@crunch.apache.org by jw...@apache.org on 2013/12/11 21:47:52 UTC

[6/9] git commit: Crunch on Spark

Crunch on Spark
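
SparkPipeline, added in this commit, is a drop-in Pipeline implementation, so an
existing Crunch flow only needs to swap its pipeline construction to run on Spark.
A minimal usage sketch, assuming only the APIs exercised by the integration tests
below (the paths and class name are placeholders):

    import org.apache.crunch.PCollection;
    import org.apache.crunch.PTable;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.spark.SparkPipeline;
    import org.apache.crunch.lib.Aggregate;

    public class LineCountOnSpark {
      public static void main(String[] args) {
        // "local" is the Spark master URL; "linecount" names the application.
        Pipeline pipeline = new SparkPipeline("local", "linecount");
        PCollection<String> lines = pipeline.readTextFile(args[0]);
        // Count occurrences of each distinct line; other Crunch DoFns and
        // library calls (Cogroup, Join, Sort, ...) work the same way.
        PTable<String, Long> counts = Aggregate.count(lines);
        pipeline.writeTextFile(counts, args[1]);
        pipeline.done(); // plans and executes the Spark job, then shuts down
      }
    }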


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/6e623413
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/6e623413
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/6e623413

Branch: refs/heads/master
Commit: 6e6234138252f9bfa9f08758ea1443fe3f648322
Parents: a691b83
Author: Josh Wills <jw...@apache.org>
Authored: Sun Nov 17 09:14:00 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Dec 11 12:28:10 2013 -0800

----------------------------------------------------------------------
 crunch-spark/pom.xml                            |    93 +
 .../java/org/apache/crunch/SparkCogroupIT.java  |   189 +
 .../org/apache/crunch/SparkMapsideJoinIT.java   |   188 +
 .../java/org/apache/crunch/SparkPageRankIT.java |   153 +
 .../org/apache/crunch/SparkSecondarySortIT.java |    61 +
 .../it/java/org/apache/crunch/SparkSortIT.java  |   315 +
 .../it/java/org/apache/crunch/SparkTfidfIT.java |   228 +
 .../org/apache/crunch/SparkUnionResultsIT.java  |   117 +
 .../org/apache/crunch/test/StringWrapper.java   |   102 +
 .../it/java/org/apache/crunch/test/Tests.java   |    38 +
 crunch-spark/src/it/resources/customers.txt     |     4 +
 crunch-spark/src/it/resources/docs.txt          |     6 +
 crunch-spark/src/it/resources/emptyTextFile.txt |     0
 crunch-spark/src/it/resources/letters.txt       |     2 +
 crunch-spark/src/it/resources/log4j.properties  |    30 +
 crunch-spark/src/it/resources/maugham.txt       | 29112 +++++++++++++++++
 crunch-spark/src/it/resources/orders.txt        |     5 +
 .../apache/crunch/SparkCogroupITData/src1.txt   |     4 +
 .../apache/crunch/SparkCogroupITData/src2.txt   |     4 +
 .../src/it/resources/secondary_sort_input.txt   |     7 +
 crunch-spark/src/it/resources/set1.txt          |     4 +
 crunch-spark/src/it/resources/set2.txt          |     3 +
 crunch-spark/src/it/resources/shakes.txt        |  3667 +++
 crunch-spark/src/it/resources/sort_by_value.txt |     5 +
 crunch-spark/src/it/resources/urls.txt          |    11 +
 .../org/apache/crunch/impl/spark/ByteArray.java |    50 +
 .../impl/spark/CounterAccumulatorParam.java     |    45 +
 .../apache/crunch/impl/spark/GuavaUtils.java    |    44 +
 .../apache/crunch/impl/spark/IntByteArray.java  |    43 +
 .../crunch/impl/spark/SparkCollection.java      |    24 +
 .../crunch/impl/spark/SparkComparator.java      |    86 +
 .../crunch/impl/spark/SparkPartitioner.java     |    37 +
 .../apache/crunch/impl/spark/SparkPipeline.java |   118 +
 .../apache/crunch/impl/spark/SparkRuntime.java  |   326 +
 .../crunch/impl/spark/SparkRuntimeContext.java  |   202 +
 .../crunch/impl/spark/collect/DoCollection.java |    65 +
 .../crunch/impl/spark/collect/DoTable.java      |    78 +
 .../impl/spark/collect/InputCollection.java     |    61 +
 .../crunch/impl/spark/collect/InputTable.java   |    51 +
 .../impl/spark/collect/PGroupedTableImpl.java   |   122 +
 .../impl/spark/collect/SparkCollectFactory.java |    99 +
 .../impl/spark/collect/ToByteArrayFunction.java |    33 +
 .../impl/spark/collect/UnionCollection.java     |    58 +
 .../crunch/impl/spark/collect/UnionTable.java   |    59 +
 .../impl/spark/fn/CombineMapsideFunction.java   |    90 +
 .../crunch/impl/spark/fn/CrunchIterable.java    |    38 +
 .../crunch/impl/spark/fn/FlatMapDoFn.java       |    41 +
 .../crunch/impl/spark/fn/FlatMapPairDoFn.java   |    45 +
 .../impl/spark/fn/InputConverterFunction.java   |    35 +
 .../crunch/impl/spark/fn/MapFunction.java       |    42 +
 .../crunch/impl/spark/fn/MapOutputFunction.java |    42 +
 .../impl/spark/fn/OutputConverterFunction.java  |    35 +
 .../crunch/impl/spark/fn/PairFlatMapDoFn.java   |    46 +
 .../impl/spark/fn/PairFlatMapPairDoFn.java      |    49 +
 .../crunch/impl/spark/fn/PairMapFunction.java   |    44 +
 .../impl/spark/fn/PairMapIterableFunction.java  |    51 +
 .../spark/fn/PartitionedMapOutputFunction.java  |    83 +
 .../impl/spark/fn/ReduceGroupingFunction.java   |   121 +
 .../impl/spark/fn/ReduceInputFunction.java      |    44 +
 .../crunch/impl/spark/serde/AvroSerDe.java      |   109 +
 .../apache/crunch/impl/spark/serde/SerDe.java   |    30 +
 .../crunch/impl/spark/serde/WritableSerDe.java  |    70 +
 pom.xml                                         |    22 +-
 63 files changed, 36982 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-spark/pom.xml b/crunch-spark/pom.xml
new file mode 100644
index 0000000..57f2be8
--- /dev/null
+++ b/crunch-spark/pom.xml
@@ -0,0 +1,93 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.crunch</groupId>
+    <artifactId>crunch-parent</artifactId>
+    <version>0.9.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>crunch-spark</artifactId>
+  <name>Apache Crunch for Spark</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/SparkCogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkCogroupIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkCogroupIT.java
new file mode 100644
index 0000000..66d30d2
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkCogroupIT.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.lib.Cogroup;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.Tests;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+public class SparkCogroupIT {
+  @Rule
+  public TemporaryPath tmpDir = new TemporaryPath();
+
+  private SparkPipeline pipeline;
+  private PCollection<String> lines1;
+  private PCollection<String> lines2;
+  private PCollection<String> lines3;
+  private PCollection<String> lines4;
+
+  @Before
+  public void setUp() throws IOException {
+    pipeline = new SparkPipeline("local", "wordcount");
+    lines1 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt")));
+    lines2 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt")));
+    lines3 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt")));
+    lines4 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt")));
+  }
+
+  @After
+  public void tearDown() {
+    pipeline.done();
+  }
+
+  @Test
+  public void testCogroupWritables() {
+    runCogroup(WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testCogroupAvro() {
+    runCogroup(AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testCogroup3Writables() {
+    runCogroup3(WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testCogroup3Avro() {
+    runCogroup3(AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testCogroup4Writables() {
+    runCogroup3(WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testCogroup4Avro() {
+    runCogroup4(AvroTypeFamily.getInstance());
+  }
+
+  public void runCogroup(PTypeFamily ptf) {
+    PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
+
+    PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt);
+    PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt);
+
+    PTable<String, Pair<Collection<String>, Collection<String>>> cg = Cogroup.cogroup(kv1, kv2);
+
+    Map<String, Pair<Collection<String>, Collection<String>>> result = cg.materializeToMap();
+    Map<String, Pair<Collection<String>, Collection<String>>> actual = Maps.newHashMap();
+    for (Map.Entry<String, Pair<Collection<String>, Collection<String>>> e : result.entrySet()) {
+      Collection<String> one = ImmutableSet.copyOf(e.getValue().first());
+      Collection<String> two = ImmutableSet.copyOf(e.getValue().second());
+      actual.put(e.getKey(), Pair.of(one, two));
+    }
+    Map<String, Pair<Collection<String>, Collection<String>>> expected = ImmutableMap.of(
+        "a", Pair.of(coll("1-1", "1-4"), coll()),
+        "b", Pair.of(coll("1-2"), coll("2-1")),
+        "c", Pair.of(coll("1-3"), coll("2-2", "2-3")),
+        "d", Pair.of(coll(), coll("2-4"))
+    );
+
+    assertThat(actual, is(expected));
+  }
+
+  public void runCogroup3(PTypeFamily ptf) {
+    PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
+
+    PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt);
+    PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt);
+    PTable<String, String> kv3 = lines3.parallelDo("kv3", new KeyValueSplit(), tt);
+
+    PTable<String, Tuple3.Collect<String, String, String>> cg = Cogroup.cogroup(kv1, kv2, kv3);
+
+    Map<String, Tuple3.Collect<String, String, String>> result = cg.materializeToMap();
+    Map<String, Tuple3.Collect<String, String, String>> actual = Maps.newHashMap();
+    for (Map.Entry<String, Tuple3.Collect<String, String, String>> e : result.entrySet()) {
+      Collection<String> one = ImmutableSet.copyOf(e.getValue().first());
+      Collection<String> two = ImmutableSet.copyOf(e.getValue().second());
+      Collection<String> three = ImmutableSet.copyOf(e.getValue().third());
+      actual.put(e.getKey(), new Tuple3.Collect<String, String, String>(one, two, three));
+    }
+    Map<String, Tuple3.Collect<String, String, String>> expected = ImmutableMap.of(
+        "a", new Tuple3.Collect<String, String, String>(coll("1-1", "1-4"), coll(), coll("1-1", "1-4")),
+        "b", new Tuple3.Collect<String, String, String>(coll("1-2"), coll("2-1"), coll("1-2")),
+        "c", new Tuple3.Collect<String, String, String>(coll("1-3"), coll("2-2", "2-3"), coll("1-3")),
+        "d", new Tuple3.Collect<String, String, String>(coll(), coll("2-4"), coll())
+    );
+
+    assertThat(actual, is(expected));
+  }
+
+  public void runCogroup4(PTypeFamily ptf) {
+    PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
+
+    PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt);
+    PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt);
+    PTable<String, String> kv3 = lines3.parallelDo("kv3", new KeyValueSplit(), tt);
+    PTable<String, String> kv4 = lines4.parallelDo("kv4", new KeyValueSplit(), tt);
+
+    PTable<String, Tuple4.Collect<String, String, String, String>> cg = Cogroup.cogroup(kv1, kv2, kv3, kv4);
+
+    Map<String, Tuple4.Collect<String, String, String, String>> result = cg.materializeToMap();
+    Map<String, Tuple4.Collect<String, String, String, String>> actual = Maps.newHashMap();
+    for (Map.Entry<String, Tuple4.Collect<String, String, String, String>> e : result.entrySet()) {
+      Collection<String> one = ImmutableSet.copyOf(e.getValue().first());
+      Collection<String> two = ImmutableSet.copyOf(e.getValue().second());
+      Collection<String> three = ImmutableSet.copyOf(e.getValue().third());
+      Collection<String> four = ImmutableSet.copyOf(e.getValue().fourth());
+      actual.put(e.getKey(), new Tuple4.Collect<String, String, String, String>(one, two, three, four));
+    }
+    Map<String, Tuple4.Collect<String, String, String, String>> expected = ImmutableMap.of(
+        "a", new Tuple4.Collect<String, String, String, String>(coll("1-1", "1-4"), coll(), coll("1-1", "1-4"), coll()),
+        "b", new Tuple4.Collect<String, String, String, String>(coll("1-2"), coll("2-1"), coll("1-2"), coll("2-1")),
+        "c", new Tuple4.Collect<String, String, String, String>(coll("1-3"), coll("2-2", "2-3"), coll("1-3"), coll("2-2", "2-3")),
+        "d", new Tuple4.Collect<String, String, String, String>(coll(), coll("2-4"), coll(), coll("2-4"))
+    );
+
+    assertThat(actual, is(expected));
+  }
+
+  private static class KeyValueSplit extends DoFn<String, Pair<String, String>> {
+    @Override
+    public void process(String input, Emitter<Pair<String, String>> emitter) {
+      String[] fields = input.split(",");
+      emitter.emit(Pair.of(fields[0], fields[1]));
+    }
+  }
+
+  private static Collection<String> coll(String... values) {
+    return ImmutableSet.copyOf(values);
+  }
+}
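
For reference, the expected maps above pin down the cogroup fixtures. Consistent
with the four-line diffstat entries for SparkCogroupITData/src1.txt and src2.txt,
the implied inputs are (inferred from the assertions, not copied from the files):

    src1.txt:  a,1-1   b,1-2   c,1-3   a,1-4
    src2.txt:  b,2-1   c,2-2   c,2-3   d,2-4

so, for example, cogrouping kv1 and kv2 on key "c" yields ({"1-3"}, {"2-2", "2-3"}).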

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/SparkMapsideJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkMapsideJoinIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkMapsideJoinIT.java
new file mode 100644
index 0000000..b58942f
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkMapsideJoinIT.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.Lists;
+import org.apache.crunch.fn.FilterFns;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.lib.join.JoinStrategy;
+import org.apache.crunch.lib.join.JoinType;
+import org.apache.crunch.lib.join.MapsideJoinStrategy;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SparkMapsideJoinIT {
+  private static String saveTempDir;
+
+  @BeforeClass
+  public static void setUpClass() {
+
+    // Ensure a consistent temporary directory for use of the DistributedCache.
+
+    // The DistributedCache technically isn't supported when running in local mode, and the default
+    // temporary directory "/tmp" is used as its location. This typically only causes an issue when
+    // running integration tests on Mac OS X, as OS X doesn't use "/tmp" as its default temporary
+    // directory. The following call ensures that "/tmp" is used as the temporary directory on all platforms.
+    saveTempDir = System.setProperty("java.io.tmpdir", "/tmp");
+  }
+
+  @AfterClass
+  public static void tearDownClass() {
+    System.setProperty("java.io.tmpdir", saveTempDir);
+  }
+
+  private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
+    @Override
+    public Pair<Integer, String> map(String input) {
+      String[] fields = input.split("\\|");
+      return Pair.of(Integer.parseInt(fields[0]), fields[1]);
+    }
+  }
+
+  private static class CapOrdersFn extends MapFn<String, String> {
+    @Override
+    public String map(String v) {
+      return v.toUpperCase(Locale.ENGLISH);
+    }
+  }
+
+  private static class ConcatValuesFn extends MapFn<Pair<String, String>, String> {
+    @Override
+    public String map(Pair<String, String> v) {
+      return v.toString();
+    }
+  }
+
+  @Rule
+  public TemporaryPath tmpDir = new TemporaryPath();
+
+  @Test
+  public void testMapsideJoin_RightSideIsEmpty() throws IOException {
+    Pipeline pipeline = new SparkPipeline("local", "mapside");
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    PTable<Integer, String> filteredOrderTable = orderTable
+        .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), orderTable.getPTableType());
+
+    JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>();
+    PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(customerTable, filteredOrderTable, JoinType.INNER_JOIN);
+
+    List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize());
+
+    assertTrue(materializedJoin.isEmpty());
+    pipeline.done();
+  }
+
+  @Test
+  public void testMapsideJoin() throws IOException {
+    runMapsideJoin(new SparkPipeline("local", "mapside"), false);
+  }
+
+  @Test
+  public void testMapsideJoin_Materialized() throws IOException {
+    runMapsideJoin(new SparkPipeline("local", "mapside"), true);
+  }
+
+  @Test
+  public void testMapsideJoin_LeftOuterJoin() throws IOException {
+    runMapsideLeftOuterJoin(new SparkPipeline("local", "mapside"), false);
+  }
+
+  @Test
+  public void testMapsideJoin_LeftOuterJoin_Materialized() throws IOException {
+    runMapsideLeftOuterJoin(new SparkPipeline("local", "mapside"), true);
+  }
+
+  private void runMapsideJoin(Pipeline pipeline, boolean materialize) {
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(materialize);
+    PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable, JoinType.INNER_JOIN)
+        .mapValues("concat", new ConcatValuesFn(), Writables.strings());
+
+    PTable<Integer, String> capOrders = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
+    PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders, capOrders, JoinType.INNER_JOIN);
+
+    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
+    expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PAPER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PLUNGER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PAPER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER")));
+    expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH")));
+    Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
+
+    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
+    Collections.sort(joinedResultList);
+    assertEquals(expectedJoinResult, joinedResultList);
+    pipeline.done();
+  }
+
+  private void runMapsideLeftOuterJoin(Pipeline pipeline, boolean materialize) {
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(materialize);
+    PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable, JoinType.LEFT_OUTER_JOIN)
+        .mapValues("concat", new ConcatValuesFn(), Writables.strings());
+
+    PTable<Integer, String> capOrders = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
+    PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders, capOrders, JoinType.LEFT_OUTER_JOIN);
+
+    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
+    expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PAPER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PLUNGER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PAPER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER")));
+    expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH")));
+    expectedJoinResult.add(Pair.of(444, Pair.<String,String>of("[Has No Orders,null]", null)));
+    Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
+
+    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
+    Collections.sort(joinedResultList);
+    assertEquals(expectedJoinResult, joinedResultList);
+    pipeline.done();
+  }
+
+  private PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
+    try {
+      return pipeline.readTextFile(tmpDir.copyResourceFileName(filename)).parallelDo("asTable",
+          new LineSplitter(),
+          Writables.tableOf(Writables.ints(), Writables.strings()));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java
new file mode 100644
index 0000000..c76c62a
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.PTypes;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class SparkPageRankIT {
+
+  public static class PageRankData {
+    public float score;
+    public float lastScore;
+    public List<String> urls;
+
+    public PageRankData() {
+    }
+
+    public PageRankData(float score, float lastScore, Iterable<String> urls) {
+      this.score = score;
+      this.lastScore = lastScore;
+      this.urls = Lists.newArrayList(urls);
+    }
+
+    public PageRankData next(float newScore) {
+      return new PageRankData(newScore, score, urls);
+    }
+
+    public float propagatedScore() {
+      return score / urls.size();
+    }
+
+    @Override
+    public String toString() {
+      return score + " " + lastScore + " " + urls;
+    }
+  }
+
+  @Rule
+  public TemporaryPath tmpDir = new TemporaryPath();
+  private Pipeline pipeline;
+
+  @Before
+  public void setUp() throws Exception {
+    pipeline = new SparkPipeline("local", "pagerank");
+  }
+
+  @Test
+  public void testAvroJSON() throws Exception {
+    PTypeFamily tf = AvroTypeFamily.getInstance();
+    PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
+    String urlInput = tmpDir.copyResourceFileName("urls.txt");
+    run(pipeline, urlInput, prType, tf);
+    pipeline.done();
+  }
+
+  @Test
+  public void testWritablesJSON() throws Exception {
+    PTypeFamily tf = WritableTypeFamily.getInstance();
+    PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
+    String urlInput = tmpDir.copyResourceFileName("urls.txt");
+    run(pipeline, urlInput, prType, tf);
+    pipeline.done();
+  }
+
+  public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {
+    PTypeFamily ptf = input.getTypeFamily();
+    PTable<String, Float> outbound = input.parallelDo(new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
+      @Override
+      public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float>> emitter) {
+        PageRankData prd = input.second();
+        for (String link : prd.urls) {
+          emitter.emit(Pair.of(link, prd.propagatedScore()));
+        }
+      }
+    }, ptf.tableOf(ptf.strings(), ptf.floats()));
+
+    return input.cogroup(outbound).mapValues(
+        new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
+          @Override
+          public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) {
+            PageRankData prd = Iterables.getOnlyElement(input.first());
+            Collection<Float> propagatedScores = input.second();
+            float sum = 0.0f;
+            for (Float s : propagatedScores) {
+              sum += s;
+            }
+            return prd.next(d + (1.0f - d) * sum);
+          }
+        }, input.getValueType());
+  }
+
+  public static void run(Pipeline pipeline, String urlInput,
+                         PType<PageRankData> prType, PTypeFamily ptf) throws Exception {
+    PTable<String, PageRankData> scores = pipeline.readTextFile(urlInput)
+        .parallelDo(new MapFn<String, Pair<String, String>>() {
+          @Override
+          public Pair<String, String> map(String input) {
+            String[] urls = input.split("\\t");
+            return Pair.of(urls[0], urls[1]);
+          }
+        }, ptf.tableOf(ptf.strings(), ptf.strings())).groupByKey()
+        .mapValues(new MapFn<Iterable<String>, PageRankData>() {
+          @Override
+          public PageRankData map(Iterable<String> input) {
+            return new PageRankData(1.0f, 0.0f, input);
+          }
+        }, prType);
+
+    Float delta = 1.0f;
+    while (delta > 0.01) {
+      scores = pageRank(scores, 0.5f).cache();
+      delta = Aggregate.max(scores.parallelDo(new MapFn<Pair<String, PageRankData>, Float>() {
+        @Override
+        public Float map(Pair<String, PageRankData> input) {
+          PageRankData prd = input.second();
+          return Math.abs(prd.score - prd.lastScore);
+        }
+      }, ptf.floats())).getValue();
+    }
+    assertEquals(0.0048, delta, 0.001);
+  }
+}
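
For reference, the update rule implemented in pageRank() above is a damped
iteration: each url's propagated score is score / outdegree, and the new score is

    score' = d + (1 - d) * sum(propagated scores of inbound urls)

with damping d = 0.5 here. run() loops until the largest per-url change,
max |score - lastScore|, drops to 0.01 or below, which is why the final delta is
asserted to be roughly 0.0048.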

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/SparkSecondarySortIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkSecondarySortIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkSecondarySortIT.java
new file mode 100644
index 0000000..f9e3265
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkSecondarySortIT.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.lib.SecondarySort;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.apache.crunch.types.avro.Avros.ints;
+import static org.apache.crunch.types.avro.Avros.pairs;
+import static org.apache.crunch.types.avro.Avros.strings;
+import static org.apache.crunch.types.avro.Avros.tableOf;
+import static org.junit.Assert.assertEquals;
+
+public class SparkSecondarySortIT extends CrunchTestSupport implements Serializable {
+  @Test
+  public void testSecondarySort() throws Exception {
+    Pipeline p = new SparkPipeline("local", "secondarysort");
+    String inputFile = tempDir.copyResourceFileName("secondary_sort_input.txt");
+
+    PTable<String, Pair<Integer, Integer>> in = p.read(From.textFile(inputFile))
+        .parallelDo(new MapFn<String, Pair<String, Pair<Integer, Integer>>>() {
+          @Override
+          public Pair<String, Pair<Integer, Integer>> map(String input) {
+            String[] pieces = input.split(",");
+            return Pair.of(pieces[0],
+                Pair.of(Integer.valueOf(pieces[1].trim()), Integer.valueOf(pieces[2].trim())));
+          }
+        }, tableOf(strings(), pairs(ints(), ints())));
+    Iterable<String> lines = SecondarySort.sortAndApply(in, new MapFn<Pair<String, Iterable<Pair<Integer, Integer>>>, String>() {
+      @Override
+      public String map(Pair<String, Iterable<Pair<Integer, Integer>>> input) {
+        Joiner j = Joiner.on(',');
+        return j.join(input.first(), j.join(input.second()));
+      }
+    }, strings()).materialize();
+    assertEquals(ImmutableList.of("one,[-5,10],[1,1],[2,-3]", "three,[0,-1]", "two,[1,7],[2,6],[4,5]"),
+        ImmutableList.copyOf(lines));
+    p.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/SparkSortIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkSortIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkSortIT.java
new file mode 100644
index 0000000..994aae9
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkSortIT.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.Lists;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.lib.Sort;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.crunch.lib.Sort.ColumnOrder.by;
+import static org.apache.crunch.lib.Sort.Order.ASCENDING;
+import static org.apache.crunch.lib.Sort.Order.DESCENDING;
+import static org.apache.crunch.test.StringWrapper.wrap;
+import static org.junit.Assert.assertEquals;
+
+public class SparkSortIT implements Serializable {
+  @Rule
+  public transient TemporaryPath tmpDir = new TemporaryPath();
+
+  @Test
+  public void testWritableSortAsc() throws Exception {
+    runSingle(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), Sort.Order.ASCENDING,
+        "A\tand this text as well");
+  }
+
+  @Test
+  public void testWritableSortDesc() throws Exception {
+    runSingle(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), Sort.Order.DESCENDING,
+        "B\tthis doc has some text");
+  }
+
+  @Test
+  public void testWritableSortAscDesc() throws Exception {
+    runPair(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
+        "this doc has this text");
+  }
+
+  @Test
+  public void testWritableSortSecondDescFirstAsc() throws Exception {
+    runPair(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
+        "this doc has this text");
+  }
+
+  @Test
+  public void testWritableSortTripleAscDescAsc() throws Exception {
+    runTriple(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+        by(3, ASCENDING), "A", "this", "doc");
+  }
+
+  @Test
+  public void testWritableSortQuadAscDescAscDesc() throws Exception {
+    runQuad(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+        by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
+  }
+
+  @Test
+  public void testWritableSortTupleNAscDesc() throws Exception {
+    runTupleN(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(),
+        new Sort.ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
+  }
+
+  @Test
+  public void testWritableSortTable() throws Exception {
+    runTable(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), "A");
+  }
+
+  @Test
+  public void testAvroSortAsc() throws Exception {
+    runSingle(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), Sort.Order.ASCENDING, "A\tand this text as well");
+  }
+
+  @Test
+  public void testAvroSortDesc() throws Exception {
+    runSingle(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), Sort.Order.DESCENDING, "B\tthis doc has some text");
+  }
+
+  @Test
+  public void testAvroSortPairAscDesc() throws Exception {
+    runPair(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
+        "this doc has this text");
+  }
+
+  @Test
+  public void testAvroSortPairSecondDescFirstAsc() throws Exception {
+    runPair(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
+        "this doc has this text");
+  }
+
+  @Test
+  public void testAvroSortTripleAscDescAsc() throws Exception {
+    runTriple(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+        by(3, ASCENDING), "A", "this", "doc");
+  }
+
+  @Test
+  public void testAvroSortQuadAscDescAscDesc() throws Exception {
+    runQuad(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+        by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
+  }
+
+  @Test
+  public void testAvroSortTupleNAscDesc() throws Exception {
+    runTupleN(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(),
+        new Sort.ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
+  }
+
+  @Test
+  public void testAvroReflectSortPair() throws IOException {
+    Pipeline pipeline = new SparkPipeline("local", "sort");
+    pipeline.enableDebug();
+    String rsrc = tmpDir.copyResourceFileName("set2.txt");
+    PCollection<Pair<String, StringWrapper>> in = pipeline.readTextFile(rsrc)
+        .parallelDo(new MapFn<String, Pair<String, StringWrapper>>() {
+
+          @Override
+          public Pair<String, StringWrapper> map(String input) {
+            return Pair.of(input, wrap(input));
+          }
+        }, Avros.pairs(Avros.strings(), Avros.reflects(StringWrapper.class)));
+    PCollection<Pair<String, StringWrapper>> sorted = Sort.sort(in, Sort.Order.ASCENDING);
+
+    List<Pair<String, StringWrapper>> expected = Lists.newArrayList();
+    expected.add(Pair.of("a", wrap("a")));
+    expected.add(Pair.of("c", wrap("c")));
+    expected.add(Pair.of("d", wrap("d")));
+
+    assertEquals(expected, Lists.newArrayList(sorted.materialize()));
+    pipeline.done();
+  }
+
+  @Test
+  public void testAvroReflectSortTable() throws IOException {
+    Pipeline pipeline = new SparkPipeline("local", "sort");
+    PTable<String, StringWrapper> unsorted = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt")).parallelDo(
+        new MapFn<String, Pair<String, StringWrapper>>() {
+
+          @Override
+          public Pair<String, StringWrapper> map(String input) {
+            return Pair.of(input, wrap(input));
+          }
+        }, Avros.tableOf(Avros.strings(), Avros.reflects(StringWrapper.class)));
+
+    PTable<String, StringWrapper> sorted = Sort.sort(unsorted);
+
+    List<Pair<String, StringWrapper>> expected = Lists.newArrayList();
+    expected.add(Pair.of("a", wrap("a")));
+    expected.add(Pair.of("c", wrap("c")));
+    expected.add(Pair.of("d", wrap("d")));
+
+    assertEquals(expected, Lists.newArrayList(sorted.materialize()));
+    pipeline.done();
+  }
+
+  @Test
+  public void testAvroSortTable() throws Exception {
+    runTable(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), "A");
+  }
+
+  private void runSingle(Pipeline pipeline, PTypeFamily typeFamily, Sort.Order order, String firstLine) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    // The following converts the input from Writables into the required type family.
+    PCollection<String> input2 = input.parallelDo(new DoFn<String, String>() {
+      @Override
+      public void process(String input, Emitter<String> emitter) {
+        emitter.emit(input);
+      }
+    }, typeFamily.strings());
+    PCollection<String> sorted = Sort.sort(input2, order);
+    Iterable<String> lines = sorted.materialize();
+
+    assertEquals(firstLine, lines.iterator().next());
+    pipeline.done(); // TODO: finally
+  }
+
+  private void runPair(Pipeline pipeline, PTypeFamily typeFamily, Sort.ColumnOrder first, Sort.ColumnOrder second,
+                       String firstField, String secondField) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    PTable<String, String> kv = input.parallelDo(new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String input, Emitter<Pair<String, String>> emitter) {
+        String[] split = input.split("[\t]+");
+        emitter.emit(Pair.of(split[0], split[1]));
+      }
+    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
+    PCollection<Pair<String, String>> sorted = Sort.sortPairs(kv, first, second);
+    List<Pair<String, String>> lines = Lists.newArrayList(sorted.materialize());
+    Pair<String, String> l = lines.iterator().next();
+    assertEquals(firstField, l.first());
+    assertEquals(secondField, l.second());
+    pipeline.done();
+  }
+
+  private void runTriple(Pipeline pipeline, PTypeFamily typeFamily, Sort.ColumnOrder first, Sort.ColumnOrder second,
+                         Sort.ColumnOrder third, String firstField, String secondField, String thirdField) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    PCollection<Tuple3<String, String, String>> kv = input.parallelDo(
+        new DoFn<String, Tuple3<String, String, String>>() {
+          @Override
+          public void process(String input, Emitter<Tuple3<String, String, String>> emitter) {
+            String[] split = input.split("[\t ]+");
+            int len = split.length;
+            emitter.emit(Tuple3.of(split[0], split[1 % len], split[2 % len]));
+          }
+        }, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
+    PCollection<Tuple3<String, String, String>> sorted = Sort.sortTriples(kv, first, second, third);
+    List<Tuple3<String, String, String>> lines = Lists.newArrayList(sorted.materialize());
+    Tuple3<String, String, String> l = lines.iterator().next();
+    assertEquals(firstField, l.first());
+    assertEquals(secondField, l.second());
+    assertEquals(thirdField, l.third());
+    pipeline.done();
+  }
+
+  private void runQuad(Pipeline pipeline, PTypeFamily typeFamily, Sort.ColumnOrder first, Sort.ColumnOrder second,
+                       Sort.ColumnOrder third, Sort.ColumnOrder fourth, String firstField, String secondField, String thirdField,
+                       String fourthField) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    PCollection<Tuple4<String, String, String, String>> kv = input.parallelDo(
+        new DoFn<String, Tuple4<String, String, String, String>>() {
+          @Override
+          public void process(String input, Emitter<Tuple4<String, String, String, String>> emitter) {
+            String[] split = input.split("[\t ]+");
+            int len = split.length;
+            emitter.emit(Tuple4.of(split[0], split[1 % len], split[2 % len], split[3 % len]));
+          }
+        }, typeFamily.quads(typeFamily.strings(), typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
+    PCollection<Tuple4<String, String, String, String>> sorted = Sort.sortQuads(kv, first, second, third, fourth);
+    Iterable<Tuple4<String, String, String, String>> lines = sorted.materialize();
+    Tuple4<String, String, String, String> l = lines.iterator().next();
+    assertEquals(firstField, l.first());
+    assertEquals(secondField, l.second());
+    assertEquals(thirdField, l.third());
+    assertEquals(fourthField, l.fourth());
+    pipeline.done();
+  }
+
+  private void runTupleN(Pipeline pipeline, PTypeFamily typeFamily, Sort.ColumnOrder[] orders, String[] fields)
+      throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    PType[] types = new PType[orders.length];
+    Arrays.fill(types, typeFamily.strings());
+    PCollection<TupleN> kv = input.parallelDo(new DoFn<String, TupleN>() {
+      @Override
+      public void process(String input, Emitter<TupleN> emitter) {
+        String[] split = input.split("[\t]+");
+        emitter.emit(new TupleN(split));
+      }
+    }, typeFamily.tuples(types));
+    PCollection<TupleN> sorted = Sort.sortTuples(kv, orders);
+    Iterable<TupleN> lines = sorted.materialize();
+    TupleN l = lines.iterator().next();
+    int i = 0;
+    for (String field : fields) {
+      assertEquals(field, l.get(i++));
+    }
+    pipeline.done();
+  }
+
+  private void runTable(Pipeline pipeline, PTypeFamily typeFamily, String firstKey) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    PTable<String, String> table = input.parallelDo(new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String input, Emitter<Pair<String, String>> emitter) {
+        String[] split = input.split("[\t]+");
+        emitter.emit(Pair.of(split[0], split[1]));
+      }
+    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
+
+    PTable<String, String> sorted = Sort.sort(table);
+    Iterable<Pair<String, String>> lines = sorted.materialize();
+    Pair<String, String> l = lines.iterator().next();
+    assertEquals(firstKey, l.first());
+    pipeline.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/SparkTfidfIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkTfidfIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkTfidfIT.java
new file mode 100644
index 0000000..f13166b
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkTfidfIT.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.io.seq.SeqFileSourceTarget;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.lib.Join;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+@SuppressWarnings("serial")
+public class SparkTfidfIT implements Serializable {
+  @Rule
+  public transient TemporaryPath tmpDir = new TemporaryPath();
+
+  // Total number of documents; hard-coded here, but should really be computed from the input.
+  protected static final double N = 2;
+
+  private transient Pipeline pipeline;
+
+  @Before
+  public void setUp() throws Exception {
+    pipeline = new SparkPipeline("local", "tfidf");
+  }
+
+  @Test
+  public void testWritablesSingleRun() throws IOException {
+    run(pipeline, WritableTypeFamily.getInstance(), true);
+  }
+
+  @Test
+  public void testWritablesMultiRun() throws IOException {
+    run(pipeline, WritableTypeFamily.getInstance(), false);
+  }
+
+  /**
+   * Generates a TF-IDF score for each (word, document) pair in the input.
+   */
+  public PTable<String, Collection<Pair<String, Double>>> generateTFIDF(PCollection<String> docs, Path termFreqPath,
+                                                                        PTypeFamily ptf) throws IOException {
+
+    /*
+     * Input: String Input title text
+     *
+     * Output: PTable<Pair<String, String>, Long> Pair<Pair<word, title>, count
+     * in title>
+     */
+    PTable<Pair<String, String>, Long> tf = Aggregate.count(docs.parallelDo("term document frequency",
+        new DoFn<String, Pair<String, String>>() {
+          @Override
+          public void process(String doc, Emitter<Pair<String, String>> emitter) {
+            String[] kv = doc.split("\t");
+            String title = kv[0];
+            String text = kv[1];
+            for (String word : text.split("\\W+")) {
+              if (word.length() > 0) {
+                Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
+                emitter.emit(pair);
+              }
+            }
+          }
+        }, ptf.pairs(ptf.strings(), ptf.strings())));
+
+    tf.write(new SeqFileSourceTarget<Pair<Pair<String, String>, Long>>(termFreqPath, tf.getPType()));
+
+    /*
+     * Input: Pair<Pair<String, String>, Long> Pair<Pair<word, title>, count in
+     * title>
+     *
+     * Output: PTable<String, Long> PTable<word, # of docs containing word>
+     */
+    PTable<String, Long> n = Aggregate.count(tf.parallelDo("little n (# of docs contain word)",
+        new DoFn<Pair<Pair<String, String>, Long>, String>() {
+          @Override
+          public void process(Pair<Pair<String, String>, Long> input, Emitter<String> emitter) {
+            emitter.emit(input.first().first());
+          }
+        }, ptf.strings()));
+
+    /*
+     * Input: Pair<Pair<String, String>, Long> Pair<Pair<word, title>, count in
+     * title>
+     *
+     * Output: PTable<String, Collection<Pair<String, Long>>> PTable<word,
+     * Collection<Pair<title, count in title>>>
+     */
+    PTable<String, Collection<Pair<String, Long>>> wordDocumentCountPair = tf.parallelDo(
+        "transform wordDocumentPairCount",
+        new DoFn<Pair<Pair<String, String>, Long>, Pair<String, Collection<Pair<String, Long>>>>() {
+          Collection<Pair<String, Long>> buffer;
+          String key;
+
+          @Override
+          public void process(Pair<Pair<String, String>, Long> input,
+                              Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
+            Pair<String, String> wordDocumentPair = input.first();
+            if (!wordDocumentPair.first().equals(key)) {
+              flush(emitter);
+              key = wordDocumentPair.first();
+              buffer = Lists.newArrayList();
+            }
+            buffer.add(Pair.of(wordDocumentPair.second(), input.second()));
+          }
+
+          protected void flush(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
+            if (buffer != null) {
+              emitter.emit(Pair.of(key, buffer));
+              buffer = null;
+            }
+          }
+
+          @Override
+          public void cleanup(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
+            flush(emitter);
+          }
+        }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.longs()))));
+
+    PTable<String, Pair<Long, Collection<Pair<String, Long>>>> joinedResults = Join.join(n, wordDocumentCountPair);
+
+    /*
+     * Input: Pair<String, Pair<Long, Collection<Pair<String, Long>>>> Pair<word,
+     * Pair<# of docs containing word, Collection<Pair<title, term frequency>>>>
+     *
+     * Output: Pair<String, Collection<Pair<String, Double>>> Pair<word,
+     * Collection<Pair<title, tfidf>>>
+     */
+    return joinedResults
+        .mapValues(
+            new MapFn<Pair<Long, Collection<Pair<String, Long>>>, Collection<Pair<String, Double>>>() {
+              @Override
+              public Collection<Pair<String, Double>> map(
+                  Pair<Long, Collection<Pair<String, Long>>> input) {
+                Collection<Pair<String, Double>> tfidfs = Lists.newArrayList();
+                double n = input.first();
+                double idf = Math.log(N / n);
+                for (Pair<String, Long> tf : input.second()) {
+                  double tfidf = tf.second() * idf;
+                  tfidfs.add(Pair.of(tf.first(), tfidf));
+                }
+                return tfidfs;
+              }
+
+            }, ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles())));
+  }
+
+  public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean singleRun) throws IOException {
+    String inputFile = tmpDir.copyResourceFileName("docs.txt");
+    String outputPath1 = tmpDir.getFileName("output1");
+    String outputPath2 = tmpDir.getFileName("output2");
+
+    Path tfPath = tmpDir.getPath("termfreq");
+
+    PCollection<String> docs = pipeline.readTextFile(inputFile);
+
+    PTable<String, Collection<Pair<String, Double>>> results = generateTFIDF(docs, tfPath, typeFamily);
+    pipeline.writeTextFile(results, outputPath1);
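+    // When singleRun is false, run the pipeline now so that the uppercased
+    // table below is derived from already-materialized output; when true,
+    // both writes execute in the single run triggered by done().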
+    if (!singleRun) {
+      pipeline.run();
+    }
+
+    PTable<String, Collection<Pair<String, Double>>> uppercased = results.mapKeys(
+        new MapFn<String, String>() {
+          @Override
+          public String map(String k1) {
+            return k1.toUpperCase();
+          }
+        }, results.getKeyType());
+    pipeline.writeTextFile(uppercased, outputPath2);
+    pipeline.done();
+
+    // Check the lowercase version...
+    File outputFile = new File(outputPath1, "part-r-00000");
+    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
+    boolean passed = false;
+    for (String line : lines) {
+      if (line.startsWith("[the") && line.contains("B,0.6931471805599453")) {
+        passed = true;
+        break;
+      }
+    }
+    assertTrue(passed);
+
+    // ...and the uppercase version
+    outputFile = new File(outputPath2, "part-r-00000");
+    lines = Files.readLines(outputFile, Charset.defaultCharset());
+    passed = false;
+    for (String line : lines) {
+      if (line.startsWith("[THE") && line.contains("B,0.6931471805599453")) {
+        passed = true;
+        break;
+      }
+    }
+    assertTrue(passed);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
new file mode 100644
index 0000000..db8509b
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class SparkUnionResultsIT extends CrunchTestSupport implements Serializable {
+
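+  /**
+   * Pairs each input line with the constant 10L, making values from this
+   * map-only branch distinguishable from the 1L counts produced by the
+   * grouped count() branch.
+   */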
+  static class StringLengthMapFn extends MapFn<String, Pair<String, Long>> {
+    @Override
+    public Pair<String, Long> map(String input) {
+      return new Pair<String, Long>(input, 10L);
+    }
+  }
+
+  /**
+   * Tests combining a GBK output with a map-only job output into a single
+   * unioned collection.
+   */
+  @Test
+  public void testUnionOfGroupedOutputAndNonGroupedOutput() throws IOException {
+    String inputPath = tempDir.copyResourceFileName("set1.txt");
+    String inputPath2 = tempDir.copyResourceFileName("set2.txt");
+
+    Pipeline pipeline = new SparkPipeline("local", "unionresults");
+
+    PCollection<String> set1Lines = pipeline.read(At.textFile(inputPath, Writables.strings()));
+    PCollection<Pair<String, Long>> set1Lengths = set1Lines.parallelDo(new StringLengthMapFn(),
+        Writables.pairs(Writables.strings(), Writables.longs()));
+    PCollection<Pair<String, Long>> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count();
+
+    PCollection<Pair<String, Long>> union = set1Lengths.union(set2Counts);
+
+    Set<Pair<String, Long>> unionValues = Sets.newHashSet(union.materialize());
+    assertEquals(7, unionValues.size());
+
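+    // set1.txt contributes four (line, 10L) pairs via the map-only branch;
+    // set2.txt contributes three (line, 1L) pairs via count().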
+    Set<Pair<String, Long>> expectedPairs = Sets.newHashSet();
+    expectedPairs.add(Pair.of("b", 10L));
+    expectedPairs.add(Pair.of("c", 10L));
+    expectedPairs.add(Pair.of("a", 10L));
+    expectedPairs.add(Pair.of("e", 10L));
+    expectedPairs.add(Pair.of("a", 1L));
+    expectedPairs.add(Pair.of("c", 1L));
+    expectedPairs.add(Pair.of("d", 1L));
+
+    assertEquals(expectedPairs, unionValues);
+
+    pipeline.done();
+  }
+
+  @Test
+  public void testMultiWrite() throws Exception {
+    String inputPath = tempDir.copyResourceFileName("set1.txt");
+    String inputPath2 = tempDir.copyResourceFileName("set2.txt");
+    String output = tempDir.getFileName("output");
+
+    Pipeline pipeline = new SparkPipeline("local", "multiwrite");
+
+    PCollection<String> set1Lines = pipeline.read(At.textFile(inputPath, Writables.strings()));
+    PTable<String, Long> set1Lengths = set1Lines.parallelDo(new StringLengthMapFn(),
+        Writables.tableOf(Writables.strings(), Writables.longs()));
+    PTable<String, Long> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count();
+
+    TableSourceTarget<String, Long> inter = At.sequenceFile(output, Writables.strings(), Writables.longs());
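+    // Both tables write to the same sequence file target; the second write
+    // uses APPEND so it adds to the first write's output rather than
+    // replacing it.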
+    set1Lengths.write(inter);
+    set2Counts.write(inter, Target.WriteMode.APPEND);
+
+    pipeline.run();
+
+    PTable<String, Long> in = pipeline.read(inter);
+    Set<Pair<String, Long>> values = Sets.newHashSet(in.materialize());
+    assertEquals(7, values.size());
+
+    Set<Pair<String, Long>> expectedPairs = Sets.newHashSet();
+    expectedPairs.add(Pair.of("b", 10L));
+    expectedPairs.add(Pair.of("c", 10L));
+    expectedPairs.add(Pair.of("a", 10L));
+    expectedPairs.add(Pair.of("e", 10L));
+    expectedPairs.add(Pair.of("a", 1L));
+    expectedPairs.add(Pair.of("c", 1L));
+    expectedPairs.add(Pair.of("d", 1L));
+
+    assertEquals(expectedPairs, values);
+
+    pipeline.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/test/StringWrapper.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/test/StringWrapper.java b/crunch-spark/src/it/java/org/apache/crunch/test/StringWrapper.java
new file mode 100644
index 0000000..34302b5
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/test/StringWrapper.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import org.apache.crunch.MapFn;
+
+/**
+ * Simple String wrapper for testing with Avro reflection.
+ */
+public class StringWrapper implements Comparable<StringWrapper> {
+
+  public static class StringToStringWrapperMapFn extends MapFn<String, StringWrapper> {
+
+    @Override
+    public StringWrapper map(String input) {
+      return wrap(input);
+    }
+
+  }
+
+  public static class StringWrapperToStringMapFn extends MapFn<StringWrapper, String> {
+
+    @Override
+    public String map(StringWrapper input) {
+      return input.getValue();
+    }
+
+  }
+
+  private String value;
+
+  public StringWrapper() {
+    this("");
+  }
+
+  public StringWrapper(String value) {
+    this.value = value;
+  }
+
+  @Override
+  public int compareTo(StringWrapper o) {
+    return this.value.compareTo(o.value);
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  public void setValue(String value) {
+    this.value = value;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((value == null) ? 0 : value.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    StringWrapper other = (StringWrapper) obj;
+    if (value == null) {
+      if (other.value != null)
+        return false;
+    } else if (!value.equals(other.value))
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "StringWrapper [value=" + value + "]";
+  }
+
+  public static StringWrapper wrap(String value) {
+    return new StringWrapper(value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/test/Tests.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/test/Tests.java b/crunch-spark/src/it/java/org/apache/crunch/test/Tests.java
new file mode 100644
index 0000000..dc4985e
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/test/Tests.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class Tests {
+  /**
+   * Builds the classpath-relative path to a test resource. Note that this
+   * does not check whether the resource actually exists!
+   *
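+   * For example, for a hypothetical test class {@code org.apache.crunch.FooIT},
+   * {@code resource(fooIT, "bar.txt")} would return
+   * {@code "org/apache/crunch/FooITData/bar.txt"}.
+   *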
+   * @param testCase the test instance whose class determines the resource directory
+   * @param resourceName the simple name of the resource file
+   * @return the path to the resource (never null)
+   */
+  public static String resource(Object testCase, String resourceName) {
+    checkNotNull(testCase);
+    checkNotNull(resourceName);
+
+    // Note: We append "Data" because otherwise Eclipse would complain about
+    //       the test case's class name clashing with the resource directory's name.
+    return testCase.getClass().getName().replaceAll("\\.", "/") + "Data/" + resourceName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/resources/customers.txt
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/resources/customers.txt b/crunch-spark/src/it/resources/customers.txt
new file mode 100644
index 0000000..98f3f3d
--- /dev/null
+++ b/crunch-spark/src/it/resources/customers.txt
@@ -0,0 +1,4 @@
+111|John Doe
+222|Jane Doe
+333|Someone Else
+444|Has No Orders
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/resources/docs.txt
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/resources/docs.txt b/crunch-spark/src/it/resources/docs.txt
new file mode 100644
index 0000000..90a3f65
--- /dev/null
+++ b/crunch-spark/src/it/resources/docs.txt
@@ -0,0 +1,6 @@
+A	this doc has this text
+A	and this text as well
+A	but also this
+B	this doc has some text
+B	but not as much as the last
+B	doc

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/resources/emptyTextFile.txt
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/resources/emptyTextFile.txt b/crunch-spark/src/it/resources/emptyTextFile.txt
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/resources/letters.txt
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/resources/letters.txt b/crunch-spark/src/it/resources/letters.txt
new file mode 100644
index 0000000..916bfc9
--- /dev/null
+++ b/crunch-spark/src/it/resources/letters.txt
@@ -0,0 +1,2 @@
+a
+bb
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/resources/log4j.properties b/crunch-spark/src/it/resources/log4j.properties
new file mode 100644
index 0000000..8769f6c
--- /dev/null
+++ b/crunch-spark/src/it/resources/log4j.properties
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ***** Log Crunch at INFO level, sending output to appender A.
+log4j.logger.org.apache.crunch=info, A
+
+# Log Hadoop at WARN and Spark at INFO for the local runner when testing.
+log4j.logger.org.apache.hadoop=warn, A
+log4j.logger.org.apache.spark=info, A
+# Except for Configuration, which is chatty.
+log4j.logger.org.apache.hadoop.conf.Configuration=error, A
+
+# ***** A is set to be a ConsoleAppender.
+log4j.appender.A=org.apache.log4j.ConsoleAppender
+# ***** A uses PatternLayout.
+log4j.appender.A.layout=org.apache.log4j.PatternLayout
+log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n