You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:24 UTC
[22/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
new file mode 100644
index 0000000..5396fba
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class WritablesTest {
+
+ @Test
+ public void testNulls() throws Exception {
+ Void n = null;
+ NullWritable nw = NullWritable.get();
+ testInputOutputFn(Writables.nulls(), n, nw);
+ }
+
+ @Test
+ public void testStrings() throws Exception {
+ String s = "abc";
+ Text text = new Text(s);
+ testInputOutputFn(Writables.strings(), s, text);
+ }
+
+ @Test
+ public void testInts() throws Exception {
+ int j = 55;
+ IntWritable w = new IntWritable(j);
+ testInputOutputFn(Writables.ints(), j, w);
+ }
+
+ @Test
+ public void testLongs() throws Exception {
+ long j = 55;
+ LongWritable w = new LongWritable(j);
+ testInputOutputFn(Writables.longs(), j, w);
+ }
+
+ @Test
+ public void testFloats() throws Exception {
+ float j = 55.5f;
+ FloatWritable w = new FloatWritable(j);
+ testInputOutputFn(Writables.floats(), j, w);
+ }
+
+ @Test
+ public void testDoubles() throws Exception {
+ double j = 55.5d;
+ DoubleWritable w = new DoubleWritable(j);
+ testInputOutputFn(Writables.doubles(), j, w);
+ }
+
+ @Test
+ public void testBoolean() throws Exception {
+ boolean j = false;
+ BooleanWritable w = new BooleanWritable(j);
+ testInputOutputFn(Writables.booleans(), j, w);
+ }
+
+ @Test
+ public void testBytes() throws Exception {
+ byte[] bytes = new byte[] { 17, 26, -98 };
+ BytesWritable bw = new BytesWritable(bytes);
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+ testInputOutputFn(Writables.bytes(), bb, bw);
+ }
+
+ @Test
+ public void testCollections() throws Exception {
+ String s = "abc";
+ Collection<String> j = Lists.newArrayList();
+ j.add(s);
+ GenericArrayWritable<Text> w = new GenericArrayWritable<Text>(Text.class);
+ w.set(new Text[] { new Text(s) });
+ testInputOutputFn(Writables.collections(Writables.strings()), j, w);
+ }
+
+ @Test
+ public void testPairs() throws Exception {
+ Pair<String, String> j = Pair.of("a", "b");
+ TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), });
+ w.setWritten(0);
+ w.setWritten(1);
+ testInputOutputFn(Writables.pairs(Writables.strings(), Writables.strings()), j, w);
+ }
+
+ @Test
+ public void testNestedTables() throws Exception {
+ PTableType<Long, Long> pll = Writables.tableOf(Writables.longs(), Writables.longs());
+ PTableType<Pair<Long, Long>, String> nest = Writables.tableOf(pll, Writables.strings());
+ assertNotNull(nest);
+ }
+
+ @Test
+ public void testPairEquals() throws Exception {
+ PType<Pair<Long, ByteBuffer>> t1 = Writables.pairs(Writables.longs(), Writables.bytes());
+ PType<Pair<Long, ByteBuffer>> t2 = Writables.pairs(Writables.longs(), Writables.bytes());
+ assertEquals(t1, t2);
+ assertEquals(t1.hashCode(), t2.hashCode());
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testTriples() throws Exception {
+ Tuple3 j = Tuple3.of("a", "b", "c");
+ TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), new Text("c"), });
+ w.setWritten(0);
+ w.setWritten(1);
+ w.setWritten(2);
+ WritableType<?, ?> wt = Writables.triples(Writables.strings(), Writables.strings(), Writables.strings());
+ testInputOutputFn(wt, j, w);
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testQuads() throws Exception {
+ Tuple4 j = Tuple4.of("a", "b", "c", "d");
+ TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"), });
+ w.setWritten(0);
+ w.setWritten(1);
+ w.setWritten(2);
+ w.setWritten(3);
+ WritableType<?, ?> wt = Writables.quads(Writables.strings(), Writables.strings(), Writables.strings(),
+ Writables.strings());
+ testInputOutputFn(wt, j, w);
+ }
+
+ @Test
+ public void testTupleN() throws Exception {
+ TupleN j = new TupleN("a", "b", "c", "d", "e");
+ TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"),
+ new Text("e"), });
+ w.setWritten(0);
+ w.setWritten(1);
+ w.setWritten(2);
+ w.setWritten(3);
+ w.setWritten(4);
+ WritableType<?, ?> wt = Writables.tuples(Writables.strings(), Writables.strings(), Writables.strings(),
+ Writables.strings(), Writables.strings());
+ testInputOutputFn(wt, j, w);
+ }
+
+ protected static class TestWritable implements Writable {
+ String left;
+ int right;
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(left);
+ out.writeInt(right);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ left = in.readUTF();
+ right = in.readInt();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ TestWritable other = (TestWritable) obj;
+ if (left == null) {
+ if (other.left != null)
+ return false;
+ } else if (!left.equals(other.left))
+ return false;
+ if (right != other.right)
+ return false;
+ return true;
+ }
+
+ }
+
+ @Test
+ public void testRecords() throws Exception {
+ TestWritable j = new TestWritable();
+ j.left = "a";
+ j.right = 1;
+ TestWritable w = new TestWritable();
+ w.left = "a";
+ w.right = 1;
+ WritableType<?, ?> wt = Writables.records(TestWritable.class);
+ testInputOutputFn(wt, j, w);
+ }
+
+ @Test
+ public void testTableOf() throws Exception {
+ Pair<String, String> j = Pair.of("a", "b");
+ Pair<Text, Text> w = Pair.of(new Text("a"), new Text("b"));
+ WritableTableType<String, String> wtt = Writables.tableOf(Writables.strings(), Writables.strings());
+ testInputOutputFn(wtt, j, w);
+ }
+
+ @Test
+ public void testRegister() throws Exception {
+ WritableType<TestWritable, TestWritable> wt = Writables.writables(TestWritable.class);
+ Writables.register(TestWritable.class, wt);
+ assertSame(Writables.records(TestWritable.class), wt);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ protected static void testInputOutputFn(PType ptype, Object java, Object writable) {
+ ptype.getInputMapFn().initialize();
+ ptype.getOutputMapFn().initialize();
+ assertEquals(java, ptype.getInputMapFn().map(writable));
+ assertEquals(writable, ptype.getOutputMapFn().map(java));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/util/DistCacheTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/util/DistCacheTest.java b/crunch-core/src/test/java/org/apache/crunch/util/DistCacheTest.java
new file mode 100644
index 0000000..6784f14
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/util/DistCacheTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class DistCacheTest {
+
+ // A temporary folder used to hold files created for the test.
+ @Rule
+ public TemporaryFolder testFolder = new TemporaryFolder();
+
+ // A configuration and lists of paths to use in tests.
+ private Configuration testConf;
+ private String[] testFilePaths;
+ private String[] testFileQualifiedPaths;
+
+ /**
+ * Set up resources for tests. These include:
+ * <ol>
+ * <li>A Hadoop configuration.
+ * <li>A directory of temporary files that includes 3 .jar files and 1 other
+ * file.
+ * <li>Arrays containing the canonical paths and "file:"-qualified paths of
+ * the 3 .jar files (the non-jar file is not recorded).
+ * </ol>
+ */
+ @Before
+ public void setup() throws IOException {
+ // Create a configuration for tests.
+ testConf = new Configuration();
+
+ // Create the test files and add their paths to the list of test file paths.
+ testFilePaths = new String[3];
+ testFilePaths[0] = testFolder.newFile("jar1.jar").getCanonicalPath();
+ testFilePaths[1] = testFolder.newFile("jar2.jar").getCanonicalPath();
+ testFilePaths[2] = testFolder.newFile("jar3.jar").getCanonicalPath();
+ testFolder.newFile("notJar.other");
+
+ // Populate a list of qualified paths from the test file paths.
+ testFileQualifiedPaths = new String[3];
+ for (int i = 0; i < testFilePaths.length; i++) {
+ testFileQualifiedPaths[i] = "file:" + testFilePaths[i];
+ }
+ }
+
+ /**
+ * Tests adding jars one-by-one to a job's configuration.
+ *
+ * @throws IOException
+ * If there is a problem adding the jars.
+ */
+ @Test
+ public void testAddJar() throws IOException {
+ // Add each valid jar path to the distributed cache configuration, and
+ // verify after every addition that "tmpjars" lists exactly the qualified
+ // paths added so far, in insertion order.
+ for (int i = 0; i < testFilePaths.length; i++) {
+ DistCache.addJarToDistributedCache(testConf, testFilePaths[i]);
+ assertEquals("tmpjars configuration var does not contain expected value.",
+ StringUtils.join(testFileQualifiedPaths, ",", 0, i + 1), testConf.get("tmpjars"));
+ }
+ }
+
+ /**
+ * Tests that attempting to add the path to a jar that does not exist to the
+ * configuration throws an exception.
+ *
+ * @throws IOException
+ * If the added jar path does not exist. This exception is expected.
+ */
+ @Test(expected = IOException.class)
+ public void testAddJarThatDoesntExist() throws IOException {
+ DistCache.addJarToDistributedCache(testConf, "/garbage/doesntexist.jar");
+ }
+
+ /**
+ * Tests that adding a directory of jars to the configuration works as
+ * expected. .jar files under the added directory should be added to the
+ * configuration, and all other files should be skipped.
+ *
+ * @throws IOException
+ * If there is a problem adding the jar directory to the
+ * configuration.
+ */
+ @Test
+ public void testAddJarDirectory() throws IOException {
+ DistCache.addJarDirToDistributedCache(testConf, testFolder.getRoot().getCanonicalPath());
+ // Throw the added jar paths in a set to detect duplicates.
+ String[] splitJarPaths = StringUtils.split(testConf.get("tmpjars"), ",");
+ Set<String> addedJarPaths = new HashSet<String>();
+ for (String path : splitJarPaths) {
+ addedJarPaths.add(path);
+ }
+ assertEquals("Incorrect number of jar paths added.", testFilePaths.length, addedJarPaths.size());
+
+ // Ensure all expected paths were added.
+ for (int i = 0; i < testFileQualifiedPaths.length; i++) {
+ assertTrue("Expected jar path missing from jar paths added to tmpjars: " + testFileQualifiedPaths[i],
+ addedJarPaths.contains(testFileQualifiedPaths[i]));
+ }
+ }
+
+ /**
+ * Tests that adding a jar directory that does not exist to the configuration
+ * throws an exception.
+ *
+ * @throws IOException
+ * If the added jar directory does not exist. This exception is
+ * expected.
+ */
+ @Test(expected = IOException.class)
+ public void testAddJarDirectoryThatDoesntExist() throws IOException {
+ DistCache.addJarDirToDistributedCache(testConf, "/garbage/doesntexist");
+ }
+
+ /**
+ * Tests that adding a jar directory that is not a directory to the
+ * configuration throws an exception.
+ *
+ * @throws IOException
+ * If the added jar directory is not a directory. This exception is
+ * expected.
+ */
+ @Test(expected = IOException.class)
+ public void testAddJarDirectoryNotDirectory() throws IOException {
+ DistCache.addJarDirToDistributedCache(testConf, testFilePaths[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-dist/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-dist/pom.xml b/crunch-dist/pom.xml
index 749a767..cdd4256 100644
--- a/crunch-dist/pom.xml
+++ b/crunch-dist/pom.xml
@@ -35,7 +35,7 @@ under the License.
<dependencies>
<dependency>
<groupId>org.apache.crunch</groupId>
- <artifactId>crunch</artifactId>
+ <artifactId>crunch-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.crunch</groupId>
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-examples/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-examples/pom.xml b/crunch-examples/pom.xml
index fd790c3..fcbe30c 100644
--- a/crunch-examples/pom.xml
+++ b/crunch-examples/pom.xml
@@ -36,7 +36,7 @@ under the License.
<dependency>
<groupId>org.apache.crunch</groupId>
- <artifactId>crunch</artifactId>
+ <artifactId>crunch-core</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-hbase/pom.xml b/crunch-hbase/pom.xml
index 656c6cc..df21ef8 100644
--- a/crunch-hbase/pom.xml
+++ b/crunch-hbase/pom.xml
@@ -31,7 +31,7 @@ under the License.
<dependencies>
<dependency>
<groupId>org.apache.crunch</groupId>
- <artifactId>crunch</artifactId>
+ <artifactId>crunch-core</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-scrunch/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-scrunch/pom.xml b/crunch-scrunch/pom.xml
index 7db5ac7..b97766a 100644
--- a/crunch-scrunch/pom.xml
+++ b/crunch-scrunch/pom.xml
@@ -43,7 +43,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.crunch</groupId>
- <artifactId>crunch</artifactId>
+ <artifactId>crunch-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/pom.xml
----------------------------------------------------------------------
diff --git a/crunch/pom.xml b/crunch/pom.xml
deleted file mode 100644
index 2a38913..0000000
--- a/crunch/pom.xml
+++ /dev/null
@@ -1,182 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.crunch</groupId>
- <artifactId>crunch-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>crunch</artifactId>
- <name>Apache Crunch Core</name>
-
- <dependencies>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.javassist</groupId>
- <artifactId>javassist</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <!-- Override the slf4j dependency from Avro, which is incompatible with
- Hadoop's. -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <!-- Both Protobufs and Thrift are supported as
- derived serialization types, and you can use
- (almost) any version of them you like, Crunch
- only relies on the stable public APIs, not the
- structure of the files themselves.
-
- Both dependencies are scoped as provided, in
- order to not expand the size of the assembly jars
- unnecessarily.
- -->
-
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <!-- Used by LocalJobRunner in integration tests -->
- <dependency>
- <groupId>commons-httpclient</groupId>
- <artifactId>commons-httpclient</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.crunch</groupId>
- <artifactId>crunch-test</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-all</artifactId>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>schemas</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>schema</goal>
- </goals>
- <configuration>
- <testSourceDirectory>${project.basedir}/src/test/avro/</testSourceDirectory>
- <testOutputDirectory>target/generated-test-sources/</testOutputDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java b/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
deleted file mode 100644
index ff01a2f..0000000
--- a/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.To;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- *
- */
-public class CancelJobsIT {
-
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testRun() throws Exception {
- PipelineExecution pe = run();
- pe.waitUntilDone();
- PipelineResult pr = pe.getResult();
- assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
- assertEquals(2, pr.getStageResults().size());
- }
-
- @Test
- public void testKill() throws Exception {
- PipelineExecution pe = run();
- pe.kill();
- pe.waitUntilDone();
- assertEquals(PipelineExecution.Status.KILLED, pe.getStatus());
- }
-
- @Test
- public void testKillMultipleTimes() throws Exception {
- PipelineExecution pe = run();
- for (int i = 0; i < 10; i++) {
- pe.kill();
- }
- pe.waitUntilDone();
- assertEquals(PipelineExecution.Status.KILLED, pe.getStatus());
- }
-
- @Test
- public void testKillAfterDone() throws Exception {
- PipelineExecution pe = run();
- pe.waitUntilDone();
- assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
- pe.kill(); // expect no-op
- assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
- }
-
- public PipelineExecution run() throws IOException {
- String shakes = tmpDir.copyResourceFileName("shakes.txt");
- String out = tmpDir.getFileName("cancel");
- Pipeline p = new MRPipeline(CancelJobsIT.class, tmpDir.getDefaultConfiguration());
- PCollection<String> words = p.readTextFile(shakes);
- p.write(words.count().top(20), To.textFile(out));
- return p.runAsync(); // need to hack to slow down job start up if this test becomes flaky.
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/CleanTextIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CleanTextIT.java b/crunch/src/it/java/org/apache/crunch/CleanTextIT.java
deleted file mode 100644
index 2f4004e..0000000
--- a/crunch/src/it/java/org/apache/crunch/CleanTextIT.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.To;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.avro.Avros;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.io.Files;
-
-/**
- *
- */
-public class CleanTextIT {
-
- private static final int LINES_IN_SHAKES = 3667;
-
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- static DoFn<String, String> CLEANER = new DoFn<String, String>() {
- @Override
- public void process(String input, Emitter<String> emitter) {
- emitter.emit(input.toLowerCase());
- }
- };
-
- static DoFn<String, String> SPLIT = new DoFn<String, String>() {
- @Override
- public void process(String input, Emitter<String> emitter) {
- for (String word : input.split("\\S+")) {
- if (!word.isEmpty()) {
- emitter.emit(word);
- }
- }
- }
- };
-
- @Test
- public void testMapSideOutputs() throws Exception {
- Pipeline pipeline = new MRPipeline(CleanTextIT.class, tmpDir.getDefaultConfiguration());
- String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
- PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
-
- PCollection<String> cleanShakes = shakespeare.parallelDo(CLEANER, Avros.strings());
- File cso = tmpDir.getFile("cleanShakes");
- cleanShakes.write(To.textFile(cso.getAbsolutePath()));
-
- File wc = tmpDir.getFile("wordCounts");
- cleanShakes.parallelDo(SPLIT, Avros.strings()).count().write(To.textFile(wc.getAbsolutePath()));
- pipeline.done();
-
- File cleanFile = new File(cso, "part-m-00000");
- List<String> lines = Files.readLines(cleanFile, Charset.defaultCharset());
- assertEquals(LINES_IN_SHAKES, lines.size());
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java b/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java
deleted file mode 100644
index 7e0c75c..0000000
--- a/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.lang.String;
-import java.util.Collection;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PObject;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.materialize.pobject.CollectionPObject;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.Rule;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class CollectionPObjectIT {
-
- private static final int LINES_IN_SHAKES = 3667;
-
- private static final String FIRST_SHAKESPEARE_LINE =
- "***The Project Gutenberg's Etext of Shakespeare's First Folio***";
-
- private static final String LAST_SHAKESPEARE_LINE =
- "FINIS. THE TRAGEDIE OF MACBETH.";
-
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testPObjectMRPipeline() throws IOException {
- runPObject(new MRPipeline(CollectionPObjectIT.class, tmpDir.getDefaultConfiguration()));
- }
-
- @Test
- public void testAsCollectionMRPipeline() throws IOException {
- runAsCollection(new MRPipeline(CollectionPObjectIT.class, tmpDir.getDefaultConfiguration()));
- }
-
- @Test
- public void testPObjectMemPipeline() throws IOException {
- runPObject(MemPipeline.getInstance());
- }
-
- @Test
- public void testAsCollectionMemPipeline() throws IOException {
- runAsCollection(MemPipeline.getInstance());
- }
-
- private PCollection<String> getPCollection(Pipeline pipeline) throws IOException {
- String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
- PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
- return shakespeare;
- }
-
- private void verifyLines(String[] lines) {
- assertEquals("Not enough lines in Shakespeare.", LINES_IN_SHAKES, lines.length);
- assertEquals("First line in Shakespeare is wrong.", FIRST_SHAKESPEARE_LINE, lines[0]);
- assertEquals("Last line in Shakespeare is wrong.", LAST_SHAKESPEARE_LINE,
- lines[lines.length - 1]);
- }
-
- public void runPObject(Pipeline pipeline) throws IOException {
- PCollection<String> shakespeare = getPCollection(pipeline);
- PObject<Collection<String>> linesP = new CollectionPObject<String>(shakespeare);
- String[] lines = new String[LINES_IN_SHAKES];
- lines = linesP.getValue().toArray(lines);
- verifyLines(lines);
- }
-
- public void runAsCollection(Pipeline pipeline) throws IOException {
- PCollection<String> shakespeare = getPCollection(pipeline);
- String[] lines = new String[LINES_IN_SHAKES];
- lines = shakespeare.asCollection().getValue().toArray(lines);
- verifyLines(lines);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
deleted file mode 100644
index 17d0cae..0000000
--- a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.crunch.fn.Aggregators.SimpleAggregator;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-@SuppressWarnings("serial")
-public class CollectionsIT {
-
- private static class AggregateStringListFn extends SimpleAggregator<Collection<String>> {
- private final Collection<String> rtn = Lists.newArrayList();
-
- @Override
- public void reset() {
- rtn.clear();
- }
-
- @Override
- public void update(Collection<String> values) {
- rtn.addAll(values);
- }
-
- @Override
- public Iterable<Collection<String>> results() {
- return ImmutableList.of(rtn);
- }
- }
-
- private static PTable<String, Collection<String>> listOfCharcters(PCollection<String> lines, PTypeFamily typeFamily) {
-
- return lines.parallelDo(new DoFn<String, Pair<String, Collection<String>>>() {
- @Override
- public void process(String line, Emitter<Pair<String, Collection<String>>> emitter) {
- for (String word : line.split("\\s+")) {
- Collection<String> characters = Lists.newArrayList();
- for (char c : word.toCharArray()) {
- characters.add(String.valueOf(c));
- }
- emitter.emit(Pair.of(word, characters));
- }
- }
- }, typeFamily.tableOf(typeFamily.strings(), typeFamily.collections(typeFamily.strings())))
- .groupByKey().combineValues(new AggregateStringListFn());
- }
-
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testWritables() throws IOException {
- run(new MRPipeline(CollectionsIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testAvro() throws IOException {
- run(new MRPipeline(CollectionsIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
- }
-
- @Test
- public void testInMemoryWritables() throws IOException {
- run(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testInMemoryAvro() throws IOException {
- run(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
- }
-
- public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
- String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
-
- PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
- Iterable<Pair<String, Collection<String>>> lines = listOfCharcters(shakespeare, typeFamily).materialize();
-
- boolean passed = false;
- for (Pair<String, Collection<String>> line : lines) {
- if (line.first().startsWith("yellow")) {
- passed = true;
- break;
- }
- }
- pipeline.done();
- assertTrue(passed);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java b/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
deleted file mode 100644
index 3a38b92..0000000
--- a/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.lang.Long;
-
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class CollectionsLengthIT {
-
- public static final Long LINES_IN_SHAKESPEARE = 3667L;
-
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testWritables() throws IOException {
- run(new MRPipeline(CollectionsIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testAvro() throws IOException {
- run(new MRPipeline(CollectionsIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
- }
-
- @Test
- public void testInMemoryWritables() throws IOException {
- run(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testInMemoryAvro() throws IOException {
- run(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
- }
-
- public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
- String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
-
- PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
- Long length = shakespeare.length().getValue();
- assertEquals("Incorrect length for shakespear PCollection.", LINES_IN_SHAKESPEARE, length);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java b/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java
deleted file mode 100644
index f1323ca..0000000
--- a/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.apache.crunch.types.avro.Avros.*;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PType;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Iterables;
-
-/**
- *
- */
-public class DeepCopyCustomTuplesIT {
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- public static class PID extends Pair<Integer, String> {
- public PID(Integer first, String second) {
- super(first, second);
- }
- }
-
- private static PType<PID> pids = tuples(PID.class, ints(), strings());
-
- @Test
- public void testDeepCopyCustomTuple() throws Exception {
- Pipeline p = new MRPipeline(DeepCopyCustomTuplesIT.class, tmpDir.getDefaultConfiguration());
- String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
- PCollection<String> shakes = p.readTextFile(shakesInputPath);
- Iterable<String> out = shakes
- .parallelDo(new PreProcFn(), tableOf(ints(), pairs(ints(), pids)))
- .groupByKey()
- .parallelDo(new PostProcFn(), strings())
- .materialize();
- assertEquals(65, Iterables.size(out));
- p.done();
- }
-
- private static class PreProcFn extends MapFn<String, Pair<Integer, Pair<Integer, PID>>> {
- private int counter = 0;
- @Override
- public Pair<Integer, Pair<Integer, PID>> map(String input) {
- return Pair.of(counter++, Pair.of(counter++, new PID(input.length(), input)));
- }
- };
-
- private static class PostProcFn extends DoFn<Pair<Integer, Iterable<Pair<Integer, PID>>>, String> {
- @Override
- public void process(Pair<Integer, Iterable<Pair<Integer, PID>>> input, Emitter<String> emitter) {
- for (Pair<Integer, PID> p : input.second()) {
- if (p.second().first() > 0 && p.second().first() < 10) {
- emitter.emit(p.second().second());
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/EnumPairIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/EnumPairIT.java b/crunch/src/it/java/org/apache/crunch/EnumPairIT.java
deleted file mode 100644
index 1d0974e..0000000
--- a/crunch/src/it/java/org/apache/crunch/EnumPairIT.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypes;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class EnumPairIT implements Serializable {
- @Rule
- public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
- static enum etypes {
- type1,
- }
-
- @Test
- public void testEnumPTypes() throws IOException {
- String inputFile1 = tmpDir.copyResourceFileName("set1.txt");
- Pipeline pipeline = new MRPipeline(EnumPairIT.class);
- PCollection<String> set1 = pipeline.readTextFile(inputFile1);
- PTable<String, etypes> data = set1.parallelDo(new DoFn<String, Pair<String, etypes>>() {
- @Override
- public void process(String input, Emitter<Pair<String, etypes>> emitter) {
- emitter.emit(new Pair<String, etypes>(input, etypes.type1));
- }
- }, Writables.tableOf(Writables.strings(), PTypes.enums(etypes.class, set1.getTypeFamily())));
-
- Iterable<Pair<String, etypes>> materialized = data.materialize();
- pipeline.run();
- for (Pair<String, etypes> pair : materialized) {
- assertEquals(etypes.type1, pair.second());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java b/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java
deleted file mode 100644
index d985e10..0000000
--- a/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.lang.String;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PObject;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.materialize.pobject.FirstElementPObject;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.Rule;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class FirstElementPObjectIT {
-
- private static final String FIRST_SHAKESPEARE_LINE =
- "***The Project Gutenberg's Etext of Shakespeare's First Folio***";
-
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testMRPipeline() throws IOException {
- run(new MRPipeline(FirstElementPObjectIT.class, tmpDir.getDefaultConfiguration()));
- }
-
- @Test
- public void testInMemoryPipeline() throws IOException {
- run(MemPipeline.getInstance());
- }
-
- public void run(Pipeline pipeline) throws IOException {
- String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
- PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
- PObject<String> firstLine = new FirstElementPObject<String>(shakespeare);
- String first = firstLine.getValue();
- assertEquals("First line in Shakespeare is wrong.", FIRST_SHAKESPEARE_LINE, first);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java b/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java
deleted file mode 100644
index da487eb..0000000
--- a/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * Verify that calling the iterator method on a Reducer-based Iterable
- * is forcefully disallowed.
- */
-public class IterableReuseProtectionIT {
-
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
-
- public void checkIteratorReuse(Pipeline pipeline) throws IOException {
- Iterable<String> values = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"))
- .by(IdentityFn.<String>getInstance(), Writables.strings())
- .groupByKey()
- .combineValues(new TestIterableReuseFn())
- .values().materialize();
-
- List<String> valueList = Lists.newArrayList(values);
- Collections.sort(valueList);
- assertEquals(Lists.newArrayList("a", "b", "c", "e"), valueList);
- }
-
- @Test
- public void testIteratorReuse_MRPipeline() throws IOException {
- checkIteratorReuse(new MRPipeline(IterableReuseProtectionIT.class, tmpDir.getDefaultConfiguration()));
- }
-
- @Test
- public void testIteratorReuse_InMemoryPipeline() throws IOException {
- checkIteratorReuse(MemPipeline.getInstance());
- }
-
- static class TestIterableReuseFn extends CombineFn<String, String> {
-
- @Override
- public void process(Pair<String, Iterable<String>> input, Emitter<Pair<String, String>> emitter) {
- StringBuilder combinedBuilder = new StringBuilder();
- for (String v : input.second()) {
- combinedBuilder.append(v);
- }
-
- try {
- input.second().iterator();
- throw new RuntimeException("Second call to iterator should throw an exception");
- } catch (IllegalStateException e) {
- // Expected situation
- }
- emitter.emit(Pair.of(input.first(), combinedBuilder.toString()));
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
deleted file mode 100644
index 7670e88..0000000
--- a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.crunch.fn.FilterFns;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.To;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class MRPipelineIT implements Serializable {
- @Rule
- public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void materializedColShouldBeWritten() throws Exception {
- File textFile = tmpDir.copyResourceFile("shakes.txt");
- Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration());
- PCollection<String> genericCollection = pipeline.readTextFile(textFile.getAbsolutePath());
- pipeline.run();
- PCollection<String> filter = genericCollection.filter("Filtering data", FilterFns.<String>ACCEPT_ALL());
- filter.materialize();
- pipeline.run();
- File file = tmpDir.getFile("output.txt");
- Target outFile = To.textFile(file.getAbsolutePath());
- PCollection<String> write = filter.write(outFile);
- write.materialize();
- pipeline.run();
- }
-
-
-
- @Test
- public void testPGroupedTableToMultipleOutputs() throws IOException{
- Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration());
- PGroupedTable<String, String> groupedLineTable = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")).by(IdentityFn.<String>getInstance(), Writables.strings()).groupByKey();
-
- PTable<String, String> ungroupedTableA = groupedLineTable.ungroup();
- PTable<String, String> ungroupedTableB = groupedLineTable.ungroup();
-
- File outputDirA = tmpDir.getFile("output_a");
- File outputDirB = tmpDir.getFile("output_b");
-
- pipeline.writeTextFile(ungroupedTableA, outputDirA.getAbsolutePath());
- pipeline.writeTextFile(ungroupedTableB, outputDirB.getAbsolutePath());
- pipeline.done();
-
- // Verify that output from a single PGroupedTable can be sent to multiple collections
- assertTrue(new File(outputDirA, "part-r-00000").exists());
- assertTrue(new File(outputDirB, "part-r-00000").exists());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/MapPObjectIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MapPObjectIT.java b/crunch/src/it/java/org/apache/crunch/MapPObjectIT.java
deleted file mode 100644
index c48284f..0000000
--- a/crunch/src/it/java/org/apache/crunch/MapPObjectIT.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static junit.framework.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.materialize.pobject.MapPObject;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-
-public class MapPObjectIT {
-
- static final ImmutableList<Pair<Integer, String>> kvPairs = ImmutableList.of(Pair.of(0, "a"), Pair.of(1, "b"),
- Pair.of(2, "c"), Pair.of(3, "e"));
-
- public void assertMatches(Map<Integer, String> m) {
- for (Integer k : m.keySet()) {
- assertEquals(kvPairs.get(k).second(), m.get(k));
- }
- }
-
- private static class Set1Mapper extends MapFn<String, Pair<Integer, String>> {
- @Override
- public Pair<Integer, String> map(String input) {
-
- int k = -1;
- if (input.equals("a"))
- k = 0;
- else if (input.equals("b"))
- k = 1;
- else if (input.equals("c"))
- k = 2;
- else if (input.equals("e"))
- k = 3;
- return Pair.of(k, input);
- }
- }
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testMemMapPObject() {
- PTable<Integer, String> table = MemPipeline.tableOf(kvPairs);
- PObject<Map<Integer, String>> map = new MapPObject<Integer, String>(table);
- assertMatches(map.getValue());
- }
-
- @Test
- public void testMemAsMap() {
- PTable<Integer, String> table = MemPipeline.tableOf(kvPairs);
- assertMatches(table.asMap().getValue());
- }
-
- private PTable<Integer, String> getMRPTable() throws IOException {
- Pipeline p = new MRPipeline(MaterializeToMapIT.class, tmpDir.getDefaultConfiguration());
- String inputFile = tmpDir.copyResourceFileName("set1.txt");
- PCollection<String> c = p.readTextFile(inputFile);
- PTypeFamily tf = c.getTypeFamily();
- PTable<Integer, String> table = c.parallelDo(new Set1Mapper(), tf.tableOf(tf.ints(),
- tf.strings()));
- return table;
- }
-
- @Test
- public void testMRMapPObject() throws IOException {
- PTable<Integer, String> table = getMRPTable();
- PObject<Map<Integer, String>> map = new MapPObject<Integer, String>(table);
- assertMatches(map.getValue());
- }
-
- @Test
- public void testMRAsMap() throws IOException {
- PTable<Integer, String> table = getMRPTable();
- assertMatches(table.asMap().getValue());
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/MapsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MapsIT.java b/crunch/src/it/java/org/apache/crunch/MapsIT.java
deleted file mode 100644
index 5b3187b..0000000
--- a/crunch/src/it/java/org/apache/crunch/MapsIT.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import java.util.Map;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
-public class MapsIT {
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testWritables() throws Exception {
- run(WritableTypeFamily.getInstance(), tmpDir);
- }
-
- @Test
- public void testAvros() throws Exception {
- run(AvroTypeFamily.getInstance(), tmpDir);
- }
-
- public static void run(PTypeFamily typeFamily, TemporaryPath tmpDir) throws Exception {
- Pipeline pipeline = new MRPipeline(MapsIT.class, tmpDir.getDefaultConfiguration());
- String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
- PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
- Iterable<Pair<String, Map<String, Long>>> output = shakespeare
- .parallelDo(new DoFn<String, Pair<String, Map<String, Long>>>() {
- @Override
- public void process(String input, Emitter<Pair<String, Map<String, Long>>> emitter) {
- String last = null;
- for (String word : input.toLowerCase().split("\\W+")) {
- if (!word.isEmpty()) {
- String firstChar = word.substring(0, 1);
- if (last != null) {
- Map<String, Long> cc = ImmutableMap.of(firstChar, 1L);
- emitter.emit(Pair.of(last, cc));
- }
- last = firstChar;
- }
- }
- }
- }, typeFamily.tableOf(typeFamily.strings(), typeFamily.maps(typeFamily.longs()))).groupByKey()
- .combineValues(new CombineFn<String, Map<String, Long>>() {
- @Override
- public void process(Pair<String, Iterable<Map<String, Long>>> input,
- Emitter<Pair<String, Map<String, Long>>> emitter) {
- Map<String, Long> agg = Maps.newHashMap();
- for (Map<String, Long> in : input.second()) {
- for (Map.Entry<String, Long> e : in.entrySet()) {
- if (!agg.containsKey(e.getKey())) {
- agg.put(e.getKey(), e.getValue());
- } else {
- agg.put(e.getKey(), e.getValue() + agg.get(e.getKey()));
- }
- }
- }
- emitter.emit(Pair.of(input.first(), agg));
- }
- }).materialize();
-
- boolean passed = false;
- for (Pair<String, Map<String, Long>> v : output) {
- if (v.first().equals("k") && v.second().get("n") == 8L) {
- passed = true;
- break;
- }
- }
- pipeline.done();
-
- assertThat(passed, is(true));
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
deleted file mode 100644
index d064993..0000000
--- a/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.crunch.fn.FilterFns;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.Person;
-import org.apache.crunch.test.StringWrapper;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Assume;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class MaterializeIT {
-
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testMaterializeInput_Writables() throws IOException {
- runMaterializeInput(new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
- WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testMaterializeInput_Avro() throws IOException {
- runMaterializeInput(new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
- AvroTypeFamily.getInstance());
- }
-
- @Test
- public void testMaterializeInput_InMemoryWritables() throws IOException {
- runMaterializeInput(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testMaterializeInput_InMemoryAvro() throws IOException {
- runMaterializeInput(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
- }
-
- @Test
- public void testMaterializeEmptyIntermediate_Writables() throws IOException {
- runMaterializeEmptyIntermediate(
- new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
- WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testMaterializeEmptyIntermediate_Avro() throws IOException {
- runMaterializeEmptyIntermediate(
- new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
- AvroTypeFamily.getInstance());
- }
-
- @Test
- public void testMaterializeEmptyIntermediate_InMemoryWritables() throws IOException {
- runMaterializeEmptyIntermediate(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testMaterializeEmptyIntermediate_InMemoryAvro() throws IOException {
- runMaterializeEmptyIntermediate(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
- }
-
- public void runMaterializeInput(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
- List<String> expectedContent = Lists.newArrayList("b", "c", "a", "e");
- String inputPath = tmpDir.copyResourceFileName("set1.txt");
-
- PCollection<String> lines = pipeline.readTextFile(inputPath);
- assertEquals(expectedContent, Lists.newArrayList(lines.materialize()));
- pipeline.done();
- }
-
- public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily)
- throws IOException {
- String inputPath = tmpDir.copyResourceFileName("set1.txt");
- PCollection<String> empty = pipeline.readTextFile(inputPath).filter(FilterFns.<String>REJECT_ALL());
-
- assertTrue(Lists.newArrayList(empty.materialize()).isEmpty());
- pipeline.done();
- }
-
- static class StringToStringWrapperPersonPairMapFn extends MapFn<String, Pair<StringWrapper, Person>> {
-
- @Override
- public Pair<StringWrapper, Person> map(String input) {
- Person person = new Person();
- person.name = input;
- person.age = 42;
- person.siblingnames = Lists.<CharSequence> newArrayList();
- return Pair.of(new StringWrapper(input), person);
- }
-
- }
-
- @Test
- public void testMaterializeAvroPersonAndReflectsPair_GroupedTable() throws IOException {
- Assume.assumeTrue(Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS);
- Pipeline pipeline = new MRPipeline(MaterializeIT.class);
- List<Pair<StringWrapper, Person>> pairList = Lists.newArrayList(pipeline
- .readTextFile(tmpDir.copyResourceFileName("set1.txt"))
- .parallelDo(new StringToStringWrapperPersonPairMapFn(),
- Avros.pairs(Avros.reflects(StringWrapper.class), Avros.records(Person.class)))
- .materialize());
-
- // We just need to make sure this doesn't crash
- assertEquals(4, pairList.size());
-
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java b/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java
deleted file mode 100644
index 7fef30e..0000000
--- a/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static junit.framework.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-
-public class MaterializeToMapIT {
-
- static final ImmutableList<Pair<Integer, String>> kvPairs = ImmutableList.of(Pair.of(0, "a"), Pair.of(1, "b"),
- Pair.of(2, "c"), Pair.of(3, "e"));
-
- public void assertMatches(Map<Integer, String> m) {
- for (Integer k : m.keySet()) {
- assertEquals(kvPairs.get(k).second(), m.get(k));
- }
- }
-
- @Test
- public void testMemMaterializeToMap() {
- assertMatches(MemPipeline.tableOf(kvPairs).materializeToMap());
- }
-
- private static class Set1Mapper extends MapFn<String, Pair<Integer, String>> {
- @Override
- public Pair<Integer, String> map(String input) {
-
- int k = -1;
- if (input.equals("a"))
- k = 0;
- else if (input.equals("b"))
- k = 1;
- else if (input.equals("c"))
- k = 2;
- else if (input.equals("e"))
- k = 3;
- return Pair.of(k, input);
- }
- }
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testMRMaterializeToMap() throws IOException {
- Pipeline p = new MRPipeline(MaterializeToMapIT.class, tmpDir.getDefaultConfiguration());
- String inputFile = tmpDir.copyResourceFileName("set1.txt");
- PCollection<String> c = p.readTextFile(inputFile);
- PTypeFamily tf = c.getTypeFamily();
- PTable<Integer, String> t = c.parallelDo(new Set1Mapper(), tf.tableOf(tf.ints(), tf.strings()));
- Map<Integer, String> m = t.materializeToMap();
- assertMatches(m);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java b/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
deleted file mode 100644
index 1a85b6a..0000000
--- a/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.test.StringWrapper;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
-public class MultipleOutputIT {
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- public static PCollection<String> evenCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
- return words.parallelDo("even", new FilterFn<String>() {
-
- @Override
- public boolean accept(String input) {
- return input.length() % 2 == 0;
- }
- }, typeFamily.strings());
- }
-
- public static PCollection<String> oddCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
- return words.parallelDo("odd", new FilterFn<String>() {
-
- @Override
- public boolean accept(String input) {
- return input.length() % 2 != 0;
- }
- }, typeFamily.strings());
-
- }
-
- public static PTable<String, Long> substr(PTable<String, Long> ptable) {
- return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
- public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) {
- if (input.first().length() > 0) {
- emitter.emit(Pair.of(input.first().substring(0, 1), input.second()));
- }
- }
- }, ptable.getPTableType());
- }
-
- @Test
- public void testWritables() throws IOException {
- run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testAvro() throws IOException {
- run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
- }
-
- @Test
- public void testParallelDosFused() throws IOException {
-
- PipelineResult result = run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()),
- WritableTypeFamily.getInstance());
-
- // Ensure our multiple outputs were fused into a single job.
- assertEquals("parallel Dos not fused into a single job", 1, result.getStageResults().size());
- }
-
- public PipelineResult run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
- String inputPath = tmpDir.copyResourceFileName("letters.txt");
- String outputPathEven = tmpDir.getFileName("even");
- String outputPathOdd = tmpDir.getFileName("odd");
-
- PCollection<String> words = pipeline.read(At.textFile(inputPath, typeFamily.strings()));
-
- PCollection<String> evenCountWords = evenCountLetters(words, typeFamily);
- PCollection<String> oddCountWords = oddCountLetters(words, typeFamily);
- pipeline.writeTextFile(evenCountWords, outputPathEven);
- pipeline.writeTextFile(oddCountWords, outputPathOdd);
-
- PipelineResult result = pipeline.done();
-
- checkFileContents(outputPathEven, Arrays.asList("bb"));
- checkFileContents(outputPathOdd, Arrays.asList("a"));
-
- return result;
- }
-
- /**
- * Mutates the state of an input and then emits the mutated object.
- */
- static class AppendFn extends DoFn<StringWrapper, StringWrapper> {
-
- private String value;
-
- public AppendFn(String value) {
- this.value = value;
- }
-
- @Override
- public void process(StringWrapper input, Emitter<StringWrapper> emitter) {
- input.setValue(input.getValue() + value);
- emitter.emit(input);
- }
-
- }
-
- /**
- * Fusing multiple pipelines has a risk of running into object reuse bugs.
- * This test verifies that mutating the state of an object that is passed
- * through multiple streams of a pipeline doesn't allow one stream to affect
- * another.
- */
- @Test
- public void testFusedMappersObjectReuseBug() throws IOException {
- Pipeline pipeline = new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration());
- PCollection<StringWrapper> stringWrappers = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
- .parallelDo(new StringWrapper.StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class));
-
- PCollection<String> stringsA = stringWrappers.parallelDo(new AppendFn("A"), stringWrappers.getPType())
- .parallelDo(new StringWrapper.StringWrapperToStringMapFn(), Writables.strings());
- PCollection<String> stringsB = stringWrappers.parallelDo(new AppendFn("B"), stringWrappers.getPType())
- .parallelDo(new StringWrapper.StringWrapperToStringMapFn(), Writables.strings());
-
- String outputA = tmpDir.getFileName("stringsA");
- String outputB = tmpDir.getFileName("stringsB");
-
- pipeline.writeTextFile(stringsA, outputA);
- pipeline.writeTextFile(stringsB, outputB);
- PipelineResult pipelineResult = pipeline.done();
-
- // Make sure fusing did actually occur
- assertEquals(1, pipelineResult.getStageResults().size());
-
- checkFileContents(outputA, Lists.newArrayList("cA", "dA", "aA"));
- checkFileContents(outputB, Lists.newArrayList("cB", "dB", "aB"));
-
- }
-
- private void checkFileContents(String filePath, List<String> expected) throws IOException {
- File outputFile = new File(filePath, "part-m-00000");
- List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
- assertEquals(expected, lines);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java b/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
deleted file mode 100644
index 44eb897..0000000
--- a/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static com.google.common.collect.Lists.newArrayList;
-import static org.apache.crunch.io.At.sequenceFile;
-import static org.apache.crunch.io.At.textFile;
-import static org.apache.crunch.types.writable.Writables.strings;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import java.io.IOException;
-
-import org.apache.crunch.fn.FilterFns;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class PCollectionGetSizeIT {
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- private String emptyInputPath;
- private String nonEmptyInputPath;
- private String outputPath;
-
- @Before
- public void setUp() throws IOException {
- emptyInputPath = tmpDir.copyResourceFileName("emptyTextFile.txt");
- nonEmptyInputPath = tmpDir.copyResourceFileName("set1.txt");
- outputPath = tmpDir.getFileName("output");
- }
-
- @Test
- public void testGetSizeOfEmptyInput_MRPipeline() throws IOException {
- testCollectionGetSizeOfEmptyInput(new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration()));
- }
-
- @Test
- public void testGetSizeOfEmptyInput_MemPipeline() throws IOException {
- testCollectionGetSizeOfEmptyInput(MemPipeline.getInstance());
- }
-
- private void testCollectionGetSizeOfEmptyInput(Pipeline pipeline) throws IOException {
-
- assertThat(pipeline.read(textFile(emptyInputPath)).getSize(), is(0L));
- }
-
- @Test
- public void testMaterializeEmptyInput_MRPipeline() throws IOException {
- testMaterializeEmptyInput(new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration()));
- }
-
- @Test
- public void testMaterializeEmptyImput_MemPipeline() throws IOException {
- testMaterializeEmptyInput(MemPipeline.getInstance());
- }
-
- private void testMaterializeEmptyInput(Pipeline pipeline) throws IOException {
- assertThat(newArrayList(pipeline.readTextFile(emptyInputPath).materialize().iterator()).size(), is(0));
- }
-
- @Test
- public void testGetSizeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
-
- PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(
- new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration()));
-
- assertThat(emptyIntermediate.getSize(), is(0L));
- }
-
- @Test
- @Ignore("GetSize of a DoCollection is only an estimate based on scale factor, so we can't count on it being reported as 0")
- public void testGetSizeOfEmptyIntermediatePCollection_NoSave_MRPipeline() throws IOException {
-
- PCollection<String> data = new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration())
- .readTextFile(nonEmptyInputPath);
-
- PCollection<String> emptyPCollection = data.filter(FilterFns.<String>REJECT_ALL());
-
- assertThat(emptyPCollection.getSize(), is(0L));
- }
-
- @Test
- public void testGetSizeOfEmptyIntermediatePCollection_MemPipeline() {
-
- PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(MemPipeline.getInstance());
-
- assertThat(emptyIntermediate.getSize(), is(0L));
- }
-
- @Test
- public void testMaterializeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
-
- PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(
- new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration()));
-
- assertThat(newArrayList(emptyIntermediate.materialize()).size(), is(0));
- }
-
- @Test
- public void testMaterializeOfEmptyIntermediatePCollection_MemPipeline() {
-
- PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(MemPipeline.getInstance());
-
- assertThat(newArrayList(emptyIntermediate.materialize()).size(), is(0));
- }
-
- private PCollection<String> createPesistentEmptyIntermediate(Pipeline pipeline) {
-
- PCollection<String> data = pipeline.readTextFile(nonEmptyInputPath);
-
- PCollection<String> emptyPCollection = data.filter(FilterFns.<String>REJECT_ALL());
-
- emptyPCollection.write(sequenceFile(outputPath, strings()));
-
- pipeline.run();
-
- return pipeline.read(sequenceFile(outputPath, strings()));
- }
-
- @Test(expected = IllegalStateException.class)
- public void testExpectExceptionForGettingSizeOfNonExistingFile_MRPipeline() throws IOException {
- new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration()).readTextFile("non_existing.file").getSize();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testExpectExceptionForGettingSizeOfNonExistingFile_MemPipeline() {
- MemPipeline.getInstance().readTextFile("non_existing.file").getSize();
- }
-}