You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/12/11 21:47:52 UTC
[6/9] git commit: Crunch on Spark
Crunch on Spark
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/6e623413
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/6e623413
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/6e623413
Branch: refs/heads/master
Commit: 6e6234138252f9bfa9f08758ea1443fe3f648322
Parents: a691b83
Author: Josh Wills <jw...@apache.org>
Authored: Sun Nov 17 09:14:00 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Dec 11 12:28:10 2013 -0800
----------------------------------------------------------------------
crunch-spark/pom.xml | 93 +
.../java/org/apache/crunch/SparkCogroupIT.java | 189 +
.../org/apache/crunch/SparkMapsideJoinIT.java | 188 +
.../java/org/apache/crunch/SparkPageRankIT.java | 153 +
.../org/apache/crunch/SparkSecondarySortIT.java | 61 +
.../it/java/org/apache/crunch/SparkSortIT.java | 315 +
.../it/java/org/apache/crunch/SparkTfidfIT.java | 228 +
.../org/apache/crunch/SparkUnionResultsIT.java | 117 +
.../org/apache/crunch/test/StringWrapper.java | 102 +
.../it/java/org/apache/crunch/test/Tests.java | 38 +
crunch-spark/src/it/resources/customers.txt | 4 +
crunch-spark/src/it/resources/docs.txt | 6 +
crunch-spark/src/it/resources/emptyTextFile.txt | 0
crunch-spark/src/it/resources/letters.txt | 2 +
crunch-spark/src/it/resources/log4j.properties | 30 +
crunch-spark/src/it/resources/maugham.txt | 29112 +++++++++++++++++
crunch-spark/src/it/resources/orders.txt | 5 +
.../apache/crunch/SparkCogroupITData/src1.txt | 4 +
.../apache/crunch/SparkCogroupITData/src2.txt | 4 +
.../src/it/resources/secondary_sort_input.txt | 7 +
crunch-spark/src/it/resources/set1.txt | 4 +
crunch-spark/src/it/resources/set2.txt | 3 +
crunch-spark/src/it/resources/shakes.txt | 3667 +++
crunch-spark/src/it/resources/sort_by_value.txt | 5 +
crunch-spark/src/it/resources/urls.txt | 11 +
.../org/apache/crunch/impl/spark/ByteArray.java | 50 +
.../impl/spark/CounterAccumulatorParam.java | 45 +
.../apache/crunch/impl/spark/GuavaUtils.java | 44 +
.../apache/crunch/impl/spark/IntByteArray.java | 43 +
.../crunch/impl/spark/SparkCollection.java | 24 +
.../crunch/impl/spark/SparkComparator.java | 86 +
.../crunch/impl/spark/SparkPartitioner.java | 37 +
.../apache/crunch/impl/spark/SparkPipeline.java | 118 +
.../apache/crunch/impl/spark/SparkRuntime.java | 326 +
.../crunch/impl/spark/SparkRuntimeContext.java | 202 +
.../crunch/impl/spark/collect/DoCollection.java | 65 +
.../crunch/impl/spark/collect/DoTable.java | 78 +
.../impl/spark/collect/InputCollection.java | 61 +
.../crunch/impl/spark/collect/InputTable.java | 51 +
.../impl/spark/collect/PGroupedTableImpl.java | 122 +
.../impl/spark/collect/SparkCollectFactory.java | 99 +
.../impl/spark/collect/ToByteArrayFunction.java | 33 +
.../impl/spark/collect/UnionCollection.java | 58 +
.../crunch/impl/spark/collect/UnionTable.java | 59 +
.../impl/spark/fn/CombineMapsideFunction.java | 90 +
.../crunch/impl/spark/fn/CrunchIterable.java | 38 +
.../crunch/impl/spark/fn/FlatMapDoFn.java | 41 +
.../crunch/impl/spark/fn/FlatMapPairDoFn.java | 45 +
.../impl/spark/fn/InputConverterFunction.java | 35 +
.../crunch/impl/spark/fn/MapFunction.java | 42 +
.../crunch/impl/spark/fn/MapOutputFunction.java | 42 +
.../impl/spark/fn/OutputConverterFunction.java | 35 +
.../crunch/impl/spark/fn/PairFlatMapDoFn.java | 46 +
.../impl/spark/fn/PairFlatMapPairDoFn.java | 49 +
.../crunch/impl/spark/fn/PairMapFunction.java | 44 +
.../impl/spark/fn/PairMapIterableFunction.java | 51 +
.../spark/fn/PartitionedMapOutputFunction.java | 83 +
.../impl/spark/fn/ReduceGroupingFunction.java | 121 +
.../impl/spark/fn/ReduceInputFunction.java | 44 +
.../crunch/impl/spark/serde/AvroSerDe.java | 109 +
.../apache/crunch/impl/spark/serde/SerDe.java | 30 +
.../crunch/impl/spark/serde/WritableSerDe.java | 70 +
pom.xml | 22 +-
63 files changed, 36982 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-spark/pom.xml b/crunch-spark/pom.xml
new file mode 100644
index 0000000..57f2be8
--- /dev/null
+++ b/crunch-spark/pom.xml
@@ -0,0 +1,93 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.crunch</groupId>
+ <artifactId>crunch-parent</artifactId>
+ <version>0.9.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>crunch-spark</artifactId>
+ <name>Apache Crunch for Spark</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.crunch</groupId>
+ <artifactId>crunch-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.crunch</groupId>
+ <artifactId>crunch-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/SparkCogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkCogroupIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkCogroupIT.java
new file mode 100644
index 0000000..66d30d2
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkCogroupIT.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.lib.Cogroup;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.Tests;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+/** Integration tests for {@link Cogroup} (2-, 3- and 4-way) on a local {@link SparkPipeline}. */
+public class SparkCogroupIT {
+ @Rule
+ public TemporaryPath tmpDir = new TemporaryPath();
+ // lines3/lines4 re-read src1.txt/src2.txt so the 3- and 4-way cogroups have predictable extra inputs.
+ private SparkPipeline pipeline;
+ private PCollection<String> lines1;
+ private PCollection<String> lines2;
+ private PCollection<String> lines3;
+ private PCollection<String> lines4;
+
+ @Before
+ public void setUp() throws IOException {
+ pipeline = new SparkPipeline("local", "wordcount");
+ lines1 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt")));
+ lines2 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt")));
+ lines3 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt")));
+ lines4 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt")));
+ }
+
+ @After
+ public void tearDown() {
+ pipeline.done();
+ }
+
+ @Test
+ public void testCogroupWritables() {
+ runCogroup(WritableTypeFamily.getInstance());
+ }
+
+ @Test
+ public void testCogroupAvro() {
+ runCogroup(AvroTypeFamily.getInstance());
+ }
+
+ @Test
+ public void testCogroup3Writables() {
+ runCogroup3(WritableTypeFamily.getInstance());
+ }
+
+ @Test
+ public void testCogroup3Avro() {
+ runCogroup3(AvroTypeFamily.getInstance());
+ }
+ // The 4-way tests must call runCogroup4; calling runCogroup3 here would leave the 4-way cogroup untested.
+ @Test
+ public void testCogroup4Writables() {
+ runCogroup4(WritableTypeFamily.getInstance());
+ }
+
+ @Test
+ public void testCogroup4Avro() {
+ runCogroup4(AvroTypeFamily.getInstance());
+ }
+ /** Two-way cogroup of src1/src2; each side is copied into a set so the assertion ignores value order. */
+ public void runCogroup(PTypeFamily ptf) {
+ PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
+
+ PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt);
+ PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt);
+
+ PTable<String, Pair<Collection<String>, Collection<String>>> cg = Cogroup.cogroup(kv1, kv2);
+
+ Map<String, Pair<Collection<String>, Collection<String>>> result = cg.materializeToMap();
+ Map<String, Pair<Collection<String>, Collection<String>>> actual = Maps.newHashMap();
+ for (Map.Entry<String, Pair<Collection<String>, Collection<String>>> e : result.entrySet()) {
+ Collection<String> one = ImmutableSet.copyOf(e.getValue().first());
+ Collection<String> two = ImmutableSet.copyOf(e.getValue().second());
+ actual.put(e.getKey(), Pair.of(one, two));
+ }
+ Map<String, Pair<Collection<String>, Collection<String>>> expected = ImmutableMap.of(
+ "a", Pair.of(coll("1-1", "1-4"), coll()),
+ "b", Pair.of(coll("1-2"), coll("2-1")),
+ "c", Pair.of(coll("1-3"), coll("2-2", "2-3")),
+ "d", Pair.of(coll(), coll("2-4"))
+ );
+
+ assertThat(actual, is(expected));
+ }
+ /** Three-way cogroup; lines3 re-reads src1.txt, so the third column mirrors the first. */
+ public void runCogroup3(PTypeFamily ptf) {
+ PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
+
+ PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt);
+ PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt);
+ PTable<String, String> kv3 = lines3.parallelDo("kv3", new KeyValueSplit(), tt);
+
+ PTable<String, Tuple3.Collect<String, String, String>> cg = Cogroup.cogroup(kv1, kv2, kv3);
+
+ Map<String, Tuple3.Collect<String, String, String>> result = cg.materializeToMap();
+ Map<String, Tuple3.Collect<String, String, String>> actual = Maps.newHashMap();
+ for (Map.Entry<String, Tuple3.Collect<String, String, String>> e : result.entrySet()) {
+ Collection<String> one = ImmutableSet.copyOf(e.getValue().first());
+ Collection<String> two = ImmutableSet.copyOf(e.getValue().second());
+ Collection<String> three = ImmutableSet.copyOf(e.getValue().third());
+ actual.put(e.getKey(), new Tuple3.Collect<String, String, String>(one, two, three));
+ }
+ Map<String, Tuple3.Collect<String, String, String>> expected = ImmutableMap.of(
+ "a", new Tuple3.Collect<String, String, String>(coll("1-1", "1-4"), coll(), coll("1-1", "1-4")),
+ "b", new Tuple3.Collect<String, String, String>(coll("1-2"), coll("2-1"), coll("1-2")),
+ "c", new Tuple3.Collect<String, String, String>(coll("1-3"), coll("2-2", "2-3"), coll("1-3")),
+ "d", new Tuple3.Collect<String, String, String>(coll(), coll("2-4"), coll())
+ );
+
+ assertThat(actual, is(expected));
+ }
+ /** Four-way cogroup; lines3/lines4 re-read src1.txt/src2.txt, so columns three and four mirror one and two. */
+ public void runCogroup4(PTypeFamily ptf) {
+ PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
+
+ PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt);
+ PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt);
+ PTable<String, String> kv3 = lines3.parallelDo("kv3", new KeyValueSplit(), tt);
+ PTable<String, String> kv4 = lines4.parallelDo("kv4", new KeyValueSplit(), tt);
+
+ PTable<String, Tuple4.Collect<String, String, String, String>> cg = Cogroup.cogroup(kv1, kv2, kv3, kv4);
+
+ Map<String, Tuple4.Collect<String, String, String, String>> result = cg.materializeToMap();
+ Map<String, Tuple4.Collect<String, String, String, String>> actual = Maps.newHashMap();
+ for (Map.Entry<String, Tuple4.Collect<String, String, String, String>> e : result.entrySet()) {
+ Collection<String> one = ImmutableSet.copyOf(e.getValue().first());
+ Collection<String> two = ImmutableSet.copyOf(e.getValue().second());
+ Collection<String> three = ImmutableSet.copyOf(e.getValue().third());
+ Collection<String> four = ImmutableSet.copyOf(e.getValue().fourth());
+ actual.put(e.getKey(), new Tuple4.Collect<String, String, String, String>(one, two, three, four));
+ }
+ Map<String, Tuple4.Collect<String, String, String, String>> expected = ImmutableMap.of(
+ "a", new Tuple4.Collect<String, String, String, String>(coll("1-1", "1-4"), coll(), coll("1-1", "1-4"), coll()),
+ "b", new Tuple4.Collect<String, String, String, String>(coll("1-2"), coll("2-1"), coll("1-2"), coll("2-1")),
+ "c", new Tuple4.Collect<String, String, String, String>(coll("1-3"), coll("2-2", "2-3"), coll("1-3"), coll("2-2", "2-3")),
+ "d", new Tuple4.Collect<String, String, String, String>(coll(), coll("2-4"), coll(), coll("2-4"))
+ );
+
+ assertThat(actual, is(expected));
+ }
+ /** Emits (fields[0], fields[1]) from a comma-separated line; any further fields are ignored. */
+ private static class KeyValueSplit extends DoFn<String, Pair<String, String>> {
+ @Override
+ public void process(String input, Emitter<Pair<String, String>> emitter) {
+ String[] fields = input.split(",");
+ emitter.emit(Pair.of(fields[0], fields[1]));
+ }
+ }
+ /** Immutable set of the given values, used for the expected per-key collections. */
+ private static Collection<String> coll(String... values) {
+ return ImmutableSet.copyOf(values);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/SparkMapsideJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkMapsideJoinIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkMapsideJoinIT.java
new file mode 100644
index 0000000..b58942f
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkMapsideJoinIT.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.Lists;
+import org.apache.crunch.fn.FilterFns;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.lib.join.JoinStrategy;
+import org.apache.crunch.lib.join.JoinType;
+import org.apache.crunch.lib.join.MapsideJoinStrategy;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+/** Integration tests for {@link MapsideJoinStrategy} inner and left-outer joins on a local {@link SparkPipeline}. */
+public class SparkMapsideJoinIT {
+ private static String saveTempDir; // previous java.io.tmpdir value, restored in tearDownClass
+
+ @BeforeClass
+ public static void setUpClass(){
+
+ // Ensure a consistent temporary directory for use of the DistributedCache.
+
+ // The DistributedCache technically isn't supported when running in local mode, and the default
+ // temporary directory "/tmp" is used as its location. This typically only causes an issue when
+ // running integration tests on Mac OS X, as OS X doesn't use "/tmp" as its default temporary
+ // directory. The following call ensures that "/tmp" is used as the temporary directory on all platforms.
+ saveTempDir = System.setProperty("java.io.tmpdir", "/tmp"); // setProperty returns the previous value
+ }
+
+ @AfterClass
+ public static void tearDownClass(){
+ System.setProperty("java.io.tmpdir", saveTempDir);
+ }
+ /** Parses an "id|text" line into (Integer.parseInt(id), text). */
+ private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
+ @Override
+ public Pair<Integer, String> map(String input) {
+ String[] fields = input.split("\\|");
+ return Pair.of(Integer.parseInt(fields[0]), fields[1]);
+ }
+ }
+ /** Upper-cases an order description using Locale.ENGLISH. */
+ private static class CapOrdersFn extends MapFn<String, String> {
+ @Override
+ public String map(String v) {
+ return v.toUpperCase(Locale.ENGLISH);
+ }
+ }
+ /** Flattens a joined (customer, order) pair to its Pair.toString() form, e.g. "[John Doe,Corn flakes]". */
+ private static class ConcatValuesFn extends MapFn<Pair<String, String>, String> {
+ @Override
+ public String map(Pair<String, String> v) {
+ return v.toString();
+ }
+ }
+
+ @Rule
+ public TemporaryPath tmpDir = new TemporaryPath();
+
+ @Test
+ public void testMapsideJoin_RightSideIsEmpty() throws IOException {
+ Pipeline pipeline = new SparkPipeline("local", "mapside");
+ PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+ PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+ // REJECT_ALL drops every order, so the right side of the join below is empty.
+ PTable<Integer, String> filteredOrderTable = orderTable
+ .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), orderTable.getPTableType());
+
+
+ JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>();
+ PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(customerTable, filteredOrderTable, JoinType.INNER_JOIN);
+
+ List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize());
+
+ assertTrue(materializedJoin.isEmpty());
+ pipeline.done();
+ }
+
+ @Test
+ public void testMapsideJoin() throws IOException {
+ runMapsideJoin(new SparkPipeline("local", "mapside"), false);
+ }
+
+ @Test
+ public void testMapsideJoin_Materialized() throws IOException {
+ runMapsideJoin(new SparkPipeline("local", "mapside"), true);
+ }
+
+ @Test
+ public void testMapsideJoin_LeftOuterJoin() throws IOException {
+ runMapsideLeftOuterJoin(new SparkPipeline("local", "mapside"), false);
+ }
+
+ @Test
+ public void testMapsideJoin_LeftOuterJoin_Materialized() throws IOException {
+ runMapsideLeftOuterJoin(new SparkPipeline("local", "mapside"), true);
+ }
+ /** Inner-joins customers with orders, then joins that with upper-cased orders; {@code materialize} is forwarded to MapsideJoinStrategy. pipeline.done() is only reached if the assertion passes. */
+ private void runMapsideJoin(Pipeline pipeline, boolean materialize) {
+ PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+ PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+ JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(materialize);
+ PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable, JoinType.INNER_JOIN)
+ .mapValues("concat", new ConcatValuesFn(), Writables.strings());
+
+ PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
+ PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders, ORDER_TABLE, JoinType.INNER_JOIN);
+
+ List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
+ expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PAPER")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PLUNGER")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PAPER")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER")));
+ expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH")));
+ Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
+
+ List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
+ Collections.sort(joinedResultList); // sort so the comparison does not depend on output order
+ assertEquals(expectedJoinResult, joinedResultList);
+ pipeline.done();
+ }
+ /** Same flow as runMapsideJoin but with LEFT_OUTER_JOIN; customer 444 has no orders and pairs with null. */
+ private void runMapsideLeftOuterJoin(Pipeline pipeline, boolean materialize) {
+ PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+ PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+ JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(materialize);
+ PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable, JoinType.LEFT_OUTER_JOIN)
+ .mapValues("concat", new ConcatValuesFn(), Writables.strings());
+
+ PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
+ PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders, ORDER_TABLE, JoinType.LEFT_OUTER_JOIN);
+
+ List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
+ expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PAPER")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PLUNGER")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PAPER")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER")));
+ expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH")));
+ expectedJoinResult.add(Pair.of(444, Pair.<String,String>of("[Has No Orders,null]", null)));
+ Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
+
+ List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
+ Collections.sort(joinedResultList); // sort so the comparison does not depend on output order
+ assertEquals(expectedJoinResult, joinedResultList);
+ pipeline.done();
+ }
+ /** Copies a resource into the temp dir and parses it into a {@code PTable<Integer, String>}; IOException is rethrown as RuntimeException. */
+ private PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
+ try {
+ return pipeline.readTextFile(tmpDir.copyResourceFileName(filename)).parallelDo("asTable",
+ new LineSplitter(),
+ Writables.tableOf(Writables.ints(), Writables.strings()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java
new file mode 100644
index 0000000..c76c62a
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkPageRankIT.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.PTypes;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+/** PageRank integration test on a local {@link SparkPipeline}, storing per-page state as JSON strings for both Avro and Writable type families. */
+public class SparkPageRankIT {
+ /** Per-page state: current score, previous iteration's score, and outbound link URLs. */
+ public static class PageRankData {
+ public float score;
+ public float lastScore;
+ public List<String> urls;
+ /** No-arg constructor; presumably needed by the JSON PType for deserialization — TODO confirm. */
+ public PageRankData() {
+ }
+ /** Creates state with the given scores and a mutable copy of {@code urls}. */
+ public PageRankData(float score, float lastScore, Iterable<String> urls) {
+ this.score = score;
+ this.lastScore = lastScore;
+ this.urls = Lists.newArrayList(urls);
+ }
+ /** Returns a new state whose score is {@code newScore} and whose lastScore is this state's score. */
+ public PageRankData next(float newScore) {
+ return new PageRankData(newScore, score, urls);
+ }
+ /** Score share sent along each outbound link: score divided by the number of URLs. */
+ public float propagatedScore() {
+ return score / urls.size();
+ }
+
+ @Override
+ public String toString() {
+ return score + " " + lastScore + " " + urls;
+ }
+ }
+
+ @Rule
+ public TemporaryPath tmpDir = new TemporaryPath();
+ private Pipeline pipeline;
+
+ @Before
+ public void setUp() throws Exception {
+ pipeline = new SparkPipeline("local", "pagerank");
+ }
+
+ @Test
+ public void testAvroJSON() throws Exception {
+ PTypeFamily tf = AvroTypeFamily.getInstance();
+ PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
+ String urlInput = tmpDir.copyResourceFileName("urls.txt");
+ run(pipeline, urlInput, prType, tf);
+ pipeline.done();
+ }
+
+ @Test
+ public void testWritablesJSON() throws Exception {
+ PTypeFamily tf = WritableTypeFamily.getInstance();
+ PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
+ String urlInput = tmpDir.copyResourceFileName("urls.txt");
+ run(pipeline, urlInput, prType, tf);
+ pipeline.done();
+ }
+ /** One iteration with constant {@code d}: each page's new score is d + (1 - d) * (sum of propagated scores it receives). */
+ public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {
+ PTypeFamily ptf = input.getTypeFamily();
+ PTable<String, Float> outbound = input.parallelDo(new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
+ @Override
+ public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float>> emitter) {
+ PageRankData prd = input.second();
+ for (String link : prd.urls) {
+ emitter.emit(Pair.of(link, prd.propagatedScore()));
+ }
+ }
+ }, ptf.tableOf(ptf.strings(), ptf.floats()));
+
+ return input.cogroup(outbound).mapValues(
+ new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
+ @Override
+ public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) {
+ PageRankData prd = Iterables.getOnlyElement(input.first()); // fails unless the page has exactly one state entry, so every link target must also appear as a source
+ Collection<Float> propagatedScores = input.second();
+ float sum = 0.0f;
+ for (Float s : propagatedScores) {
+ sum += s;
+ }
+ return prd.next(d + (1.0f - d) * sum);
+ }
+ }, input.getValueType());
+ }
+ /** Builds initial state from tab-separated (source, target) lines, runs pageRank with d = 0.5 until the largest per-page score change is at most 0.01, then checks that final change. */
+ public static void run(Pipeline pipeline, String urlInput,
+ PType<PageRankData> prType, PTypeFamily ptf) throws Exception {
+ PTable<String, PageRankData> scores = pipeline.readTextFile(urlInput)
+ .parallelDo(new MapFn<String, Pair<String, String>>() {
+ @Override
+ public Pair<String, String> map(String input) {
+ String[] urls = input.split("\\t");
+ return Pair.of(urls[0], urls[1]);
+ }
+ }, ptf.tableOf(ptf.strings(), ptf.strings())).groupByKey()
+ .mapValues(new MapFn<Iterable<String>, PageRankData>() {
+ @Override
+ public PageRankData map(Iterable<String> input) {
+ return new PageRankData(1.0f, 0.0f, input);
+ }
+ }, prType);
+ // Iterate until the maximum |score - lastScore| across pages drops to 0.01 or below.
+ Float delta = 1.0f;
+ while (delta > 0.01) {
+ scores = pageRank(scores, 0.5f).cache();
+ delta = Aggregate.max(scores.parallelDo(new MapFn<Pair<String, PageRankData>, Float>() {
+ @Override
+ public Float map(Pair<String, PageRankData> input) {
+ PageRankData prd = input.second();
+ return Math.abs(prd.score - prd.lastScore);
+ }
+ }, ptf.floats())).getValue();
+ }
+ assertEquals(0.0048, delta, 0.001);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/SparkSecondarySortIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkSecondarySortIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkSecondarySortIT.java
new file mode 100644
index 0000000..f9e3265
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkSecondarySortIT.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.lib.SecondarySort;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.apache.crunch.types.avro.Avros.ints;
+import static org.apache.crunch.types.avro.Avros.strings;
+import static org.junit.Assert.assertEquals;
+/** Integration test for {@link SecondarySort#sortAndApply} on a local {@link SparkPipeline}. */
+public class SparkSecondarySortIT extends CrunchTestSupport implements Serializable { // presumably Serializable because the anonymous MapFns below may capture this instance
+ @Test
+ public void testSecondarySort() throws Exception {
+ Pipeline p = new SparkPipeline("local", "secondarysort");
+ String inputFile = tempDir.copyResourceFileName("secondary_sort_input.txt");
+ // Each input line is "key, int, int"; parse it into (key, (int, int)) using Avro PTypes.
+ PTable<String, Pair<Integer, Integer>> in = p.read(From.textFile(inputFile))
+ .parallelDo(new MapFn<String, Pair<String, Pair<Integer, Integer>>>() {
+ @Override
+ public Pair<String, Pair<Integer, Integer>> map(String input) {
+ String[] pieces = input.split(",");
+ return Pair.of(pieces[0],
+ Pair.of(Integer.valueOf(pieces[1].trim()), Integer.valueOf(pieces[2].trim())));
+ }
+ }, tableOf(strings(), pairs(ints(), ints())));
+ Iterable<String> lines = SecondarySort.sortAndApply(in, new MapFn<Pair<String, Iterable<Pair<Integer, Integer>>>, String>() {
+ @Override
+ public String map(Pair<String, Iterable<Pair<Integer, Integer>>> input) {
+ Joiner j = Joiner.on(',');
+ return j.join(input.first(), j.join(input.second()));
+ }
+ }, strings()).materialize();
+ assertEquals(ImmutableList.of("one,[-5,10],[1,1],[2,-3]", "three,[0,-1]", "two,[1,7],[2,6],[4,5]"),
+ ImmutableList.copyOf(lines));
+ p.done();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/SparkSortIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkSortIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkSortIT.java
new file mode 100644
index 0000000..994aae9
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkSortIT.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.Lists;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.lib.Sort;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.crunch.lib.Sort.ColumnOrder.by;
+import static org.apache.crunch.lib.Sort.Order.ASCENDING;
+import static org.apache.crunch.lib.Sort.Order.DESCENDING;
+import static org.apache.crunch.test.StringWrapper.wrap;
+import static org.junit.Assert.assertEquals;
+
+public class SparkSortIT implements Serializable {
+ @Rule
+ public transient TemporaryPath tmpDir = new TemporaryPath();
+
+ @Test
+ public void testWritableSortAsc() throws Exception {
+ runSingle(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), Sort.Order.ASCENDING,
+ "A\tand this text as well");
+ }
+
+ @Test
+ public void testWritableSortDesc() throws Exception {
+ runSingle(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), Sort.Order.DESCENDING,
+ "B\tthis doc has some text");
+ }
+
+ @Test
+ public void testWritableSortAscDesc() throws Exception {
+ runPair(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
+ "this doc has this text");
+ }
+
+ @Test
+ public void testWritableSortSecondDescFirstAsc() throws Exception {
+ runPair(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
+ "this doc has this text");
+ }
+
+ @Test
+ public void testWritableSortTripleAscDescAsc() throws Exception {
+ runTriple(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+ by(3, ASCENDING), "A", "this", "doc");
+ }
+
+ @Test
+ public void testWritableSortQuadAscDescAscDesc() throws Exception {
+ runQuad(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+ by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
+ }
+
+ @Test
+ public void testWritableSortTupleNAscDesc() throws Exception {
+ runTupleN(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(),
+ new Sort.ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
+ }
+
+ @Test
+ public void testWritableSortTable() throws Exception {
+ runTable(new SparkPipeline("local", "sort"), WritableTypeFamily.getInstance(), "A");
+ }
+
+ @Test
+ public void testAvroSortAsc() throws Exception {
+ runSingle(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), Sort.Order.ASCENDING, "A\tand this text as well");
+ }
+
+ @Test
+ public void testAvroSortDesc() throws Exception {
+ runSingle(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), Sort.Order.DESCENDING, "B\tthis doc has some text");
+ }
+
+ @Test
+ public void testAvroSortPairAscDesc() throws Exception {
+ runPair(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
+ "this doc has this text");
+ }
+
+ @Test
+ public void testAvroSortPairSecondDescFirstAsc() throws Exception {
+ runPair(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
+ "this doc has this text");
+ }
+
+ @Test
+ public void testAvroSortTripleAscDescAsc() throws Exception {
+ runTriple(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+ by(3, ASCENDING), "A", "this", "doc");
+ }
+
+ @Test
+ public void testAvroSortQuadAscDescAscDesc() throws Exception {
+ runQuad(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+ by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
+ }
+
+ @Test
+ public void testAvroSortTupleNAscDesc() throws Exception {
+ runTupleN(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(),
+ new Sort.ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
+ }
+
+ @Test
+ public void testAvroReflectSortPair() throws IOException {
+ Pipeline pipeline = new SparkPipeline("local", "sort");
+ pipeline.enableDebug();
+ String rsrc = tmpDir.copyResourceFileName("set2.txt");
+ PCollection<Pair<String, StringWrapper>> in = pipeline.readTextFile(rsrc)
+ .parallelDo(new MapFn<String, Pair<String, StringWrapper>>() {
+
+ @Override
+ public Pair<String, StringWrapper> map(String input) {
+ return Pair.of(input, wrap(input));
+ }
+ }, Avros.pairs(Avros.strings(), Avros.reflects(StringWrapper.class)));
+ PCollection<Pair<String, StringWrapper>> sorted = Sort.sort(in, Sort.Order.ASCENDING);
+
+ List<Pair<String, StringWrapper>> expected = Lists.newArrayList();
+ expected.add(Pair.of("a", wrap("a")));
+ expected.add(Pair.of("c", wrap("c")));
+ expected.add(Pair.of("d", wrap("d")));
+
+ assertEquals(expected, Lists.newArrayList(sorted.materialize()));
+ pipeline.done();
+ }
+
+ @Test
+ public void testAvroReflectSortTable() throws IOException {
+ Pipeline pipeline = new SparkPipeline("local", "sort");
+ PTable<String, StringWrapper> unsorted = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt")).parallelDo(
+ new MapFn<String, Pair<String, StringWrapper>>() {
+
+ @Override
+ public Pair<String, StringWrapper> map(String input) {
+ return Pair.of(input, wrap(input));
+ }
+ }, Avros.tableOf(Avros.strings(), Avros.reflects(StringWrapper.class)));
+
+ PTable<String, StringWrapper> sorted = Sort.sort(unsorted);
+
+ List<Pair<String, StringWrapper>> expected = Lists.newArrayList();
+ expected.add(Pair.of("a", wrap("a")));
+ expected.add(Pair.of("c", wrap("c")));
+ expected.add(Pair.of("d", wrap("d")));
+
+ assertEquals(expected, Lists.newArrayList(sorted.materialize()));
+ pipeline.done();
+ }
+
+ @Test
+ public void testAvroSortTable() throws Exception {
+ runTable(new SparkPipeline("local", "sort"), AvroTypeFamily.getInstance(), "A");
+ }
+
+ private void runSingle(Pipeline pipeline, PTypeFamily typeFamily, Sort.Order order, String firstLine) throws IOException {
+ String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+ PCollection<String> input = pipeline.readTextFile(inputPath);
+ // following turns the input from Writables to required type family
+ PCollection<String> input2 = input.parallelDo(new DoFn<String, String>() {
+ @Override
+ public void process(String input, Emitter<String> emitter) {
+ emitter.emit(input);
+ }
+ }, typeFamily.strings());
+ PCollection<String> sorted = Sort.sort(input2, order);
+ Iterable<String> lines = sorted.materialize();
+
+ assertEquals(firstLine, lines.iterator().next());
+ pipeline.done(); // TODO: finally
+ }
+
+ private void runPair(Pipeline pipeline, PTypeFamily typeFamily, Sort.ColumnOrder first, Sort.ColumnOrder second,
+ String firstField, String secondField) throws IOException {
+ String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+ PCollection<String> input = pipeline.readTextFile(inputPath);
+ PTable<String, String> kv = input.parallelDo(new DoFn<String, Pair<String, String>>() {
+ @Override
+ public void process(String input, Emitter<Pair<String, String>> emitter) {
+ String[] split = input.split("[\t]+");
+ emitter.emit(Pair.of(split[0], split[1]));
+ }
+ }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
+ PCollection<Pair<String, String>> sorted = Sort.sortPairs(kv, first, second);
+ List<Pair<String, String>> lines = Lists.newArrayList(sorted.materialize());
+ Pair<String, String> l = lines.iterator().next();
+ assertEquals(firstField, l.first());
+ assertEquals(secondField, l.second());
+ pipeline.done();
+ }
+
+ private void runTriple(Pipeline pipeline, PTypeFamily typeFamily, Sort.ColumnOrder first, Sort.ColumnOrder second,
+ Sort.ColumnOrder third, String firstField, String secondField, String thirdField) throws IOException {
+ String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+ PCollection<String> input = pipeline.readTextFile(inputPath);
+ PCollection<Tuple3<String, String, String>> kv = input.parallelDo(
+ new DoFn<String, Tuple3<String, String, String>>() {
+ @Override
+ public void process(String input, Emitter<Tuple3<String, String, String>> emitter) {
+ String[] split = input.split("[\t ]+");
+ int len = split.length;
+ emitter.emit(Tuple3.of(split[0], split[1 % len], split[2 % len]));
+ }
+ }, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
+ PCollection<Tuple3<String, String, String>> sorted = Sort.sortTriples(kv, first, second, third);
+ List<Tuple3<String, String, String>> lines = Lists.newArrayList(sorted.materialize());
+ Tuple3<String, String, String> l = lines.iterator().next();
+ assertEquals(firstField, l.first());
+ assertEquals(secondField, l.second());
+ assertEquals(thirdField, l.third());
+ pipeline.done();
+ }
+
+ private void runQuad(Pipeline pipeline, PTypeFamily typeFamily, Sort.ColumnOrder first, Sort.ColumnOrder second,
+ Sort.ColumnOrder third, Sort.ColumnOrder fourth, String firstField, String secondField, String thirdField,
+ String fourthField) throws IOException {
+ String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+ PCollection<String> input = pipeline.readTextFile(inputPath);
+ PCollection<Tuple4<String, String, String, String>> kv = input.parallelDo(
+ new DoFn<String, Tuple4<String, String, String, String>>() {
+ @Override
+ public void process(String input, Emitter<Tuple4<String, String, String, String>> emitter) {
+ String[] split = input.split("[\t ]+");
+ int len = split.length;
+ emitter.emit(Tuple4.of(split[0], split[1 % len], split[2 % len], split[3 % len]));
+ }
+ }, typeFamily.quads(typeFamily.strings(), typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
+ PCollection<Tuple4<String, String, String, String>> sorted = Sort.sortQuads(kv, first, second, third, fourth);
+ Iterable<Tuple4<String, String, String, String>> lines = sorted.materialize();
+ Tuple4<String, String, String, String> l = lines.iterator().next();
+ assertEquals(firstField, l.first());
+ assertEquals(secondField, l.second());
+ assertEquals(thirdField, l.third());
+ assertEquals(fourthField, l.fourth());
+ pipeline.done();
+ }
+
+ private void runTupleN(Pipeline pipeline, PTypeFamily typeFamily, Sort.ColumnOrder[] orders, String[] fields)
+ throws IOException {
+ String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+ PCollection<String> input = pipeline.readTextFile(inputPath);
+ PType[] types = new PType[orders.length];
+ Arrays.fill(types, typeFamily.strings());
+ PCollection<TupleN> kv = input.parallelDo(new DoFn<String, TupleN>() {
+ @Override
+ public void process(String input, Emitter<TupleN> emitter) {
+ String[] split = input.split("[\t]+");
+ emitter.emit(new TupleN(split));
+ }
+ }, typeFamily.tuples(types));
+ PCollection<TupleN> sorted = Sort.sortTuples(kv, orders);
+ Iterable<TupleN> lines = sorted.materialize();
+ TupleN l = lines.iterator().next();
+ int i = 0;
+ for (String field : fields) {
+ assertEquals(field, l.get(i++));
+ }
+ pipeline.done();
+ }
+
+ private void runTable(Pipeline pipeline, PTypeFamily typeFamily, String firstKey) throws IOException {
+ String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+ PCollection<String> input = pipeline.readTextFile(inputPath);
+ PTable<String, String> table = input.parallelDo(new DoFn<String, Pair<String, String>>() {
+ @Override
+ public void process(String input, Emitter<Pair<String, String>> emitter) {
+ String[] split = input.split("[\t]+");
+ emitter.emit(Pair.of(split[0], split[1]));
+ }
+ }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
+
+ PTable<String, String> sorted = Sort.sort(table);
+ Iterable<Pair<String, String>> lines = sorted.materialize();
+ Pair<String, String> l = lines.iterator().next();
+ assertEquals(firstKey, l.first());
+ pipeline.done();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/SparkTfidfIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkTfidfIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkTfidfIT.java
new file mode 100644
index 0000000..f13166b
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkTfidfIT.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.io.seq.SeqFileSourceTarget;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.lib.Join;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+@SuppressWarnings("serial")
+/**
+ * Integration test that computes TF-IDF scores for docs.txt on a local {@link SparkPipeline}.
+ * It runs the job either as a single run or split across two runs.
+ */
+public class SparkTfidfIT implements Serializable {
+  @Rule
+  public transient TemporaryPath tmpDir = new TemporaryPath();
+
+  // Total number of documents (titles A and B in docs.txt); hard-coded rather than computed.
+  protected static final double N = 2;
+
+  private transient Pipeline pipeline;
+
+  @Before
+  public void setUp() throws Exception {
+    pipeline = new SparkPipeline("local", "tfidf");
+  }
+
+  @Test
+  public void testWritablesSingleRun() throws IOException {
+    run(pipeline, WritableTypeFamily.getInstance(), true);
+  }
+
+  @Test
+  public void testWritablesMultiRun() throws IOException {
+    run(pipeline, WritableTypeFamily.getInstance(), false);
+  }
+
+  /**
+   * Builds the TF-IDF table for {@code docs}.
+   *
+   * @param docs lines of the form "title\ttext"
+   * @param termFreqPath where the intermediate (word, title) -> count table is written as a sequence file
+   * @param ptf type family for all intermediate and output collections
+   * @return word -> collection of (title, tf * idf)
+   */
+  public PTable<String, Collection<Pair<String, Double>>> generateTFIDF(PCollection<String> docs, Path termFreqPath,
+      PTypeFamily ptf) throws IOException {
+
+    /*
+     * Input: String Input title text
+     *
+     * Output: PTable<Pair<String, String>, Long> Pair<Pair<word, title>, count
+     * in title>
+     */
+    PTable<Pair<String, String>, Long> tf = Aggregate.count(docs.parallelDo("term document frequency",
+        new DoFn<String, Pair<String, String>>() {
+          @Override
+          public void process(String doc, Emitter<Pair<String, String>> emitter) {
+            String[] kv = doc.split("\t");
+            String title = kv[0];
+            String text = kv[1];
+            for (String word : text.split("\\W+")) {
+              if (word.length() > 0) {
+                Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
+                emitter.emit(pair);
+              }
+            }
+          }
+        }, ptf.pairs(ptf.strings(), ptf.strings())));
+
+    tf.write(new SeqFileSourceTarget<Pair<Pair<String, String>, Long>>(termFreqPath, tf.getPType()));
+
+    /*
+     * Input: Pair<Pair<String, String>, Long> Pair<Pair<word, title>, count in
+     * title>
+     *
+     * Output: PTable<String, Long> PTable<word, # of docs containing word>
+     */
+    PTable<String, Long> n = Aggregate.count(tf.parallelDo("little n (# of docs contain word)",
+        new DoFn<Pair<Pair<String, String>, Long>, String>() {
+          @Override
+          public void process(Pair<Pair<String, String>, Long> input, Emitter<String> emitter) {
+            emitter.emit(input.first().first());
+          }
+        }, ptf.strings()));
+
+    /*
+     * Input: Pair<Pair<String, String>, Long> Pair<Pair<word, title>, count in
+     * title>
+     *
+     * Output: PTable<String, Pair<String, Long>> PTable<word, Pair<title, count
+     * in title>>
+     */
+    PTable<String, Collection<Pair<String, Long>>> wordDocumentCountPair = tf.parallelDo(
+        "transform wordDocumentPairCount",
+        new DoFn<Pair<Pair<String, String>, Long>, Pair<String, Collection<Pair<String, Long>>>>() {
+          // Per-task state: the word currently being buffered and its (title, count) entries.
+          // NOTE(review): this buffering assumes all records for a word arrive consecutively;
+          // if they do not, the same word may be emitted more than once — confirm.
+          Collection<Pair<String, Long>> buffer;
+          String key;
+
+          @Override
+          public void process(Pair<Pair<String, String>, Long> input,
+              Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
+            Pair<String, String> wordDocumentPair = input.first();
+            if (!wordDocumentPair.first().equals(key)) {
+              flush(emitter);
+              key = wordDocumentPair.first();
+              buffer = Lists.newArrayList();
+            }
+            buffer.add(Pair.of(wordDocumentPair.second(), input.second()));
+          }
+
+          /** Emits the buffered entries for the current word, if any, and clears the buffer. */
+          protected void flush(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
+            if (buffer != null) {
+              emitter.emit(Pair.of(key, buffer));
+              buffer = null;
+            }
+          }
+
+          @Override
+          public void cleanup(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
+            flush(emitter);
+          }
+        }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.longs()))));
+
+    PTable<String, Pair<Long, Collection<Pair<String, Long>>>> joinedResults = Join.join(n, wordDocumentCountPair);
+
+    /*
+     * Input: Pair<String, Pair<Long, Collection<Pair<String, Long>>> Pair<word,
+     * Pair<# of docs containing word, Collection<Pair<title, term frequency>>>
+     *
+     * Output: Pair<String, Collection<Pair<String, Double>>> Pair<word,
+     * Collection<Pair<title, tfidf>>>
+     */
+    return joinedResults
+        .mapValues(
+            new MapFn<Pair<Long, Collection<Pair<String, Long>>>, Collection<Pair<String, Double>>>() {
+              @Override
+              public Collection<Pair<String, Double>> map(
+                  Pair<Long, Collection<Pair<String, Long>>> input) {
+                Collection<Pair<String, Double>> tfidfs = Lists.newArrayList();
+                double n = input.first();
+                double idf = Math.log(N / n);
+                for (Pair<String, Long> tf : input.second()) {
+                  double tfidf = tf.second() * idf;
+                  tfidfs.add(Pair.of(tf.first(), tfidf));
+                }
+                return tfidfs;
+              }
+
+            }, ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles())));
+  }
+
+  /**
+   * Runs the TF-IDF job and writes both the original and an upper-cased copy of the results,
+   * then checks both outputs. The pipeline is always shut down, even if building or running
+   * the job fails.
+   *
+   * @param singleRun if false, the first output is produced by an extra {@code run()} before
+   *     the second output is added
+   */
+  public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean singleRun) throws IOException {
+    String outputPath1 = tmpDir.getFileName("output1");
+    String outputPath2 = tmpDir.getFileName("output2");
+
+    try {
+      String inputFile = tmpDir.copyResourceFileName("docs.txt");
+      Path tfPath = tmpDir.getPath("termfreq");
+
+      PCollection<String> docs = pipeline.readTextFile(inputFile);
+
+      PTable<String, Collection<Pair<String, Double>>> results = generateTFIDF(docs, tfPath, typeFamily);
+      pipeline.writeTextFile(results, outputPath1);
+      if (!singleRun) {
+        pipeline.run();
+      }
+
+      PTable<String, Collection<Pair<String, Double>>> uppercased = results.mapKeys(
+          new MapFn<String, String>() {
+            @Override
+            public String map(String k1) {
+              return k1.toUpperCase();
+            }
+          }, results.getKeyType());
+      pipeline.writeTextFile(uppercased, outputPath2);
+    } finally {
+      pipeline.done();
+    }
+
+    // "the" occurs once, and only in document B, so its score there is 1 * ln(2 / 1) = ln 2.
+    assertOutputContains(outputPath1, "[the", "B,0.6931471805599453");
+    assertOutputContains(outputPath2, "[THE", "B,0.6931471805599453");
+  }
+
+  /**
+   * Asserts that {@code outputDir}/part-r-00000 contains a line that starts with {@code prefix}
+   * and contains {@code fragment}. The file is read as UTF-8 rather than the platform default
+   * charset, so the check does not depend on the machine's locale settings.
+   */
+  private static void assertOutputContains(String outputDir, String prefix, String fragment) throws IOException {
+    File outputFile = new File(outputDir, "part-r-00000");
+    List<String> lines = Files.readLines(outputFile, Charset.forName("UTF-8"));
+    boolean passed = false;
+    for (String line : lines) {
+      if (line.startsWith(prefix) && line.contains(fragment)) {
+        passed = true;
+        break;
+      }
+    }
+    assertTrue(passed);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
new file mode 100644
index 0000000..db8509b
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration tests for unioning, and for appending writes of, grouped and non-grouped outputs
+ * on a local {@link SparkPipeline}.
+ */
+public class SparkUnionResultsIT extends CrunchTestSupport implements Serializable {
+
+  /**
+   * Pairs every input string with the constant 10L. Despite its name, it does not compute the
+   * string's length; the constant just makes these pairs distinguishable from the counts.
+   */
+  static class StringLengthMapFn extends MapFn<String, Pair<String, Long>> {
+    @Override
+    public Pair<String, Long> map(String input) {
+      return Pair.of(input, 10L);
+    }
+  }
+
+  /**
+   * Returns the pairs both tests expect. These are the four set1 values paired with 10L (from
+   * {@link StringLengthMapFn}) plus the set2 values with their count of 1.
+   */
+  private static Set<Pair<String, Long>> expectedPairs() {
+    Set<Pair<String, Long>> expected = Sets.newHashSet();
+    expected.add(Pair.of("b", 10L));
+    expected.add(Pair.of("c", 10L));
+    expected.add(Pair.of("a", 10L));
+    expected.add(Pair.of("e", 10L));
+    expected.add(Pair.of("a", 1L));
+    expected.add(Pair.of("c", 1L));
+    expected.add(Pair.of("d", 1L));
+    return expected;
+  }
+
+  /**
+   * Tests combining a GBK output with a map-only job output into a single
+   * unioned collection.
+   */
+  @Test
+  public void testUnionOfGroupedOutputAndNonGroupedOutput() throws IOException {
+    String inputPath = tempDir.copyResourceFileName("set1.txt");
+    String inputPath2 = tempDir.copyResourceFileName("set2.txt");
+
+    Pipeline pipeline = new SparkPipeline("local", "unionresults");
+    try {
+      PCollection<String> set1Lines = pipeline.read(At.textFile(inputPath, Writables.strings()));
+      PCollection<Pair<String, Long>> set1Lengths = set1Lines.parallelDo(new StringLengthMapFn(),
+          Writables.pairs(Writables.strings(), Writables.longs()));
+      PCollection<Pair<String, Long>> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count();
+
+      PCollection<Pair<String, Long>> union = set1Lengths.union(set2Counts);
+
+      Set<Pair<String, Long>> unionValues = Sets.newHashSet(union.materialize());
+      assertEquals(7, unionValues.size());
+      assertEquals(expectedPairs(), unionValues);
+    } finally {
+      pipeline.done();
+    }
+  }
+
+  /**
+   * Writes a map-only output and a grouped output to the same sequence-file target (the second
+   * in APPEND mode), then reads the target back and checks that both sets of pairs are present.
+   */
+  @Test
+  public void testMultiWrite() throws Exception {
+    String inputPath = tempDir.copyResourceFileName("set1.txt");
+    String inputPath2 = tempDir.copyResourceFileName("set2.txt");
+    String output = tempDir.getFileName("output");
+
+    Pipeline pipeline = new SparkPipeline("local", "multiwrite");
+    try {
+      PCollection<String> set1Lines = pipeline.read(At.textFile(inputPath, Writables.strings()));
+      PTable<String, Long> set1Lengths = set1Lines.parallelDo(new StringLengthMapFn(),
+          Writables.tableOf(Writables.strings(), Writables.longs()));
+      PTable<String, Long> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count();
+
+      TableSourceTarget<String, Long> inter = At.sequenceFile(output, Writables.strings(), Writables.longs());
+      set1Lengths.write(inter);
+      set2Counts.write(inter, Target.WriteMode.APPEND);
+
+      pipeline.run();
+
+      PTable<String, Long> in = pipeline.read(inter);
+      Set<Pair<String, Long>> values = Sets.newHashSet(in.materialize());
+      assertEquals(7, values.size());
+      assertEquals(expectedPairs(), values);
+    } finally {
+      pipeline.done();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/test/StringWrapper.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/test/StringWrapper.java b/crunch-spark/src/it/java/org/apache/crunch/test/StringWrapper.java
new file mode 100644
index 0000000..34302b5
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/test/StringWrapper.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import org.apache.crunch.MapFn;
+
+/**
+ * Simple String wrapper for testing with Avro reflection.
+ */
+public class StringWrapper implements Comparable<StringWrapper> {
+
+  /** Wraps each input string in a {@link StringWrapper}. */
+  public static class StringToStringWrapperMapFn extends MapFn<String, StringWrapper> {
+
+    @Override
+    public StringWrapper map(String input) {
+      return wrap(input);
+    }
+
+  }
+
+  /** Unwraps each {@link StringWrapper} back to its string value. */
+  public static class StringWrapperToStringMapFn extends MapFn<StringWrapper, String> {
+
+    @Override
+    public String map(StringWrapper input) {
+      return input.getValue();
+    }
+
+  }
+
+  // May be null (via the constructor or setValue); equals/hashCode/compareTo all tolerate that.
+  private String value;
+
+  /** No-arg constructor (e.g. for reflection-based instantiation); wraps the empty string. */
+  public StringWrapper() {
+    this("");
+  }
+
+  public StringWrapper(String value) {
+    this.value = value;
+  }
+
+  /**
+   * Orders wrappers by their string value, with a null value sorting before any non-null value.
+   * This is consistent with {@link #equals(Object)}, which also treats null as a legal value;
+   * previously a null value made this method throw a NullPointerException.
+   */
+  @Override
+  public int compareTo(StringWrapper o) {
+    if (value == null) {
+      return o.value == null ? 0 : -1;
+    }
+    if (o.value == null) {
+      return 1;
+    }
+    return value.compareTo(o.value);
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  public void setValue(String value) {
+    this.value = value;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((value == null) ? 0 : value.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    StringWrapper other = (StringWrapper) obj;
+    return value == null ? other.value == null : value.equals(other.value);
+  }
+
+  @Override
+  public String toString() {
+    return "StringWrapper [value=" + value + "]";
+  }
+
+  /** Convenience factory equivalent to {@code new StringWrapper(value)}. */
+  public static StringWrapper wrap(String value) {
+    return new StringWrapper(value);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/java/org/apache/crunch/test/Tests.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/test/Tests.java b/crunch-spark/src/it/java/org/apache/crunch/test/Tests.java
new file mode 100644
index 0000000..dc4985e
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/test/Tests.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class Tests {
+ /**
+ * This doesn't check whether the resource exists!
+ *
+ * @param testCase
+ * @param resourceName
+ * @return The path to the resource (never null)
+ */
+ public static String resource(Object testCase, String resourceName) {
+ checkNotNull(testCase);
+ checkNotNull(resourceName);
+
+ // Note: We append "Data" because otherwise Eclipse would complain about the
+ // the case's class name clashing with the resource directory's name.
+ return testCase.getClass().getName().replaceAll("\\.", "/") + "Data/" + resourceName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/resources/customers.txt
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/resources/customers.txt b/crunch-spark/src/it/resources/customers.txt
new file mode 100644
index 0000000..98f3f3d
--- /dev/null
+++ b/crunch-spark/src/it/resources/customers.txt
@@ -0,0 +1,4 @@
+111|John Doe
+222|Jane Doe
+333|Someone Else
+444|Has No Orders
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/resources/docs.txt
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/resources/docs.txt b/crunch-spark/src/it/resources/docs.txt
new file mode 100644
index 0000000..90a3f65
--- /dev/null
+++ b/crunch-spark/src/it/resources/docs.txt
@@ -0,0 +1,6 @@
+A this doc has this text
+A and this text as well
+A but also this
+B this doc has some text
+B but not as much as the last
+B doc
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/resources/emptyTextFile.txt
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/resources/emptyTextFile.txt b/crunch-spark/src/it/resources/emptyTextFile.txt
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/resources/letters.txt
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/resources/letters.txt b/crunch-spark/src/it/resources/letters.txt
new file mode 100644
index 0000000..916bfc9
--- /dev/null
+++ b/crunch-spark/src/it/resources/letters.txt
@@ -0,0 +1,2 @@
+a
+bb
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/resources/log4j.properties b/crunch-spark/src/it/resources/log4j.properties
new file mode 100644
index 0000000..8769f6c
--- /dev/null
+++ b/crunch-spark/src/it/resources/log4j.properties
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ***** Set the org.apache.crunch logger level to INFO and attach appender A.
+log4j.logger.org.apache.crunch=info, A
+
+# Keep the local runners quiet when testing: Hadoop logs at WARN, Spark at INFO.
+log4j.logger.org.apache.hadoop=warn, A
+log4j.logger.org.apache.spark=info, A
+# Except for Hadoop's Configuration class, which is chatty: log only its errors.
+log4j.logger.org.apache.hadoop.conf.Configuration=error, A
+
+# ***** A is set to be a ConsoleAppender.
+log4j.appender.A=org.apache.log4j.ConsoleAppender
+# ***** A uses PatternLayout.
+log4j.appender.A.layout=org.apache.log4j.PatternLayout
+log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n