You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:22 UTC
[20/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
deleted file mode 100644
index 671b920..0000000
--- a/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.test.Person;
-import org.apache.crunch.test.StringWrapper;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.avro.Avros;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-@SuppressWarnings("serial")
-public class AvroFileSourceTargetIT implements Serializable {
-
- private transient File avroFile;
- @Rule
- public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Before
- public void setUp() throws IOException {
- avroFile = tmpDir.getFile("test.avro");
- }
-
- private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
- FileOutputStream outputStream = new FileOutputStream(this.avroFile);
- GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema);
-
- DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
- dataFileWriter.create(schema, outputStream);
-
- for (GenericRecord record : genericRecords) {
- dataFileWriter.append(record);
- }
-
- dataFileWriter.close();
- outputStream.close();
-
- }
-
- @Test
- public void testSpecific() throws IOException {
- GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
- savedRecord.put("name", "John Doe");
- savedRecord.put("age", 42);
- savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
- populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
-
- Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
- PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
- Avros.records(Person.class)));
-
- List<Person> personList = Lists.newArrayList(genericCollection.materialize());
-
- Person expectedPerson = new Person();
- expectedPerson.name = "John Doe";
- expectedPerson.age = 42;
-
- List<CharSequence> siblingNames = Lists.newArrayList();
- siblingNames.add("Jimmy");
- siblingNames.add("Jane");
- expectedPerson.siblingnames = siblingNames;
-
- assertEquals(Lists.newArrayList(expectedPerson), Lists.newArrayList(personList));
- }
-
- @Test
- public void testGeneric() throws IOException {
- String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson");
- Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson);
- GenericRecord savedRecord = new GenericData.Record(genericPersonSchema);
- savedRecord.put("name", "John Doe");
- savedRecord.put("age", 42);
- savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
- populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema);
-
- Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
- PCollection<Record> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
- Avros.generics(genericPersonSchema)));
-
- List<Record> recordList = Lists.newArrayList(genericCollection.materialize());
-
- assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(recordList));
- }
-
- @Test
- public void testReflect() throws IOException {
- Schema pojoPersonSchema = ReflectData.get().getSchema(StringWrapper.class);
- GenericRecord savedRecord = new GenericData.Record(pojoPersonSchema);
- savedRecord.put("value", "stringvalue");
- populateGenericFile(Lists.newArrayList(savedRecord), pojoPersonSchema);
-
- Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
- PCollection<StringWrapper> stringValueCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
- Avros.reflects(StringWrapper.class)));
-
- List<StringWrapper> recordList = Lists.newArrayList(stringValueCollection.materialize());
-
- assertEquals(1, recordList.size());
- StringWrapper stringWrapper = recordList.get(0);
- assertEquals("stringvalue", stringWrapper.getValue());
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
deleted file mode 100644
index 29bf4f5..0000000
--- a/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.crunch.io.avro;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.io.FileUtils;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.Target;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.io.To;
-import org.apache.crunch.test.Person;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.avro.Avros;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * Integration test that reads Avro records and writes them out through a text target.
- */
-public class AvroPipelineIT implements Serializable {
-
-  // Avro input file inside tmpDir; assigned in setUp() before each test.
-  private transient File avroFile;
-
-  @Rule
-  public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Before
-  public void setUp() throws IOException {
-    avroFile = tmpDir.getFile("test.avro");
-  }
-
-  /**
-   * Writes the given records to {@link #avroFile} as an Avro data file.
-   *
-   * @param genericRecords records to append, in iteration order
-   * @param schema schema handed to both the datum writer and the data file
-   * @throws IOException if the file cannot be opened or written
-   */
-  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
-    FileOutputStream outputStream = new FileOutputStream(this.avroFile);
-    try {
-      DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
-          new GenericDatumWriter<GenericRecord>(schema));
-      try {
-        dataFileWriter.create(schema, outputStream);
-        for (GenericRecord record : genericRecords) {
-          dataFileWriter.append(record);
-        }
-      } finally {
-        // Must run even when create/append fails so the writer is not left open.
-        dataFileWriter.close();
-      }
-    } finally {
-      // Releases the file handle on every path, including a failed writer close.
-      outputStream.close();
-    }
-  }
-
-  /**
-   * Writes a Person collection to a text target and checks that the output part file
-   * contains the record's {@code toString()} form.
-   */
-  @Test
-  public void toTextShouldWriteAvroDataAsDatumText() throws Exception {
-    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
-    savedRecord.put("name", "John Doe");
-    savedRecord.put("age", 42);
-    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
-
-    // Use this test's own class (was AvroFileSourceTargetIT, a copy-paste leftover).
-    Pipeline pipeline = new MRPipeline(AvroPipelineIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
-        Avros.records(Person.class)));
-    File outputFile = tmpDir.getFile("output");
-    Target textFile = To.textFile(outputFile.getAbsolutePath());
-    pipeline.write(genericCollection, textFile);
-    pipeline.run();
-    Person person = genericCollection.materialize().iterator().next();
-    String outputString = FileUtils.readFileToString(new File(outputFile, "part-m-00000"));
-    // Everything needed has been read; finish the pipeline like the other ITs do.
-    pipeline.done();
-
-    assertTrue(outputString.contains(person.toString()));
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
deleted file mode 100644
index 7a90517..0000000
--- a/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.test.Person;
-import org.apache.crunch.test.StringWrapper;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.avro.Avros;
-import org.junit.Assume;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * Integration tests for reflect-based Avro types, alone and paired with specific records.
- */
-public class AvroReflectIT implements Serializable {
-
-  @Rule
-  public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
-  /** Wraps every line of set1.txt in a StringWrapper and compares the materialized result. */
-  @Test
-  public void testReflection() throws IOException {
-    Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<String> lines = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
-
-    MapFn<String, StringWrapper> wrapLine = new MapFn<String, StringWrapper>() {
-      @Override
-      public StringWrapper map(String input) {
-        StringWrapper wrapped = new StringWrapper();
-        wrapped.setValue(input);
-        return wrapped;
-      }
-    };
-    PCollection<StringWrapper> stringWrapperCollection = lines.parallelDo(wrapLine,
-        Avros.reflects(StringWrapper.class));
-
-    List<StringWrapper> actual = Lists.newArrayList(stringWrapperCollection.materialize());
-
-    pipeline.done();
-
-    List<StringWrapper> expected = Lists.newArrayList(new StringWrapper("b"), new StringWrapper("c"),
-        new StringWrapper("a"), new StringWrapper("e"));
-    assertEquals(expected, actual);
-  }
-
-  // Verify that running with a combination of reflect and specific schema
-  // doesn't crash
-  @Test
-  public void testCombinationOfReflectionAndSpecific() throws IOException {
-    Assume.assumeTrue(Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS);
-    Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<String> lines = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
-
-    // Pairs a reflect-typed wrapper with a specific Person built from the same line.
-    MapFn<String, Pair<StringWrapper, Person>> toHybridPair = new MapFn<String, Pair<StringWrapper, Person>>() {
-      @Override
-      public Pair<StringWrapper, Person> map(String input) {
-        Person person = new Person();
-        person.name = input;
-        person.age = 42;
-        person.siblingnames = Lists.<CharSequence> newArrayList(input);
-        return Pair.of(new StringWrapper(input), person);
-      }
-    };
-    PCollection<Pair<StringWrapper, Person>> hybridPairCollection = lines.parallelDo(toHybridPair,
-        Avros.pairs(Avros.reflects(StringWrapper.class), Avros.records(Person.class)));
-
-    // Reduces each counted pair to (wrapper value, count) so it can be sorted and compared.
-    MapFn<Pair<Pair<StringWrapper, Person>, Long>, Pair<String, Long>> toValueAndCount =
-        new MapFn<Pair<Pair<StringWrapper, Person>, Long>, Pair<String, Long>>() {
-          @Override
-          public Pair<String, Long> map(Pair<Pair<StringWrapper, Person>, Long> input) {
-            String value = input.first().first().getValue();
-            return Pair.of(value, input.second());
-          }
-        };
-    PCollection<Pair<String, Long>> countCollection = Aggregate.count(hybridPairCollection).parallelDo(
-        toValueAndCount, Avros.pairs(Avros.strings(), Avros.longs()));
-
-    List<Pair<String, Long>> materialized = Lists.newArrayList(countCollection.materialize());
-    List<Pair<String, Long>> expected = Lists.newArrayList(Pair.of("a", 1L), Pair.of("b", 1L), Pair.of("c", 1L),
-        Pair.of("e", 1L));
-    Collections.sort(materialized);
-
-    assertEquals(expected, materialized);
-    pipeline.done();
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java
deleted file mode 100644
index cbb7fde..0000000
--- a/crunch/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro;
-
-import static org.apache.crunch.types.avro.Avros.ints;
-import static org.apache.crunch.types.avro.Avros.tableOf;
-import static org.apache.crunch.types.avro.Avros.writables;
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.crunch.CombineFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.hadoop.io.DoubleWritable;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Maps;
-
-/**
- * Checks that Writable values in an Avro-typed pipeline are handled whether the Avro job
- * supplies them as a ByteBuffer or as a byte array (which one depends on the Avro
- * version being used).
- */
-public class AvroWritableIT implements Serializable {
-
-  @Rule
-  public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
-  /** Sums line lengths per distinct length over customers.txt and compares against fixed totals. */
-  @Test
-  public void testAvroBasedWritablePipeline() throws Exception {
-    String inputPath = tmpDir.copyResourceFileName("customers.txt");
-    Pipeline pipeline = new MRPipeline(AvroWritableIT.class, tmpDir.getDefaultConfiguration());
-    pipeline.enableDebug();
-    PCollection<String> lines = pipeline.readTextFile(inputPath);
-
-    // Keys each line by its length and carries the same length as a DoubleWritable value.
-    MapFn<String, Pair<Integer, DoubleWritable>> keyByLength = new MapFn<String, Pair<Integer, DoubleWritable>>() {
-      @Override
-      public Pair<Integer, DoubleWritable> map(String input) {
-        int length = input.length();
-        return Pair.of(length, new DoubleWritable(length));
-      }
-    };
-
-    // Adds up all values that share a key.
-    CombineFn<Integer, DoubleWritable> sumValues = new CombineFn<Integer, DoubleWritable>() {
-      @Override
-      public void process(Pair<Integer, Iterable<DoubleWritable>> input,
-          Emitter<Pair<Integer, DoubleWritable>> emitter) {
-        double total = 0.0;
-        for (DoubleWritable value : input.second()) {
-          total += value.get();
-        }
-        emitter.emit(Pair.of(input.first(), new DoubleWritable(total)));
-      }
-    };
-
-    Map<Integer, DoubleWritable> outputMap = lines
-        .parallelDo(keyByLength, tableOf(ints(), writables(DoubleWritable.class)))
-        .groupByKey()
-        .combineValues(sumValues)
-        .materializeToMap();
-
-    Map<Integer, DoubleWritable> expectedMap = Maps.newHashMap();
-    expectedMap.put(12, new DoubleWritable(24.0));
-    expectedMap.put(16, new DoubleWritable(16.0));
-    expectedMap.put(17, new DoubleWritable(17.0));
-
-    assertEquals(expectedMap, outputMap);
-
-    pipeline.done();
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java b/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
deleted file mode 100644
index 56ee3ac..0000000
--- a/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib;
-
-import static org.apache.crunch.types.writable.Writables.strings;
-import static org.apache.crunch.types.writable.Writables.tableOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.Employee;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.apache.crunch.types.writable.Writables;
-import org.apache.hadoop.io.Text;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-/**
- * Integration tests for {@link Aggregate}: min/max, collectValues and top-N.
- */
-public class AggregateIT {
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testWritables() throws Exception {
-    Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
-    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
-    PCollection<String> shakes = pipeline.readTextFile(shakesInputPath);
-    runMinMax(shakes, WritableTypeFamily.getInstance());
-    pipeline.done();
-  }
-
-  @Test
-  public void testAvro() throws Exception {
-    Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
-    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
-    PCollection<String> shakes = pipeline.readTextFile(shakesInputPath);
-    runMinMax(shakes, AvroTypeFamily.getInstance());
-    pipeline.done();
-  }
-
-  @Test
-  public void testInMemoryAvro() throws Exception {
-    PCollection<String> someText = MemPipeline.collectionOf("first line", "second line", "third line");
-    runMinMax(someText, AvroTypeFamily.getInstance());
-  }
-
-  /**
-   * Asserts that the maximum line length equals the negated minimum of the negated
-   * line lengths, and that both aggregates produce a value.
-   *
-   * @param shakes lines of text to measure
-   * @param family type family that supplies the int PType for the intermediate collections
-   */
-  public static void runMinMax(PCollection<String> shakes, PTypeFamily family) throws Exception {
-    PCollection<Integer> lengths = shakes.parallelDo(new MapFn<String, Integer>() {
-      @Override
-      public Integer map(String input) {
-        return input.length();
-      }
-    }, family.ints());
-    PCollection<Integer> negLengths = lengths.parallelDo(new MapFn<Integer, Integer>() {
-      @Override
-      public Integer map(Integer input) {
-        return -input;
-      }
-    }, family.ints());
-    Integer maxLengths = Aggregate.max(lengths).getValue();
-    Integer minLengths = Aggregate.min(negLengths).getValue();
-    assertTrue(maxLengths != null);
-    assertTrue(minLengths != null);
-    assertEquals(maxLengths.intValue(), -minLengths.intValue());
-  }
-
-  /** Splits a line on whitespace and pairs its first two tokens. */
-  private static class SplitFn extends MapFn<String, Pair<String, String>> {
-    @Override
-    public Pair<String, String> map(String input) {
-      String[] p = input.split("\\s+");
-      return Pair.of(p[0], p[1]);
-    }
-  }
-
-  /** Checks the number of values collected per URL key from urls.txt. */
-  @Test
-  public void testCollectUrls() throws Exception {
-    Pipeline p = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
-    String urlsInputPath = tmpDir.copyResourceFileName("urls.txt");
-    PTable<String, Collection<String>> urls = Aggregate.collectValues(p.readTextFile(urlsInputPath).parallelDo(
-        new SplitFn(), tableOf(strings(), strings())));
-    for (Pair<String, Collection<String>> e : urls.materialize()) {
-      String key = e.first();
-      // Stays 0 for an unexpected key, which makes the size assertion below fail.
-      int expectedSize = 0;
-      if ("www.A.com".equals(key)) {
-        expectedSize = 4;
-      } else if ("www.B.com".equals(key) || "www.F.com".equals(key)) {
-        expectedSize = 2;
-      } else if ("www.C.com".equals(key) || "www.D.com".equals(key) || "www.E.com".equals(key)) {
-        expectedSize = 1;
-      }
-      assertEquals("Checking key = " + key, expectedSize, e.second().size());
-    }
-    // Finish the pipeline once, after iteration; it used to be called per element inside the loop.
-    p.done();
-  }
-
-  @Test
-  public void testTopN() throws Exception {
-    PTableType<String, Integer> ptype = Avros.tableOf(Avros.strings(), Avros.ints());
-    PTable<String, Integer> counts = MemPipeline.typedTableOf(ptype, "foo", 12, "bar", 17, "baz", 29);
-
-    PTable<String, Integer> top2 = Aggregate.top(counts, 2, true);
-    assertEquals(ImmutableList.of(Pair.of("baz", 29), Pair.of("bar", 17)), top2.materialize());
-
-    PTable<String, Integer> bottom2 = Aggregate.top(counts, 2, false);
-    assertEquals(ImmutableList.of(Pair.of("foo", 12), Pair.of("bar", 17)), bottom2.materialize());
-  }
-
-  @Test
-  public void testCollectValues_Writables() throws IOException {
-    Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
-    Map<Integer, Collection<Text>> collectionMap = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
-        .parallelDo(new MapStringToTextPair(), Writables.tableOf(Writables.ints(), Writables.writables(Text.class)))
-        .collectValues().materializeToMap();
-
-    assertEquals(1, collectionMap.size());
-
-    assertTrue(collectionMap.get(1).containsAll(Lists.newArrayList(new Text("c"), new Text("d"), new Text("a"))));
-
-    // Finish only after the map has been inspected, in case its contents are read lazily.
-    pipeline.done();
-  }
-
-  @Test
-  public void testCollectValues_Avro() throws IOException {
-
-    MapStringToEmployeePair mapFn = new MapStringToEmployeePair();
-    Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
-    Map<Integer, Collection<Employee>> collectionMap = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
-        .parallelDo(mapFn, Avros.tableOf(Avros.ints(), Avros.records(Employee.class))).collectValues()
-        .materializeToMap();
-
-    assertEquals(1, collectionMap.size());
-
-    Employee empC = mapFn.map("c").second();
-    Employee empD = mapFn.map("d").second();
-    Employee empA = mapFn.map("a").second();
-
-    assertTrue(collectionMap.get(1).containsAll(Lists.newArrayList(empC, empD, empA)));
-
-    // Finish only after the map has been inspected, in case its contents are read lazily.
-    pipeline.done();
-  }
-
-  /** Maps every input string to the constant key 1 with the string as a Text value. */
-  private static class MapStringToTextPair extends MapFn<String, Pair<Integer, Text>> {
-    @Override
-    public Pair<Integer, Text> map(String input) {
-      return Pair.of(1, new Text(input));
-    }
-  }
-
-  /** Maps every input string to the constant key 1 with an Employee named after the string. */
-  private static class MapStringToEmployeePair extends MapFn<String, Pair<Integer, Employee>> {
-    @Override
-    public Pair<Integer, Employee> map(String input) {
-      Employee emp = new Employee();
-      emp.name = input;
-      emp.salary = 0;
-      emp.department = "";
-      return Pair.of(1, emp);
-    }
-  }
-
-  /** Simple mutable holder for a single string value, compared by that value. */
-  public static class PojoText {
-    private String value;
-
-    public PojoText() {
-      this("");
-    }
-
-    public PojoText(String value) {
-      this.value = value;
-    }
-
-    public String getValue() {
-      return value;
-    }
-
-    public void setValue(String value) {
-      this.value = value;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("PojoText<%s>", this.value);
-    }
-
-    // Added to match equals(): equal instances must report equal hash codes.
-    @Override
-    public int hashCode() {
-      return value == null ? 0 : value.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj)
-        return true;
-      if (obj == null)
-        return false;
-      if (getClass() != obj.getClass())
-        return false;
-      PojoText other = (PojoText) obj;
-      if (value == null) {
-        if (other.value != null)
-          return false;
-      } else if (!value.equals(other.value))
-        return false;
-      return true;
-    }
-
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java b/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
deleted file mode 100644
index a832a5d..0000000
--- a/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib;
-
-import static junit.framework.Assert.assertEquals;
-import static org.apache.crunch.types.avro.Avros.ints;
-import static org.apache.crunch.types.avro.Avros.records;
-import static org.apache.crunch.types.avro.Avros.strings;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.test.Person;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * Test sorting Avro types by selected inner field
- */
-public class AvroTypeSortIT implements Serializable {
-
-  private static final long serialVersionUID = 1344118240353796561L;
-
-  // Avro input file inside tmpDir; assigned in setUp() before each test.
-  private transient File avroFile;
-
-  @Rule
-  public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Before
-  public void setUp() throws IOException {
-    // Keep the input under the rule-managed directory, as the other Avro ITs do,
-    // instead of creating it in the system temp directory.
-    avroFile = tmpDir.getFile("test.avro");
-  }
-
-  @After
-  public void tearDown() {
-    // avroFile is still null when setUp() failed before assigning it.
-    if (avroFile != null) {
-      avroFile.delete();
-    }
-  }
-
-  /**
-   * Writes three Person records, then checks the order produced by keying them on name
-   * and on age and passing them through groupByKey().
-   */
-  @Test
-  public void testSortAvroTypesBySelectedFields() throws Exception {
-
-    MRPipeline pipeline = new MRPipeline(AvroTypeSortIT.class, tmpDir.getDefaultConfiguration());
-
-    Person ccc10 = createPerson("CCC", 10);
-    Person bbb20 = createPerson("BBB", 20);
-    Person aaa30 = createPerson("AAA", 30);
-
-    writeAvroFile(Lists.newArrayList(ccc10, bbb20, aaa30), avroFile);
-
-    PCollection<Person> unsorted = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), records(Person.class)));
-
-    // Sort by Name
-    MapFn<Person, String> nameExtractor = new MapFn<Person, String>() {
-
-      @Override
-      public String map(Person input) {
-        return input.name.toString();
-      }
-    };
-
-    PCollection<Person> sortedByName = unsorted.by(nameExtractor, strings()).groupByKey().ungroup().values();
-
-    List<Person> sortedByNameList = Lists.newArrayList(sortedByName.materialize());
-
-    assertEquals(3, sortedByNameList.size());
-    assertEquals(aaa30, sortedByNameList.get(0));
-    assertEquals(bbb20, sortedByNameList.get(1));
-    assertEquals(ccc10, sortedByNameList.get(2));
-
-    // Sort by Age
-
-    MapFn<Person, Integer> ageExtractor = new MapFn<Person, Integer>() {
-
-      @Override
-      public Integer map(Person input) {
-        return input.age;
-      }
-    };
-
-    PCollection<Person> sortedByAge = unsorted.by(ageExtractor, ints()).groupByKey().ungroup().values();
-
-    List<Person> sortedByAgeList = Lists.newArrayList(sortedByAge.materialize());
-
-    assertEquals(3, sortedByAgeList.size());
-    assertEquals(ccc10, sortedByAgeList.get(0));
-    assertEquals(bbb20, sortedByAgeList.get(1));
-    assertEquals(aaa30, sortedByAgeList.get(2));
-
-    pipeline.done();
-  }
-
-  /**
-   * Writes the given people to {@code avroFile} as an Avro data file using the Person schema.
-   *
-   * @param people records to append, in iteration order
-   * @param avroFile destination file
-   * @throws IOException if the file cannot be opened or written
-   */
-  private void writeAvroFile(List<Person> people, File avroFile) throws IOException {
-    FileOutputStream outputStream = new FileOutputStream(avroFile);
-    try {
-      DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(
-          new SpecificDatumWriter<Person>(Person.class));
-      try {
-        dataFileWriter.create(Person.SCHEMA$, outputStream);
-        for (Person person : people) {
-          dataFileWriter.append(person);
-        }
-      } finally {
-        // Must run even when create/append fails so the writer is not left open.
-        dataFileWriter.close();
-      }
-    } finally {
-      // Releases the file handle on every path, including a failed writer close.
-      outputStream.close();
-    }
-  }
-
-  /**
-   * Builds a Person with the given name and age and an empty sibling list.
-   *
-   * @param name value for the name field
-   * @param age value for the age field
-   * @return the populated Person
-   */
-  private Person createPerson(String name, int age) {
-    Person person = new Person();
-    person.age = age;
-    person.name = name;
-    List<CharSequence> siblingNames = Lists.newArrayList();
-    person.siblingnames = siblingNames;
-
-    return person;
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
deleted file mode 100644
index 4b28da7..0000000
--- a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.test.Tests;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-
-public class CogroupIT {
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
- private MRPipeline pipeline;
- private PCollection<String> lines1;
- private PCollection<String> lines2;
-
-
- @Before
- public void setUp() throws IOException {
- pipeline = new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration());
- lines1 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt")));
- lines2 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt")));
- }
-
- @After
- public void tearDown() {
- pipeline.done();
- }
-
- @Test
- public void testCogroupWritables() {
- runCogroup(WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testCogroupAvro() {
- runCogroup(AvroTypeFamily.getInstance());
- }
-
- public void runCogroup(PTypeFamily ptf) {
- PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
-
- PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt);
- PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt);
-
- PTable<String, Pair<Collection<String>, Collection<String>>> cg = Cogroup.cogroup(kv1, kv2);
-
- Map<String, Pair<Collection<String>, Collection<String>>> actual = cg.materializeToMap();
-
- Map<String, Pair<Collection<String>, Collection<String>>> expected = ImmutableMap.of(
- "a", Pair.of(coll("1-1", "1-4"), coll()),
- "b", Pair.of(coll("1-2"), coll("2-1")),
- "c", Pair.of(coll("1-3"), coll("2-2", "2-3")),
- "d", Pair.of(coll(), coll("2-4"))
- );
-
- assertThat(actual, is(expected));
- }
-
-
- private static class KeyValueSplit extends DoFn<String, Pair<String, String>> {
- @Override
- public void process(String input, Emitter<Pair<String, String>> emitter) {
- String[] fields = input.split(",");
- emitter.emit(Pair.of(fields[0], fields[1]));
- }
- }
-
- private static Collection<String> coll(String... values) {
- return ImmutableList.copyOf(values);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
deleted file mode 100644
index 242f621..0000000
--- a/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib;
-
-import static org.apache.crunch.types.avro.Avros.*;
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.From;
-import org.apache.crunch.test.CrunchTestSupport;
-import org.junit.Test;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-
-
-public class SecondarySortIT extends CrunchTestSupport implements Serializable {
-
- @Test
- public void testSecondarySort() throws Exception {
- Pipeline p = new MRPipeline(SecondarySortIT.class, tempDir.getDefaultConfiguration());
- String inputFile = tempDir.copyResourceFileName("secondary_sort_input.txt");
-
- PTable<String, Pair<Integer, Integer>> in = p.read(From.textFile(inputFile))
- .parallelDo(new MapFn<String, Pair<String, Pair<Integer, Integer>>>() {
- @Override
- public Pair<String, Pair<Integer, Integer>> map(String input) {
- String[] pieces = input.split(",");
- return Pair.of(pieces[0],
- Pair.of(Integer.valueOf(pieces[1].trim()), Integer.valueOf(pieces[2].trim())));
- }
- }, tableOf(strings(), pairs(ints(), ints())));
- Iterable<String> lines = SecondarySort.sortAndApply(in, new MapFn<Pair<String, Iterable<Pair<Integer, Integer>>>, String>() {
- @Override
- public String map(Pair<String, Iterable<Pair<Integer, Integer>>> input) {
- Joiner j = Joiner.on(',');
- return j.join(input.first(), j.join(input.second()));
- }
- }, strings()).materialize();
- assertEquals(ImmutableList.of("one,[-5,10],[1,1],[2,-3]", "three,[0,-1]", "two,[1,7],[2,6],[4,5]"),
- ImmutableList.copyOf(lines));
- p.done();
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/lib/SetIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SetIT.java b/crunch/src/it/java/org/apache/crunch/lib/SetIT.java
deleted file mode 100644
index d1300d2..0000000
--- a/crunch/src/it/java/org/apache/crunch/lib/SetIT.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.common.collect.Lists;
-
-@RunWith(value = Parameterized.class)
-public class SetIT {
-
- private PTypeFamily typeFamily;
-
- private Pipeline pipeline;
- private PCollection<String> set1;
- private PCollection<String> set2;
-
- public SetIT(PTypeFamily typeFamily) {
- this.typeFamily = typeFamily;
- }
-
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Parameters
- public static Collection<Object[]> data() {
- Object[][] data = new Object[][] { { WritableTypeFamily.getInstance() }, { AvroTypeFamily.getInstance() } };
- return Arrays.asList(data);
- }
-
- @Before
- public void setUp() throws IOException {
- String set1InputPath = tmpDir.copyResourceFileName("set1.txt");
- String set2InputPath = tmpDir.copyResourceFileName("set2.txt");
- pipeline = new MRPipeline(SetIT.class, tmpDir.getDefaultConfiguration());
- set1 = pipeline.read(At.textFile(set1InputPath, typeFamily.strings()));
- set2 = pipeline.read(At.textFile(set2InputPath, typeFamily.strings()));
- }
-
- @After
- public void tearDown() {
- pipeline.done();
- }
-
- @Test
- public void testDifference() throws Exception {
- PCollection<String> difference = Set.difference(set1, set2);
- assertEquals(Lists.newArrayList("b", "e"), Lists.newArrayList(difference.materialize()));
- }
-
- @Test
- public void testIntersection() throws Exception {
- PCollection<String> intersection = Set.intersection(set1, set2);
- assertEquals(Lists.newArrayList("a", "c"), Lists.newArrayList(intersection.materialize()));
- }
-
- @Test
- public void testComm() throws Exception {
- PCollection<Tuple3<String, String, String>> comm = Set.comm(set1, set2);
- Iterator<Tuple3<String, String, String>> i = comm.materialize().iterator();
- checkEquals(null, null, "a", i.next());
- checkEquals("b", null, null, i.next());
- checkEquals(null, null, "c", i.next());
- checkEquals(null, "d", null, i.next());
- checkEquals("e", null, null, i.next());
- assertFalse(i.hasNext());
- }
-
- private void checkEquals(String s1, String s2, String s3, Tuple3<String, String, String> tuple) {
- assertEquals("first string", s1, tuple.first());
- assertEquals("second string", s2, tuple.second());
- assertEquals("third string", s3, tuple.third());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java
deleted file mode 100644
index e19c7d3..0000000
--- a/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.From;
-import org.apache.crunch.lib.Sort.ColumnOrder;
-import org.apache.crunch.lib.Sort.Order;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- *
- */
-public class SortByValueIT {
- @Rule
- public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
- private static class SplitFn extends MapFn<String, Pair<String, Long>> {
- private String sep;
-
- public SplitFn(String sep) {
- this.sep = sep;
- }
-
- @Override
- public Pair<String, Long> map(String input) {
- String[] pieces = input.split(sep);
- return Pair.of(pieces[0], Long.valueOf(pieces[1]));
- }
- }
-
- @Test
- public void testSortByValueWritables() throws Exception {
- run(new MRPipeline(SortByValueIT.class), WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testSortByValueAvro() throws Exception {
- run(new MRPipeline(SortByValueIT.class), AvroTypeFamily.getInstance());
- }
-
- public void run(Pipeline pipeline, PTypeFamily ptf) throws Exception {
- String sbv = tmpDir.copyResourceFileName("sort_by_value.txt");
- PTable<String, Long> letterCounts = pipeline.read(From.textFile(sbv)).parallelDo(new SplitFn("\t"),
- ptf.tableOf(ptf.strings(), ptf.longs()));
- PCollection<Pair<String, Long>> sorted = Sort.sortPairs(
- letterCounts,
- new ColumnOrder(2, Order.DESCENDING),
- new ColumnOrder(1, Order.ASCENDING));
- assertEquals(
- ImmutableList.of(Pair.of("C", 3L), Pair.of("A", 2L), Pair.of("D", 2L), Pair.of("B", 1L), Pair.of("E", 1L)),
- ImmutableList.copyOf(sorted.materialize()));
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
deleted file mode 100644
index bad4864..0000000
--- a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib;
-
-import static org.apache.crunch.lib.Sort.ColumnOrder.by;
-import static org.apache.crunch.lib.Sort.Order.ASCENDING;
-import static org.apache.crunch.lib.Sort.Order.DESCENDING;
-import static org.apache.crunch.test.StringWrapper.wrap;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.lib.Sort.ColumnOrder;
-import org.apache.crunch.lib.Sort.Order;
-import org.apache.crunch.test.StringWrapper;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class SortIT implements Serializable {
- @Rule
- public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testWritableSortAsc() throws Exception {
- runSingle(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), Order.ASCENDING,
- "A\tand this text as well");
- }
-
- @Test
- public void testWritableSortDesc() throws Exception {
- runSingle(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), Order.DESCENDING,
- "B\tthis doc has some text");
- }
-
- @Test
- public void testWritableSortAscDesc() throws Exception {
- runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
- "this doc has this text");
- }
-
- @Test
- public void testWritableSortSecondDescFirstAsc() throws Exception {
- runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
- "this doc has this text");
- }
-
- @Test
- public void testWritableSortTripleAscDescAsc() throws Exception {
- runTriple(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
- by(3, ASCENDING), "A", "this", "doc");
- }
-
- @Test
- public void testWritableSortQuadAscDescAscDesc() throws Exception {
- runQuad(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
- by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
- }
-
- @Test
- public void testWritableSortTupleNAscDesc() throws Exception {
- runTupleN(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(),
- new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
- }
-
- @Test
- public void testWritableSortTable() throws Exception {
- runTable(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), "A");
- }
-
- @Test
- public void testAvroSortAsc() throws Exception {
- runSingle(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), Order.ASCENDING, "A\tand this text as well");
- }
-
- @Test
- public void testAvroSortDesc() throws Exception {
- runSingle(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), Order.DESCENDING, "B\tthis doc has some text");
- }
-
- @Test
- public void testAvroSortPairAscDesc() throws Exception {
- runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
- "this doc has this text");
- }
-
- @Test
- public void testAvroSortPairSecondDescFirstAsc() throws Exception {
- runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
- "this doc has this text");
- }
-
- @Test
- public void testAvroSortTripleAscDescAsc() throws Exception {
- runTriple(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
- by(3, ASCENDING), "A", "this", "doc");
- }
-
- @Test
- public void testAvroSortQuadAscDescAscDesc() throws Exception {
- runQuad(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
- by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
- }
-
- @Test
- public void testAvroSortTupleNAscDesc() throws Exception {
- runTupleN(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(),
- new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
- }
-
- @Test
- public void testAvroReflectSortPair() throws IOException {
- Pipeline pipeline = new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration());
- pipeline.enableDebug();
- String rsrc = tmpDir.copyResourceFileName("set2.txt");
- PCollection<Pair<String, StringWrapper>> in = pipeline.readTextFile(rsrc)
- .parallelDo(new MapFn<String, Pair<String, StringWrapper>>() {
-
- @Override
- public Pair<String, StringWrapper> map(String input) {
- return Pair.of(input, wrap(input));
- }
- }, Avros.pairs(Avros.strings(), Avros.reflects(StringWrapper.class)));
- PCollection<Pair<String, StringWrapper>> sorted = Sort.sort(in, Order.ASCENDING);
-
- List<Pair<String, StringWrapper>> expected = Lists.newArrayList();
- expected.add(Pair.of("a", wrap("a")));
- expected.add(Pair.of("c", wrap("c")));
- expected.add(Pair.of("d", wrap("d")));
-
- assertEquals(expected, Lists.newArrayList(sorted.materialize()));
- }
-
- @Test
- public void testAvroReflectSortTable() throws IOException {
- Pipeline pipeline = new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration());
- PTable<String, StringWrapper> unsorted = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt")).parallelDo(
- new MapFn<String, Pair<String, StringWrapper>>() {
-
- @Override
- public Pair<String, StringWrapper> map(String input) {
- return Pair.of(input, wrap(input));
- }
- }, Avros.tableOf(Avros.strings(), Avros.reflects(StringWrapper.class)));
-
- PTable<String, StringWrapper> sorted = Sort.sort(unsorted);
-
- List<Pair<String, StringWrapper>> expected = Lists.newArrayList();
- expected.add(Pair.of("a", wrap("a")));
- expected.add(Pair.of("c", wrap("c")));
- expected.add(Pair.of("d", wrap("d")));
-
- assertEquals(expected, Lists.newArrayList(sorted.materialize()));
- }
-
- @Test
- public void testAvroSortTable() throws Exception {
- runTable(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), "A");
- }
-
- private void runSingle(Pipeline pipeline, PTypeFamily typeFamily, Order order, String firstLine) throws IOException {
- String inputPath = tmpDir.copyResourceFileName("docs.txt");
-
- PCollection<String> input = pipeline.readTextFile(inputPath);
- // following turns the input from Writables to required type family
- PCollection<String> input2 = input.parallelDo(new DoFn<String, String>() {
- @Override
- public void process(String input, Emitter<String> emitter) {
- emitter.emit(input);
- }
- }, typeFamily.strings());
- PCollection<String> sorted = Sort.sort(input2, order);
- Iterable<String> lines = sorted.materialize();
-
- assertEquals(firstLine, lines.iterator().next());
- pipeline.done(); // TODO: finally
- }
-
- private void runPair(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first, ColumnOrder second,
- String firstField, String secondField) throws IOException {
- String inputPath = tmpDir.copyResourceFileName("docs.txt");
-
- PCollection<String> input = pipeline.readTextFile(inputPath);
- PTable<String, String> kv = input.parallelDo(new DoFn<String, Pair<String, String>>() {
- @Override
- public void process(String input, Emitter<Pair<String, String>> emitter) {
- String[] split = input.split("[\t]+");
- emitter.emit(Pair.of(split[0], split[1]));
- }
- }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
- PCollection<Pair<String, String>> sorted = Sort.sortPairs(kv, first, second);
- List<Pair<String, String>> lines = Lists.newArrayList(sorted.materialize());
- Pair<String, String> l = lines.iterator().next();
- assertEquals(firstField, l.first());
- assertEquals(secondField, l.second());
- pipeline.done();
- }
-
- private void runTriple(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first, ColumnOrder second,
- ColumnOrder third, String firstField, String secondField, String thirdField) throws IOException {
- String inputPath = tmpDir.copyResourceFileName("docs.txt");
-
- PCollection<String> input = pipeline.readTextFile(inputPath);
- PCollection<Tuple3<String, String, String>> kv = input.parallelDo(
- new DoFn<String, Tuple3<String, String, String>>() {
- @Override
- public void process(String input, Emitter<Tuple3<String, String, String>> emitter) {
- String[] split = input.split("[\t ]+");
- int len = split.length;
- emitter.emit(Tuple3.of(split[0], split[1 % len], split[2 % len]));
- }
- }, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
- PCollection<Tuple3<String, String, String>> sorted = Sort.sortTriples(kv, first, second, third);
- List<Tuple3<String, String, String>> lines = Lists.newArrayList(sorted.materialize());
- Tuple3<String, String, String> l = lines.iterator().next();
- assertEquals(firstField, l.first());
- assertEquals(secondField, l.second());
- assertEquals(thirdField, l.third());
- pipeline.done();
- }
-
- private void runQuad(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first, ColumnOrder second,
- ColumnOrder third, ColumnOrder fourth, String firstField, String secondField, String thirdField,
- String fourthField) throws IOException {
- String inputPath = tmpDir.copyResourceFileName("docs.txt");
-
- PCollection<String> input = pipeline.readTextFile(inputPath);
- PCollection<Tuple4<String, String, String, String>> kv = input.parallelDo(
- new DoFn<String, Tuple4<String, String, String, String>>() {
- @Override
- public void process(String input, Emitter<Tuple4<String, String, String, String>> emitter) {
- String[] split = input.split("[\t ]+");
- int len = split.length;
- emitter.emit(Tuple4.of(split[0], split[1 % len], split[2 % len], split[3 % len]));
- }
- }, typeFamily.quads(typeFamily.strings(), typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
- PCollection<Tuple4<String, String, String, String>> sorted = Sort.sortQuads(kv, first, second, third, fourth);
- Iterable<Tuple4<String, String, String, String>> lines = sorted.materialize();
- Tuple4<String, String, String, String> l = lines.iterator().next();
- assertEquals(firstField, l.first());
- assertEquals(secondField, l.second());
- assertEquals(thirdField, l.third());
- assertEquals(fourthField, l.fourth());
- pipeline.done();
- }
-
- private void runTupleN(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder[] orders, String[] fields)
- throws IOException {
- String inputPath = tmpDir.copyResourceFileName("docs.txt");
-
- PCollection<String> input = pipeline.readTextFile(inputPath);
- PType[] types = new PType[orders.length];
- Arrays.fill(types, typeFamily.strings());
- PCollection<TupleN> kv = input.parallelDo(new DoFn<String, TupleN>() {
- @Override
- public void process(String input, Emitter<TupleN> emitter) {
- String[] split = input.split("[\t]+");
- emitter.emit(new TupleN(split));
- }
- }, typeFamily.tuples(types));
- PCollection<TupleN> sorted = Sort.sortTuples(kv, orders);
- Iterable<TupleN> lines = sorted.materialize();
- TupleN l = lines.iterator().next();
- int i = 0;
- for (String field : fields) {
- assertEquals(field, l.get(i++));
- }
- pipeline.done();
- }
-
- private void runTable(Pipeline pipeline, PTypeFamily typeFamily, String firstKey) throws IOException {
- String inputPath = tmpDir.copyResourceFileName("docs.txt");
-
- PCollection<String> input = pipeline.readTextFile(inputPath);
- PTable<String, String> table = input.parallelDo(new DoFn<String, Pair<String, String>>() {
- @Override
- public void process(String input, Emitter<Pair<String, String>> emitter) {
- String[] split = input.split("[\t]+");
- emitter.emit(Pair.of(split[0], split[1]));
- }
- }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
-
- PTable<String, String> sorted = Sort.sort(table);
- Iterable<Pair<String, String>> lines = sorted.materialize();
- Pair<String, String> l = lines.iterator().next();
- assertEquals(firstKey, l.first());
- pipeline.done();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java b/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
deleted file mode 100644
index 5292353..0000000
--- a/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib;
-
-import static junit.framework.Assert.assertEquals;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.test.Person;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.avro.Avros;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * Test {@link org.apache.crunch.types.avro.SafeAvroSerialization} with Specific Avro types
- */
-public class SpecificAvroGroupByIT implements Serializable {
-
- private static final long serialVersionUID = 1344118240353796561L;
-
- private transient File avroFile;
- @Rule
- public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
-
- @Before
- public void setUp() throws IOException {
- avroFile = File.createTempFile("avrotest", ".avro");
- }
-
- @After
- public void tearDown() {
- avroFile.delete();
- }
-
- @Test
- public void testGrouByWithSpecificAvroType() throws Exception {
- MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class, tmpDir.getDefaultConfiguration());
- testSpecificAvro(pipeline);
- }
-
- public void testSpecificAvro(MRPipeline pipeline) throws Exception {
-
- createPersonAvroFile(avroFile);
-
- PCollection<Person> unsorted = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), Avros.records(Person.class)));
-
- PTable<String, Person> sorted = unsorted.parallelDo(new MapFn<Person, Pair<String, Person>>() {
-
- @Override
- public Pair<String, Person> map(Person input) {
- String key = input.name.toString();
- return Pair.of(key, input);
-
- }
- }, Avros.tableOf(Avros.strings(), Avros.records(Person.class))).groupByKey().ungroup();
-
- List<Pair<String, Person>> outputPersonList = Lists.newArrayList(sorted.materialize());
-
- assertEquals(1, outputPersonList.size());
- assertEquals(String.class, outputPersonList.get(0).first().getClass());
- assertEquals(Person.class, outputPersonList.get(0).second().getClass());
-
- pipeline.done();
- }
-
- private void createPersonAvroFile(File avroFile) throws IOException {
-
- Person person = new Person();
- person.age = 40;
- person.name = "Bob";
- List<CharSequence> siblingNames = Lists.newArrayList();
- siblingNames.add("Bob" + "1");
- siblingNames.add("Bob" + "2");
- person.siblingnames = siblingNames;
-
- FileOutputStream outputStream = new FileOutputStream(avroFile);
- SpecificDatumWriter<Person> writer = new SpecificDatumWriter<Person>(Person.class);
-
- DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(writer);
- dataFileWriter.create(Person.SCHEMA$, outputStream);
- dataFileWriter.append(person);
- dataFileWriter.close();
- outputStream.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
deleted file mode 100644
index 63d594d..0000000
--- a/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PTypeFamily;
-
-public class FullOuterJoinIT extends JoinTester {
- @Override
- public void assertPassed(Iterable<Pair<String, Long>> lines) {
- boolean passed1 = false;
- boolean passed2 = false;
- boolean passed3 = false;
- for (Pair<String, Long> line : lines) {
- if ("wretched".equals(line.first()) && 24 == line.second()) {
- passed1 = true;
- }
- if ("againe".equals(line.first()) && 10 == line.second()) {
- passed2 = true;
- }
- if ("Montparnasse.".equals(line.first()) && 2 == line.second()) {
- passed3 = true;
- }
- }
- assertTrue(passed1);
- assertTrue(passed2);
- assertTrue(passed3);
- }
-
- @Override
- protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
- return new FullOuterJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs());
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
deleted file mode 100644
index 4759050..0000000
--- a/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PTypeFamily;
-
-public class InnerJoinIT extends JoinTester {
- @Override
- public void assertPassed(Iterable<Pair<String, Long>> lines) {
- boolean passed1 = false;
- boolean passed2 = true;
- boolean passed3 = true;
- for (Pair<String, Long> line : lines) {
- if ("wretched".equals(line.first()) && 24 == line.second()) {
- passed1 = true;
- }
- if ("againe".equals(line.first())) {
- passed2 = false;
- }
- if ("Montparnasse.".equals(line.first())) {
- passed3 = false;
- }
- }
- assertTrue(passed1);
- assertTrue(passed2);
- assertTrue(passed3);
- }
-
- @Override
- protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
- return new InnerJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs());
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java b/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
deleted file mode 100644
index 3e8ffda..0000000
--- a/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.lib.Join;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-/** Shared scenario for the join ITs: joins per-word counts of two texts with the subclass's JoinFn. */
-public abstract class JoinTester implements Serializable {
-  /** Emits every token obtained by splitting a line on runs of whitespace. */
-  private static class WordSplit extends DoFn<String, String> {
-    @Override
-    public void process(String input, Emitter<String> emitter) {
-      for (String word : input.split("\\s+")) {
-        emitter.emit(word);
-      }
-    }
-  }
-
-  /** Adds the two joined values of a key, treating a null side as zero. */
-  private static class SumCounts extends DoFn<Pair<String, Pair<Long, Long>>, Pair<String, Long>> {
-    @Override
-    public void process(Pair<String, Pair<Long, Long>> input, Emitter<Pair<String, Long>> emitter) {
-      Pair<Long, Long> pair = input.second();
-      long sum = (pair.first() != null ? pair.first() : 0) + (pair.second() != null ? pair.second() : 0);
-      emitter.emit(Pair.of(input.first(), sum));
-    }
-  }
-
-  @Rule
-  public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
-  /** Splits and counts both inputs, joins the counts via {@link #getJoinFn} and sums each joined pair. */
-  protected PTable<String, Long> join(PCollection<String> w1, PCollection<String> w2, PTypeFamily ptf) {
-    PTableType<String, Long> ntt = ptf.tableOf(ptf.strings(), ptf.longs());
-    PTable<String, Long> ws1 = Aggregate.count(w1.parallelDo("ws1", new WordSplit(), ptf.strings()));
-    PTable<String, Long> ws2 = Aggregate.count(w2.parallelDo("ws2", new WordSplit(), ptf.strings()));
-    PTable<String, Pair<Long, Long>> joinedCounts = Join.join(ws1, ws2, getJoinFn(ptf));
-    // A static nested DoFn (like WordSplit) does not hold a reference to this test instance.
-    return joinedCounts.parallelDo("cnt", new SumCounts(), ntt);
-  }
-
-  /** Joins shakes.txt with maugham.txt on {@code pipeline}, checks the output, then calls done(). */
-  protected void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
-    String maughamInputPath = tmpDir.copyResourceFileName("maugham.txt");
-    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
-    PCollection<String> maugham = pipeline.readTextFile(maughamInputPath);
-    PTable<String, Long> joined = join(shakespeare, maugham, typeFamily);
-    assertPassed(joined.materialize());
-    pipeline.done();
-  }
-
-  @Test
-  public void testWritableJoin() throws Exception {
-    // getClass() passes the concrete test class instead of a hard-coded InnerJoinIT.
-    run(new MRPipeline(getClass(), tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testAvroJoin() throws Exception {
-    run(new MRPipeline(getClass(), tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
-  }
-
-  /**
-   * Used to check that the result of the join makes sense.
-   *
-   * @param lines
-   *          The result of the join.
-   */
-  public abstract void assertPassed(Iterable<Pair<String, Long>> lines);
-
-  /** @return The JoinFn to use. */
-  protected abstract JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily);
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
deleted file mode 100644
index 4ad2a81..0000000
--- a/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PTypeFamily;
-
-public class LeftOuterJoinIT extends JoinTester {
- @Override
- public void assertPassed(Iterable<Pair<String, Long>> lines) {
- boolean passed1 = false;
- boolean passed2 = false;
- boolean passed3 = true;
- for (Pair<String, Long> line : lines) {
- if ("wretched".equals(line.first()) && 24 == line.second()) {
- passed1 = true;
- }
- if ("againe".equals(line.first()) && 10 == line.second()) {
- passed2 = true;
- }
- if ("Montparnasse.".equals(line.first())) {
- passed3 = false;
- }
- }
- assertTrue(passed1);
- assertTrue(passed2);
- assertTrue(passed3);
- }
-
- @Override
- protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
- return new LeftOuterJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs());
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
deleted file mode 100644
index 8bb5586..0000000
--- a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.PipelineResult;
-import org.apache.crunch.fn.FilterFns;
-import org.apache.crunch.fn.MapValuesFn;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/** Integration tests for {@link MapsideJoin} on the in-memory and MapReduce pipelines. */
-public class MapsideJoinIT {
-  /** Value of {@code java.io.tmpdir} before {@link #setUpClass()} replaced it; null if it had none. */
-  private static String saveTempDir;
-
-  @BeforeClass
-  public static void setUpClass() {
-    // Ensure a consistent temporary directory for use of the DistributedCache.
-    // The DistributedCache technically isn't supported when running in local mode, and the default
-    // temporary directory "/tmp" is used as its location. This typically only causes an issue when
-    // running integration tests on Mac OS X, as OS X doesn't use "/tmp" as its default temporary
-    // directory. The following call ensures that "/tmp" is used as the temporary directory on all platforms.
-    saveTempDir = System.setProperty("java.io.tmpdir", "/tmp");
-  }
-
-  @AfterClass
-  public static void tearDownClass() {
-    // System.setProperty rejects a null value, so a property that had no prior value is cleared instead.
-    if (saveTempDir == null) {
-      System.clearProperty("java.io.tmpdir");
-    } else {
-      System.setProperty("java.io.tmpdir", saveTempDir);
-    }
-  }
-
-  /** Splits a line on '|' into an (integer first field, second field) pair. */
-  private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
-    @Override
-    public Pair<Integer, String> map(String input) {
-      String[] fields = input.split("\\|");
-      return Pair.of(Integer.parseInt(fields[0]), fields[1]);
-    }
-  }
-
-  /** Upper-cases each value with Locale.ROOT so the expected strings do not depend on the default locale. */
-  private static class CapOrdersFn extends MapValuesFn<Integer, String, String> {
-    @Override
-    public String map(String v) {
-      return v.toUpperCase(java.util.Locale.ROOT);
-    }
-  }
-
-  /** Replaces each joined value pair with its {@code toString()} form. */
-  private static class ConcatValuesFn extends MapValuesFn<Integer, Pair<String, String>, String> {
-    @Override
-    public String map(Pair<String, String> v) {
-      return v.toString();
-    }
-  }
-
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testMapSideJoin_MemPipeline() {
-    runMapsideJoin(MemPipeline.getInstance(), true);
-  }
-
-  @Test
-  public void testMapsideJoin_RightSideIsEmpty() throws IOException {
-    MRPipeline pipeline = new MRPipeline(MapsideJoinIT.class, tmpDir.getDefaultConfiguration());
-    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
-    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
-    PTable<Integer, String> filteredOrderTable = orderTable
-        .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), orderTable.getPTableType());
-    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, filteredOrderTable);
-    List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize());
-    assertTrue(materializedJoin.isEmpty());
-  }
-
-  @Test
-  public void testMapsideJoin() throws IOException {
-    runMapsideJoin(new MRPipeline(MapsideJoinIT.class, tmpDir.getDefaultConfiguration()), false);
-  }
-
-  /** Chains two map-side joins and compares the sorted output; unless inMemory, also expects 2 stage results. */
-  private void runMapsideJoin(Pipeline pipeline, boolean inMemory) {
-    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
-    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
-    PTable<Integer, String> custOrders = MapsideJoin.join(customerTable, orderTable)
-        .parallelDo("concat", new ConcatValuesFn(), Writables.tableOf(Writables.ints(), Writables.strings()));
-    PTable<Integer, String> upperCaseOrders = orderTable.parallelDo(new CapOrdersFn(), orderTable.getPTableType());
-    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(custOrders, upperCaseOrders);
-
-    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
-    expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES")));
-    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PAPER")));
-    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PLUNGER")));
-    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PAPER")));
-    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER")));
-    expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH")));
-    Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
-
-    PipelineResult res = pipeline.run();
-    if (!inMemory) {
-      assertEquals(2, res.getStageResults().size());
-    }
-
-    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
-    Collections.sort(joinedResultList);
-    assertEquals(expectedJoinResult, joinedResultList);
-  }
-
-  /** Reads the text file at {@code tmpDir.copyResourceFileName(filename)} and parses each line with LineSplitter. */
-  private PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
-    try {
-      return pipeline.readTextFile(tmpDir.copyResourceFileName(filename)).parallelDo("asTable", new LineSplitter(),
-          Writables.tableOf(Writables.ints(), Writables.strings()));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}