You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:23 UTC
[21/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/PObjectsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PObjectsIT.java b/crunch/src/it/java/org/apache/crunch/PObjectsIT.java
deleted file mode 100644
index 6ee849f..0000000
--- a/crunch/src/it/java/org/apache/crunch/PObjectsIT.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.lang.Integer;
-import java.lang.Iterable;
-import java.lang.String;
-import java.util.Iterator;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.materialize.pobject.PObjectImpl;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.Rule;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class PObjectsIT {
-
- private static final Integer LINES_IN_SHAKES = 3667;
-
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- /**
- * A mock PObject that should map PCollections of strings to an integer count of the number of
- * elements in the underlying PCollection.
- */
- public static class MockPObjectImpl extends PObjectImpl<String, Integer> {
- private int numProcessCalls;
-
- public MockPObjectImpl(PCollection<String> collect) {
- super(collect);
- numProcessCalls = 0;
- }
-
- @Override
- public Integer process(Iterable<String> input) {
- numProcessCalls++;
- int i = 0;
- Iterator<String> itr = input.iterator();
- while (itr.hasNext()) {
- i++;
- itr.next();
- }
- return i;
- }
-
- public int getNumProcessCalls() {
- return numProcessCalls;
- }
- }
-
- @Test
- public void testMRPipeline() throws IOException {
- run(new MRPipeline(PObjectsIT.class, tmpDir.getDefaultConfiguration()));
- }
-
- @Test
- public void testInMemoryPipeline() throws IOException {
- run(MemPipeline.getInstance());
- }
-
- public void run(Pipeline pipeline) throws IOException {
- String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
- PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
- MockPObjectImpl lineCount = new MockPObjectImpl(shakespeare);
- // Get the line count once and verify its correctness.
- assertEquals("Incorrect number of lines counted from PCollection.", LINES_IN_SHAKES,
- lineCount.getValue());
- // And do it again.
- assertEquals("Incorrect number of lines counted from PCollection.", LINES_IN_SHAKES,
- lineCount.getValue());
- // Make sure process was called only once because the PObject's value was cached after the
- // first call.
- assertEquals("Process on PObject not called exactly 1 times.", 1,
- lineCount.getNumProcessCalls());
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java b/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java
deleted file mode 100644
index d56e122..0000000
--- a/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-
-import junit.framework.Assert;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.common.collect.Lists;
-
-@RunWith(value = Parameterized.class)
-public class PTableKeyValueIT implements Serializable {
-
- private static final long serialVersionUID = 4374227704751746689L;
-
- private transient PTypeFamily typeFamily;
- private transient MRPipeline pipeline;
- private transient String inputFile;
- @Rule
- public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Before
- public void setUp() throws IOException {
- pipeline = new MRPipeline(PTableKeyValueIT.class, tmpDir.getDefaultConfiguration());
- inputFile = tmpDir.copyResourceFileName("set1.txt");
- }
-
- @After
- public void tearDown() {
- pipeline.done();
- }
-
- public PTableKeyValueIT(PTypeFamily typeFamily) {
- this.typeFamily = typeFamily;
- }
-
- @Parameters
- public static Collection<Object[]> data() {
- Object[][] data = new Object[][] { { WritableTypeFamily.getInstance() }, { AvroTypeFamily.getInstance() } };
- return Arrays.asList(data);
- }
-
- @Test
- public void testKeysAndValues() throws Exception {
-
- PCollection<String> collection = pipeline.read(At.textFile(inputFile, typeFamily.strings()));
-
- PTable<String, String> table = collection.parallelDo(new DoFn<String, Pair<String, String>>() {
-
- @Override
- public void process(String input, Emitter<Pair<String, String>> emitter) {
- emitter.emit(Pair.of(input.toUpperCase(), input));
-
- }
- }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
-
- PCollection<String> keys = table.keys();
- PCollection<String> values = table.values();
-
- ArrayList<String> keyList = Lists.newArrayList(keys.materialize().iterator());
- ArrayList<String> valueList = Lists.newArrayList(values.materialize().iterator());
-
- Assert.assertEquals(keyList.size(), valueList.size());
- for (int i = 0; i < keyList.size(); i++) {
- Assert.assertEquals(keyList.get(i), valueList.get(i).toUpperCase());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/PageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PageRankIT.java b/crunch/src/it/java/org/apache/crunch/PageRankIT.java
deleted file mode 100644
index 6291ef8..0000000
--- a/crunch/src/it/java/org/apache/crunch/PageRankIT.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.PTypes;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-public class PageRankIT {
-
- public static class PageRankData {
- public float score;
- public float lastScore;
- public List<String> urls;
-
- public PageRankData() {
- }
-
- public PageRankData(float score, float lastScore, Iterable<String> urls) {
- this.score = score;
- this.lastScore = lastScore;
- this.urls = Lists.newArrayList(urls);
- }
-
- public PageRankData next(float newScore) {
- return new PageRankData(newScore, score, urls);
- }
-
- public float propagatedScore() {
- return score / urls.size();
- }
-
- @Override
- public String toString() {
- return score + " " + lastScore + " " + urls;
- }
- }
-
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testAvroReflect() throws Exception {
- PTypeFamily tf = AvroTypeFamily.getInstance();
- PType<PageRankData> prType = Avros.reflects(PageRankData.class);
- String urlInput = tmpDir.copyResourceFileName("urls.txt");
- run(new MRPipeline(PageRankIT.class, tmpDir.getDefaultConfiguration()),
- urlInput, prType, tf);
- }
-
- @Test
- public void testAvroMReflectInMemory() throws Exception {
- PTypeFamily tf = AvroTypeFamily.getInstance();
- PType<PageRankData> prType = Avros.reflects(PageRankData.class);
- String urlInput = tmpDir.copyResourceFileName("urls.txt");
- run(MemPipeline.getInstance(), urlInput, prType, tf);
- }
-
- @Test
- public void testAvroJSON() throws Exception {
- PTypeFamily tf = AvroTypeFamily.getInstance();
- PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
- String urlInput = tmpDir.copyResourceFileName("urls.txt");
- run(new MRPipeline(PageRankIT.class, tmpDir.getDefaultConfiguration()),
- urlInput, prType, tf);
- }
-
- @Test
- public void testWritablesJSON() throws Exception {
- PTypeFamily tf = WritableTypeFamily.getInstance();
- PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
- String urlInput = tmpDir.copyResourceFileName("urls.txt");
- run(new MRPipeline(PageRankIT.class, tmpDir.getDefaultConfiguration()),
- urlInput, prType, tf);
- }
-
- public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {
- PTypeFamily ptf = input.getTypeFamily();
- PTable<String, Float> outbound = input.parallelDo(new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
- @Override
- public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float>> emitter) {
- PageRankData prd = input.second();
- for (String link : prd.urls) {
- emitter.emit(Pair.of(link, prd.propagatedScore()));
- }
- }
- }, ptf.tableOf(ptf.strings(), ptf.floats()));
-
- return input.cogroup(outbound).parallelDo(
- new MapFn<Pair<String, Pair<Collection<PageRankData>, Collection<Float>>>, Pair<String, PageRankData>>() {
- @Override
- public Pair<String, PageRankData> map(Pair<String, Pair<Collection<PageRankData>, Collection<Float>>> input) {
- PageRankData prd = Iterables.getOnlyElement(input.second().first());
- Collection<Float> propagatedScores = input.second().second();
- float sum = 0.0f;
- for (Float s : propagatedScores) {
- sum += s;
- }
- return Pair.of(input.first(), prd.next(d + (1.0f - d) * sum));
- }
- }, input.getPTableType());
- }
-
- public static void run(Pipeline pipeline, String urlInput,
- PType<PageRankData> prType, PTypeFamily ptf) throws Exception {
- PTable<String, PageRankData> scores = pipeline.readTextFile(urlInput)
- .parallelDo(new MapFn<String, Pair<String, String>>() {
- @Override
- public Pair<String, String> map(String input) {
- String[] urls = input.split("\\t");
- return Pair.of(urls[0], urls[1]);
- }
- }, ptf.tableOf(ptf.strings(), ptf.strings())).groupByKey()
- .parallelDo(new MapFn<Pair<String, Iterable<String>>, Pair<String, PageRankData>>() {
- @Override
- public Pair<String, PageRankData> map(Pair<String, Iterable<String>> input) {
- return Pair.of(input.first(), new PageRankData(1.0f, 0.0f, input.second()));
- }
- }, ptf.tableOf(ptf.strings(), prType));
-
- Float delta = 1.0f;
- while (delta > 0.01) {
- scores = pageRank(scores, 0.5f);
- scores.materialize().iterator(); // force the write
- delta = Aggregate.max(scores.parallelDo(new MapFn<Pair<String, PageRankData>, Float>() {
- @Override
- public Float map(Pair<String, PageRankData> input) {
- PageRankData prd = input.second();
- return Math.abs(prd.score - prd.lastScore);
- }
- }, ptf.floats())).getValue();
- }
- assertEquals(0.0048, delta, 0.001);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/StageResultsCountersIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/StageResultsCountersIT.java b/crunch/src/it/java/org/apache/crunch/StageResultsCountersIT.java
deleted file mode 100644
index 19fc302..0000000
--- a/crunch/src/it/java/org/apache/crunch/StageResultsCountersIT.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.crunch.PipelineResult.StageResult;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.From;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.apache.hadoop.mapreduce.Counter;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-public class StageResultsCountersIT {
-
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- public static HashSet<String> SPECIAL_KEYWORDS = Sets.newHashSet("AND", "OR", "NOT");
-
- public static String KEYWORDS_COUNTER_GROUP = "KEYWORDS_COUNTER_GROUP";
-
- @After
- public void after() {
- MemPipeline.clearCounters();
- }
-
- @Test
- public void testStageResultsCountersMRWritables() throws Exception {
- testSpecialKeywordCount(new MRPipeline(StageResultsCountersIT.class, tmpDir.getDefaultConfiguration()),
- WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testStageResultsCountersMRAvro() throws Exception {
- testSpecialKeywordCount(new MRPipeline(StageResultsCountersIT.class, tmpDir.getDefaultConfiguration()),
- AvroTypeFamily.getInstance());
- }
-
- @Test
- public void testStageResultsCountersMemWritables() throws Exception {
- testSpecialKeywordCount(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testStageResultsCountersMemAvro() throws Exception {
- testSpecialKeywordCount(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
- }
-
- public void testSpecialKeywordCount(Pipeline pipeline, PTypeFamily tf) throws Exception {
-
- String rowsInputPath = tmpDir.copyResourceFileName("shakes.txt");
-
- PipelineResult result = coutSpecialKeywords(pipeline, rowsInputPath, tf);
-
- assertTrue(result.succeeded());
-
- Map<String, Long> keywordsMap = countersToMap(result.getStageResults(), KEYWORDS_COUNTER_GROUP);
-
- assertEquals(3, keywordsMap.size());
-
- assertEquals("{NOT=157, AND=596, OR=81}", keywordsMap.toString());
- }
-
- private static PipelineResult coutSpecialKeywords(Pipeline pipeline, String inputFileName, PTypeFamily tf) {
-
- pipeline.read(From.textFile(inputFileName)).parallelDo(new DoFn<String, Void>() {
-
- @Override
- public void process(String text, Emitter<Void> emitter) {
-
- if (!StringUtils.isBlank(text)) {
-
- String[] tokens = text.toUpperCase().split("\\s");
-
- for (String token : tokens) {
- if (SPECIAL_KEYWORDS.contains(token)) {
- getCounter(KEYWORDS_COUNTER_GROUP, token).increment(1);
- }
- }
- }
- }
- }, tf.nulls()).materialize(); // TODO can we avoid the materialize ?
-
- return pipeline.done();
- }
-
- private static Map<String, Long> countersToMap(List<StageResult> stages, String counterGroupName) {
-
- Map<String, Long> countersMap = Maps.newHashMap();
-
- for (StageResult sr : stages) {
- Iterator<Counter> iterator = sr.getCounters().getGroup(counterGroupName).iterator();
- while (iterator.hasNext()) {
- Counter counter = (Counter) iterator.next();
- countersMap.put(counter.getDisplayName(), counter.getValue());
- }
- }
-
- return countersMap;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java b/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java
deleted file mode 100644
index ca66aa8..0000000
--- a/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class TermFrequencyIT implements Serializable {
- @Rule
- public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testTermFrequencyWithNoTransform() throws IOException {
- run(new MRPipeline(TermFrequencyIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), false);
- }
-
- @Test
- public void testTermFrequencyWithTransform() throws IOException {
- run(new MRPipeline(TermFrequencyIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), true);
- }
-
- @Test
- public void testTermFrequencyNoTransformInMemory() throws IOException {
- run(MemPipeline.getInstance(), WritableTypeFamily.getInstance(), false);
- }
-
- @Test
- public void testTermFrequencyWithTransformInMemory() throws IOException {
- run(MemPipeline.getInstance(), WritableTypeFamily.getInstance(), true);
- }
-
- public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean transformTF) throws IOException {
- String input = tmpDir.copyResourceFileName("docs.txt");
-
- File transformedOutput = tmpDir.getFile("transformed-output");
- File tfOutput = tmpDir.getFile("tf-output");
-
- PCollection<String> docs = pipeline.readTextFile(input);
-
- PTypeFamily ptf = docs.getTypeFamily();
-
- /*
- * Input: String -- one line of the form "title<TAB>text"
- *
- * Output: PTable<Pair<String, String>, Long> -- maps Pair<word, title> to
- * the count of that word in that title
- */
- PTable<Pair<String, String>, Long> tf = Aggregate.count(docs.parallelDo("term document frequency",
- new DoFn<String, Pair<String, String>>() {
- @Override
- public void process(String doc, Emitter<Pair<String, String>> emitter) {
- String[] kv = doc.split("\t");
- String title = kv[0];
- String text = kv[1];
- for (String word : text.split("\\W+")) {
- if (word.length() > 0) {
- Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
- emitter.emit(pair);
- }
- }
- }
- }, ptf.pairs(ptf.strings(), ptf.strings())));
-
- if (transformTF) {
- /*
- * Input: Pair<Pair<String, String>, Long> Pair<Pair<word, title>, count
- * in title>
- *
- * Output: PTable<String, Pair<String, Long>> PTable<word, Pair<title,
- * count in title>>
- */
- PTable<String, Pair<String, Long>> wordDocumentCountPair = tf.parallelDo("transform wordDocumentPairCount",
- new MapFn<Pair<Pair<String, String>, Long>, Pair<String, Pair<String, Long>>>() {
- @Override
- public Pair<String, Pair<String, Long>> map(Pair<Pair<String, String>, Long> input) {
- Pair<String, String> wordDocumentPair = input.first();
- return Pair.of(wordDocumentPair.first(), Pair.of(wordDocumentPair.second(), input.second()));
- }
- }, ptf.tableOf(ptf.strings(), ptf.pairs(ptf.strings(), ptf.longs())));
-
- pipeline.writeTextFile(wordDocumentCountPair, transformedOutput.getAbsolutePath());
- }
-
- SourceTarget<String> st = At.textFile(tfOutput.getAbsolutePath());
- pipeline.write(tf, st);
-
- pipeline.run();
-
- // test the case we should see
- Iterable<String> lines = ((ReadableSourceTarget<String>) st).read(pipeline.getConfiguration());
- boolean passed = false;
- for (String line : lines) {
- if ("[well,A]\t0".equals(line)) {
- fail("Found " + line + " but well is in Document A 1 time");
- }
- if ("[well,A]\t1".equals(line)) {
- passed = true;
- }
- }
- assertTrue(passed);
- pipeline.done();
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/TextPairIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TextPairIT.java b/crunch/src/it/java/org/apache/crunch/TextPairIT.java
deleted file mode 100644
index 55d9af9..0000000
--- a/crunch/src/it/java/org/apache/crunch/TextPairIT.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.From;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class TextPairIT {
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testWritables() throws IOException {
- run(new MRPipeline(TextPairIT.class, tmpDir.getDefaultConfiguration()));
- }
-
- private static final String CANARY = "Writables.STRING_TO_TEXT";
-
- public static PCollection<Pair<String, String>> wordDuplicate(PCollection<String> words) {
- return words.parallelDo("my word duplicator", new DoFn<String, Pair<String, String>>() {
- public void process(String line, Emitter<Pair<String, String>> emitter) {
- for (String word : line.split("\\W+")) {
- if (word.length() > 0) {
- Pair<String, String> pair = Pair.of(CANARY, word);
- emitter.emit(pair);
- }
- }
- }
- }, Writables.pairs(Writables.strings(), Writables.strings()));
- }
-
- public void run(Pipeline pipeline) throws IOException {
- String input = tmpDir.copyResourceFileName("shakes.txt");
-
- PCollection<String> shakespeare = pipeline.read(From.textFile(input));
- Iterable<Pair<String, String>> lines = pipeline.materialize(wordDuplicate(shakespeare));
- boolean passed = false;
- for (Pair<String, String> line : lines) {
- if (line.first().contains(CANARY)) {
- passed = true;
- break;
- }
- }
-
- pipeline.done();
- assertTrue(passed);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/TfIdfIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TfIdfIT.java b/crunch/src/it/java/org/apache/crunch/TfIdfIT.java
deleted file mode 100644
index 218f538..0000000
--- a/crunch/src/it/java/org/apache/crunch/TfIdfIT.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.crunch.fn.MapKeysFn;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.seq.SeqFileSourceTarget;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.lib.Join;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.apache.hadoop.fs.Path;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
-@SuppressWarnings("serial")
-public class TfIdfIT implements Serializable {
- @Rule
- public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
- // Total number of documents; hard-coded here, but should be calculated from the input.
- protected static final double N = 2;
-
- @Test
- public void testWritablesSingleRun() throws IOException {
- run(new MRPipeline(TfIdfIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), true);
- }
-
- @Test
- public void testWritablesMultiRun() throws IOException {
- run(new MRPipeline(TfIdfIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), false);
- }
-
- /**
- * This method should generate a TF-IDF score for the input.
- */
- public PTable<String, Collection<Pair<String, Double>>> generateTFIDF(PCollection<String> docs, Path termFreqPath,
- PTypeFamily ptf) throws IOException {
-
- /*
- * Input: String -- one line of the form "title<TAB>text"
- *
- * Output: PTable<Pair<String, String>, Long> -- maps Pair<word, title> to
- * the count of that word in that title
- */
- PTable<Pair<String, String>, Long> tf = Aggregate.count(docs.parallelDo("term document frequency",
- new DoFn<String, Pair<String, String>>() {
- @Override
- public void process(String doc, Emitter<Pair<String, String>> emitter) {
- String[] kv = doc.split("\t");
- String title = kv[0];
- String text = kv[1];
- for (String word : text.split("\\W+")) {
- if (word.length() > 0) {
- Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
- emitter.emit(pair);
- }
- }
- }
- }, ptf.pairs(ptf.strings(), ptf.strings())));
-
- tf.write(new SeqFileSourceTarget<Pair<Pair<String, String>, Long>>(termFreqPath, tf.getPType()));
-
- /*
- * Input: Pair<Pair<String, String>, Long> Pair<Pair<word, title>, count in
- * title>
- *
- * Output: PTable<String, Long> PTable<word, # of docs containing word>
- */
- PTable<String, Long> n = Aggregate.count(tf.parallelDo("little n (# of docs contain word)",
- new DoFn<Pair<Pair<String, String>, Long>, String>() {
- @Override
- public void process(Pair<Pair<String, String>, Long> input, Emitter<String> emitter) {
- emitter.emit(input.first().first());
- }
- }, ptf.strings()));
-
- /*
- * Input: Pair<Pair<String, String>, Long> Pair<Pair<word, title>, count in
- * title>
- *
- * Output: PTable<String, Collection<Pair<String, Long>>> PTable<word,
- * Collection<Pair<title, count in title>>>
- */
- PTable<String, Collection<Pair<String, Long>>> wordDocumentCountPair = tf.parallelDo(
- "transform wordDocumentPairCount",
- new DoFn<Pair<Pair<String, String>, Long>, Pair<String, Collection<Pair<String, Long>>>>() {
- Collection<Pair<String, Long>> buffer;
- String key;
-
- @Override
- public void process(Pair<Pair<String, String>, Long> input,
- Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
- Pair<String, String> wordDocumentPair = input.first();
- if (!wordDocumentPair.first().equals(key)) {
- flush(emitter);
- key = wordDocumentPair.first();
- buffer = Lists.newArrayList();
- }
- buffer.add(Pair.of(wordDocumentPair.second(), input.second()));
- }
-
- protected void flush(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
- if (buffer != null) {
- emitter.emit(Pair.of(key, buffer));
- buffer = null;
- }
- }
-
- @Override
- public void cleanup(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
- flush(emitter);
- }
- }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.longs()))));
-
- PTable<String, Pair<Long, Collection<Pair<String, Long>>>> joinedResults = Join.join(n, wordDocumentCountPair);
-
- /*
- * Input: Pair<String, Pair<Long, Collection<Pair<String, Long>>> Pair<word,
- * Pair<# of docs containing word, Collection<Pair<title, term frequency>>>
- *
- * Output: Pair<String, Collection<Pair<String, Double>>> Pair<word,
- * Collection<Pair<title, tfidf>>>
- */
- return joinedResults
- .parallelDo(
- "calculate tfidf",
- new MapFn<Pair<String, Pair<Long, Collection<Pair<String, Long>>>>, Pair<String, Collection<Pair<String, Double>>>>() {
- @Override
- public Pair<String, Collection<Pair<String, Double>>> map(
- Pair<String, Pair<Long, Collection<Pair<String, Long>>>> input) {
- Collection<Pair<String, Double>> tfidfs = Lists.newArrayList();
- String word = input.first();
- double n = input.second().first();
- double idf = Math.log(N / n);
- for (Pair<String, Long> tf : input.second().second()) {
- double tfidf = tf.second() * idf;
- tfidfs.add(Pair.of(tf.first(), tfidf));
- }
- return Pair.of(word, tfidfs);
- }
-
- }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles()))));
- }
-
- public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean singleRun) throws IOException {
- String inputFile = tmpDir.copyResourceFileName("docs.txt");
- String outputPath1 = tmpDir.getFileName("output1");
- String outputPath2 = tmpDir.getFileName("output2");
-
- Path tfPath = tmpDir.getPath("termfreq");
-
- PCollection<String> docs = pipeline.readTextFile(inputFile);
-
- PTable<String, Collection<Pair<String, Double>>> results = generateTFIDF(docs, tfPath, typeFamily);
- pipeline.writeTextFile(results, outputPath1);
- if (!singleRun) {
- pipeline.run();
- }
-
- PTable<String, Collection<Pair<String, Double>>> uppercased = results.parallelDo(
- new MapKeysFn<String, String, Collection<Pair<String, Double>>>() {
- @Override
- public String map(String k1) {
- return k1.toUpperCase();
- }
- }, results.getPTableType());
- pipeline.writeTextFile(uppercased, outputPath2);
- pipeline.done();
-
- // Check the lowercase version...
- File outputFile = new File(outputPath1, "part-r-00000");
- List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
- boolean passed = false;
- for (String line : lines) {
- if (line.startsWith("[the") && line.contains("B,0.6931471805599453")) {
- passed = true;
- break;
- }
- }
- assertTrue(passed);
-
- // ...and the uppercase version
- outputFile = new File(outputPath2, "part-r-00000");
- lines = Files.readLines(outputFile, Charset.defaultCharset());
- passed = false;
- for (String line : lines) {
- if (line.startsWith("[THE") && line.contains("B,0.6931471805599453")) {
- passed = true;
- break;
- }
- }
- assertTrue(passed);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java b/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
deleted file mode 100644
index e49f4d5..0000000
--- a/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.io.Files;
-
-
-public class TupleNClassCastBugIT {
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- public static PCollection<TupleN> mapGroupDo(PCollection<String> lines, PTypeFamily ptf) {
- PTable<String, TupleN> mapped = lines.parallelDo(new MapFn<String, Pair<String, TupleN>>() {
-
- @Override
- public Pair<String, TupleN> map(String line) {
- String[] columns = line.split("\\t");
- String docId = columns[0];
- String docLine = columns[1];
- return Pair.of(docId, new TupleN(docId, docLine));
- }
- }, ptf.tableOf(ptf.strings(), ptf.tuples(ptf.strings(), ptf.strings())));
- return mapped.groupByKey().parallelDo(new DoFn<Pair<String, Iterable<TupleN>>, TupleN>() {
- @Override
- public void process(Pair<String, Iterable<TupleN>> input, Emitter<TupleN> tupleNEmitter) {
- for (TupleN tuple : input.second()) {
- tupleNEmitter.emit(tuple);
- }
- }
- }, ptf.tuples(ptf.strings(), ptf.strings()));
- }
-
- @Test
- public void testWritables() throws IOException {
- run(new MRPipeline(TupleNClassCastBugIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testAvro() throws IOException {
- run(new MRPipeline(TupleNClassCastBugIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
- }
-
- public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
- String inputPath = tmpDir.copyResourceFileName("docs.txt");
- String outputPath = tmpDir.getFileName("output");
-
- PCollection<String> docLines = pipeline.readTextFile(inputPath);
- pipeline.writeTextFile(mapGroupDo(docLines, typeFamily), outputPath);
- pipeline.done();
-
- // *** We are not directly testing the output, we are looking for a
- // ClassCastException
- // *** which is thrown in a different thread during the reduce phase. If all
- // is well
- // *** the file will exist and have six lines. Otherwise the bug is present.
- File outputFile = new File(outputPath, "part-r-00000");
- List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
- int lineCount = 0;
- for (String line : lines) {
- lineCount++;
- }
- assertEquals(6, lineCount);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java b/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java
deleted file mode 100644
index 501a944..0000000
--- a/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Collection of tests re-using the same PCollection in various unions.
- */
-public class UnionFromSameSourceIT {
-
- private static final int NUM_ELEMENTS = 4;
-
- @Rule
- public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
- private Pipeline pipeline;
- private PType<String> elementType = Writables.strings();
- private PTableType<String, String> tableType = Writables.tableOf(Writables.strings(),
- Writables.strings());
-
- @Before
- public void setUp() {
- pipeline = new MRPipeline(UnionFromSameSourceIT.class, tmpDir.getDefaultConfiguration());
- }
-
- @Test
- public void testUnion_SingleRead() throws IOException {
- PCollection<String> strings = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
- PCollection<String> union = strings.union(strings.parallelDo(IdentityFn.<String> getInstance(),
- strings.getPType()));
-
- assertEquals(NUM_ELEMENTS * 2, getCount(union));
- }
-
- @Test
- public void testUnion_TwoReads() throws IOException {
- PCollection<String> stringsA = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
- PCollection<String> stringsB = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
-
- PCollection<String> union = stringsA.union(stringsB);
-
- assertEquals(NUM_ELEMENTS * 2, getCount(union));
- }
-
- @Test
- public void testDoubleUnion_EndingWithGBK() throws IOException {
- runDoubleUnionPipeline(true);
- }
-
- @Test
- public void testDoubleUnion_EndingWithoutGBK() throws IOException {
- runDoubleUnionPipeline(false);
- }
-
- private void runDoubleUnionPipeline(boolean endWithGBK) throws IOException {
- PCollection<String> strings = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
- PTable<String, String> tableA = strings.parallelDo("to table A", new ToTableFn(), tableType);
- PTable<String, String> tableB = strings.parallelDo("to table B", new ToTableFn(), tableType);
-
- PGroupedTable<String, String> groupedTable = tableA.union(tableB).groupByKey();
- PCollection<String> ungrouped = groupedTable.parallelDo("ungroup before union",
- new FromGroupedTableFn(), elementType).union(
- strings.parallelDo("fake id", IdentityFn.<String> getInstance(), elementType));
-
- PTable<String, String> table = ungrouped.parallelDo("union back to table", new ToTableFn(),
- tableType);
-
- if (endWithGBK) {
- table = table.groupByKey().ungroup();
- }
-
- assertEquals(3 * NUM_ELEMENTS, getCount(table));
- }
-
- private int getCount(PCollection<?> pcollection) {
- int cnt = 0;
- for (Object v : pcollection.materialize()) {
- cnt++;
- }
- return cnt;
- }
-
- private static class ToTableFn extends MapFn<String, Pair<String, String>> {
-
- @Override
- public Pair<String, String> map(String input) {
- return Pair.of(input, input);
- }
-
- }
-
- private static class FromGroupedTableFn extends DoFn<Pair<String, Iterable<String>>, String> {
-
- @Override
- public void process(Pair<String, Iterable<String>> input, Emitter<String> emitter) {
- for (String value : input.second()) {
- emitter.emit(value);
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/UnionIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/UnionIT.java b/crunch/src/it/java/org/apache/crunch/UnionIT.java
deleted file mode 100644
index 1c60a1b..0000000
--- a/crunch/src/it/java/org/apache/crunch/UnionIT.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.crunch.fn.Aggregators;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.test.Tests;
-import org.apache.crunch.types.avro.Avros;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultiset;
-
-
-public class UnionIT {
-
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
- private MRPipeline pipeline;
- private PCollection<String> words1;
- private PCollection<String> words2;
-
- @Before
- public void setUp() throws IOException {
- pipeline = new MRPipeline(UnionIT.class, tmpDir.getDefaultConfiguration());
- words1 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt")));
- words2 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt")));
- }
-
- @After
- public void tearDown() {
- pipeline.done();
- }
-
- @Test
- public void testUnion() throws Exception {
- IdentityFn<String> identity = IdentityFn.getInstance();
- words1 = words1.parallelDo(identity, Avros.strings());
- words2 = words2.parallelDo(identity, Avros.strings());
-
- PCollection<String> union = words1.union(words2);
-
- ImmutableMultiset<String> actual = ImmutableMultiset.copyOf(union.materialize());
- assertThat(actual.elementSet().size(), is(3));
- assertThat(actual.count("a1"), is(4));
- assertThat(actual.count("b2"), is(2));
- assertThat(actual.count("c3"), is(2));
- }
-
- @Test
- public void testTableUnion() throws IOException {
- PTable<String, String> words1ByFirstLetter = byFirstLetter(words1);
- PTable<String, String> words2ByFirstLetter = byFirstLetter(words2);
-
- PTable<String, String> union = words1ByFirstLetter.union(words2ByFirstLetter);
-
- ImmutableMultiset<Pair<String, String>> actual = ImmutableMultiset.copyOf(union.materialize());
-
- assertThat(actual.elementSet().size(), is(3));
- assertThat(actual.count(Pair.of("a", "1")), is(4));
- assertThat(actual.count(Pair.of("b", "2")), is(2));
- assertThat(actual.count(Pair.of("c", "3")), is(2));
- }
-
- @Test
- public void testUnionThenGroupByKey() throws IOException {
- PCollection<String> union = words1.union(words2);
-
- PGroupedTable<String, String> grouped = byFirstLetter(union).groupByKey();
-
- Map<String, String> actual = grouped.combineValues(Aggregators.STRING_CONCAT("", true))
- .materializeToMap();
-
- Map<String, String> expected = ImmutableMap.of("a", "1111", "b", "22", "c", "33");
- assertThat(actual, is(expected));
- }
-
- @Test
- public void testTableUnionThenGroupByKey() throws IOException {
- PTable<String, String> words1ByFirstLetter = byFirstLetter(words1);
- PTable<String, String> words2ByFirstLetter = byFirstLetter(words2);
-
- PTable<String, String> union = words1ByFirstLetter.union(words2ByFirstLetter);
-
- PGroupedTable<String, String> grouped = union.groupByKey();
-
- Map<String, String> actual = grouped.combineValues(Aggregators.STRING_CONCAT("", true))
- .materializeToMap();
-
- Map<String, String> expected = ImmutableMap.of("a", "1111", "b", "22", "c", "33");
- assertThat(actual, is(expected));
- }
-
-
- private static PTable<String, String> byFirstLetter(PCollection<String> values) {
- return values.parallelDo("byFirstLetter", new FirstLetterKeyFn(),
- Avros.tableOf(Avros.strings(), Avros.strings()));
- }
-
- private static class FirstLetterKeyFn extends DoFn<String, Pair<String, String>> {
- @Override
- public void process(String input, Emitter<Pair<String, String>> emitter) {
- if (input.length() > 1) {
- emitter.emit(Pair.of(input.substring(0, 1), input.substring(1)));
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java b/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java
deleted file mode 100644
index df0511a..0000000
--- a/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.test.CrunchTestSupport;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-public class UnionResultsIT extends CrunchTestSupport implements Serializable {
-
- static class StringLengthMapFn extends MapFn<String, Pair<String, Long>> {
-
- @Override
- public Pair<String, Long> map(String input) {
- return new Pair<String, Long>(input, 10L);
- }
- }
-
-
- /**
- * Tests combining a GBK output with a map-only job output into a single
- * unioned collection.
- */
- @Test
- public void testUnionOfGroupedOutputAndNonGroupedOutput() throws IOException {
- String inputPath = tempDir.copyResourceFileName("set1.txt");
- String inputPath2 = tempDir.copyResourceFileName("set2.txt");
-
- Pipeline pipeline = new MRPipeline(UnionResultsIT.class);
-
- PCollection<String> set1Lines = pipeline.read(At.textFile(inputPath, Writables.strings()));
- PCollection<Pair<String, Long>> set1Lengths = set1Lines.parallelDo(new StringLengthMapFn(),
- Writables.pairs(Writables.strings(), Writables.longs()));
- PCollection<Pair<String, Long>> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count();
-
- PCollection<Pair<String, Long>> union = set1Lengths.union(set2Counts);
-
- List<Pair<String, Long>> unionValues = Lists.newArrayList(union.materialize());
- assertEquals(7, unionValues.size());
-
- Set<Pair<String, Long>> expectedPairs = Sets.newHashSet();
- expectedPairs.add(Pair.of("b", 10L));
- expectedPairs.add(Pair.of("c", 10L));
- expectedPairs.add(Pair.of("a", 10L));
- expectedPairs.add(Pair.of("e", 10L));
- expectedPairs.add(Pair.of("a", 1L));
- expectedPairs.add(Pair.of("c", 1L));
- expectedPairs.add(Pair.of("d", 1L));
-
- assertEquals(expectedPairs, Sets.newHashSet(unionValues));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/WordCountIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/WordCountIT.java b/crunch/src/it/java/org/apache/crunch/WordCountIT.java
deleted file mode 100644
index c646663..0000000
--- a/crunch/src/it/java/org/apache/crunch/WordCountIT.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import org.apache.crunch.fn.Aggregators;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.io.To;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
-public class WordCountIT {
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- enum WordCountStats {
- ANDS
- };
-
- public static PTable<String, Long> wordCount(PCollection<String> words, PTypeFamily typeFamily) {
- return Aggregate.count(words.parallelDo(new DoFn<String, String>() {
-
- @Override
- public void process(String line, Emitter<String> emitter) {
- for (String word : line.split("\\s+")) {
- emitter.emit(word);
- if ("and".equals(word)) {
- increment(WordCountStats.ANDS);
- }
- }
- }
- }, typeFamily.strings()));
- }
-
- public static PTable<String, Long> substr(PTable<String, Long> ptable) {
- return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
-
- public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) {
- if (input.first().length() > 0) {
- emitter.emit(Pair.of(input.first().substring(0, 1), input.second()));
- }
- }
- }, ptable.getPTableType());
- }
-
- private boolean runSecond = false;
- private boolean useToOutput = false;
-
- @Test
- public void testWritables() throws IOException {
- run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testWritablesWithSecond() throws IOException {
- runSecond = true;
- run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testWritablesWithSecondUseToOutput() throws IOException {
- runSecond = true;
- useToOutput = true;
- run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testAvro() throws IOException {
- run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
- }
-
- @Test
- public void testAvroWithSecond() throws IOException {
- runSecond = true;
- run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
- }
-
- @Test
- public void testWithTopWritable() throws IOException {
- runWithTop(WritableTypeFamily.getInstance());
- }
-
- @Test
- public void testWithTopAvro() throws IOException {
- runWithTop(AvroTypeFamily.getInstance());
- }
-
- public void runWithTop(PTypeFamily tf) throws IOException {
- Pipeline pipeline = new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration());
- String inputPath = tmpDir.copyResourceFileName("shakes.txt");
-
- PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, tf.strings()));
- PTable<String, Long> wordCount = wordCount(shakespeare, tf);
- List<Pair<String, Long>> top5 = Lists.newArrayList(Aggregate.top(wordCount, 5, true).materialize());
- assertEquals(
- ImmutableList.of(Pair.of("", 1470L), Pair.of("the", 620L), Pair.of("and", 427L), Pair.of("of", 396L),
- Pair.of("to", 367L)), top5);
- }
-
- public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
- String inputPath = tmpDir.copyResourceFileName("shakes.txt");
- String outputPath = tmpDir.getFileName("output");
-
- PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, typeFamily.strings()));
- PTable<String, Long> wordCount = wordCount(shakespeare, typeFamily);
- if (useToOutput) {
- wordCount.write(To.textFile(outputPath));
- } else {
- pipeline.writeTextFile(wordCount, outputPath);
- }
-
- if (runSecond) {
- String substrPath = tmpDir.getFileName("substr");
- PTable<String, Long> we = substr(wordCount).groupByKey().combineValues(Aggregators.SUM_LONGS());
- pipeline.writeTextFile(we, substrPath);
- }
- PipelineResult res = pipeline.done();
- assertTrue(res.succeeded());
- List<PipelineResult.StageResult> stageResults = res.getStageResults();
- if (runSecond) {
- assertEquals(2, stageResults.size());
- } else {
- assertEquals(1, stageResults.size());
- assertEquals(427, stageResults.get(0).getCounterValue(WordCountStats.ANDS));
- }
-
- File outputFile = new File(outputPath, "part-r-00000");
- List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
- boolean passed = false;
- for (String line : lines) {
- if (line.startsWith("Macbeth\t28") || line.startsWith("[Macbeth,28]")) {
- passed = true;
- break;
- }
- }
- assertTrue(passed);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/fn/AggregatorsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/fn/AggregatorsIT.java b/crunch/src/it/java/org/apache/crunch/fn/AggregatorsIT.java
deleted file mode 100644
index c9584a1..0000000
--- a/crunch/src/it/java/org/apache/crunch/fn/AggregatorsIT.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import static org.apache.crunch.fn.Aggregators.SUM_INTS;
-import static org.apache.crunch.fn.Aggregators.pairAggregator;
-import static org.apache.crunch.types.writable.Writables.ints;
-import static org.apache.crunch.types.writable.Writables.pairs;
-import static org.apache.crunch.types.writable.Writables.strings;
-import static org.apache.crunch.types.writable.Writables.tableOf;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.test.Tests;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-
-@RunWith(Parameterized.class)
-public class AggregatorsIT {
- private Pipeline pipeline;
-
- @Parameters
- public static Collection<Object[]> params() {
- return Tests.pipelinesParams(AggregatorsIT.class);
- }
-
- public AggregatorsIT(Pipeline pipeline) {
- this.pipeline = pipeline;
- }
-
- @Test
- public void testPairAggregator() {
- PCollection<String> lines = pipeline.readTextFile(Tests.pathTo(this, "ints.txt"));
-
- PTable<String, Pair<Integer, Integer>> table = lines.parallelDo(new SplitLine(),
- tableOf(strings(), pairs(ints(), ints())));
-
- PTable<String, Pair<Integer, Integer>> combinedTable = table.groupByKey().combineValues(
- pairAggregator(SUM_INTS(), SUM_INTS()));
-
- Map<String, Pair<Integer, Integer>> result = combinedTable.asMap().getValue();
-
- assertThat(result.size(), is(2));
- assertThat(result.get("a"), is(Pair.of(9, 12)));
- assertThat(result.get("b"), is(Pair.of(11, 13)));
- }
-
- private static final class SplitLine extends MapFn<String, Pair<String, Pair<Integer, Integer>>> {
- @Override
- public Pair<String, Pair<Integer, Integer>> map(String input) {
- String[] split = input.split("\t");
- return Pair.of(split[0],
- Pair.of(Integer.parseInt(split[1]), Integer.parseInt(split[2])));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java b/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
deleted file mode 100644
index 976a43e..0000000
--- a/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mem;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.List;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.common.io.Files;
-
-public class MemPipelineFileWritingIT {
- @Rule
- public TemporaryPath baseTmpDir = TemporaryPaths.create();
-
- @Test
- public void testMemPipelineFileWriter() throws Exception {
- File tmpDir = baseTmpDir.getFile("mempipe");
- Pipeline p = MemPipeline.getInstance();
- PCollection<String> lines = MemPipeline.collectionOf("hello", "world");
- p.writeTextFile(lines, tmpDir.toString());
- p.done();
- assertTrue(tmpDir.exists());
- File[] files = tmpDir.listFiles();
- assertTrue(files != null && files.length > 0);
- for (File f : files) {
- if (!f.getName().startsWith(".")) {
- List<String> txt = Files.readLines(f, Charsets.UTF_8);
- assertEquals(ImmutableList.of("hello", "world"), txt);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java b/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
deleted file mode 100644
index f9f73b2..0000000
--- a/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTableKeyValueIT;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.io.To;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.common.collect.Lists;
-
-@RunWith(value = Parameterized.class)
-public class UnionCollectionIT {
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- private static final Log LOG = LogFactory.getLog(UnionCollectionIT.class);
-
- private PTypeFamily typeFamily;
- private Pipeline pipeline;
- private PCollection<String> union;
-
- private ArrayList<String> EXPECTED = Lists.newArrayList("a", "a", "b", "c", "c", "d", "e");
-
- private Class pipelineClass;
-
- @Before
- @SuppressWarnings("unchecked")
- public void setUp() throws IOException {
- String inputFile1 = tmpDir.copyResourceFileName("set1.txt");
- String inputFile2 = tmpDir.copyResourceFileName("set2.txt");
- if (pipelineClass == null) {
- pipeline = MemPipeline.getInstance();
- } else {
- pipeline = new MRPipeline(pipelineClass, tmpDir.getDefaultConfiguration());
- }
- PCollection<String> firstCollection = pipeline.read(At.textFile(inputFile1, typeFamily.strings()));
- PCollection<String> secondCollection = pipeline.read(At.textFile(inputFile2, typeFamily.strings()));
-
- LOG.info("Test fixture: [" + pipeline.getClass().getSimpleName() + " : " + typeFamily.getClass().getSimpleName()
- + "] First: " + Lists.newArrayList(firstCollection.materialize().iterator()) + ", Second: "
- + Lists.newArrayList(secondCollection.materialize().iterator()));
-
- union = secondCollection.union(firstCollection);
- }
-
- @Parameters
- public static Collection<Object[]> data() throws IOException {
- Object[][] data = new Object[][] { { WritableTypeFamily.getInstance(), PTableKeyValueIT.class },
- { WritableTypeFamily.getInstance(), null }, { AvroTypeFamily.getInstance(), PTableKeyValueIT.class },
- { AvroTypeFamily.getInstance(), null } };
- return Arrays.asList(data);
- }
-
- public UnionCollectionIT(PTypeFamily typeFamily, Class pipelineClass) {
- this.typeFamily = typeFamily;
- this.pipelineClass = pipelineClass;
- }
-
- @Test
- public void unionMaterializeShouldNotThrowNPE() throws Exception {
- checkMaterialized(union.materialize());
- checkMaterialized(pipeline.materialize(union));
- }
-
- private void checkMaterialized(Iterable<String> materialized) {
- List<String> materializedValues = Lists.newArrayList(materialized.iterator());
- Collections.sort(materializedValues);
- LOG.info("Materialized union: " + materializedValues);
- assertEquals(EXPECTED, materializedValues);
- }
-
- @Test
- public void unionWriteShouldNotThrowNPE() throws IOException {
- String outputPath1 = tmpDir.getFileName("output1");
- String outputPath2 = tmpDir.getFileName("output2");
- String outputPath3 = tmpDir.getFileName("output3");
-
- if (typeFamily == AvroTypeFamily.getInstance()) {
- union.write(To.avroFile(outputPath1));
- pipeline.write(union, To.avroFile(outputPath2));
-
- pipeline.run();
-
- checkFileContents(outputPath1);
- checkFileContents(outputPath2);
-
- } else {
-
- union.write(To.textFile(outputPath1));
- pipeline.write(union, To.textFile(outputPath2));
- pipeline.writeTextFile(union, outputPath3);
-
- pipeline.run();
-
- checkFileContents(outputPath1);
- checkFileContents(outputPath2);
- checkFileContents(outputPath3);
- }
- }
-
- private void checkFileContents(String filePath) throws IOException {
-
- List<String> fileContentValues = (typeFamily != AvroTypeFamily.getInstance() || !(pipeline instanceof MRPipeline)) ? Lists
- .newArrayList(pipeline.read(At.textFile(filePath, typeFamily.strings())).materialize().iterator()) : Lists
- .newArrayList(pipeline.read(At.avroFile(filePath, Avros.strings())).materialize().iterator());
-
- Collections.sort(fileContentValues);
-
- LOG.info("Saved Union: " + fileContentValues);
- assertEquals(EXPECTED, fileContentValues);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java b/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
deleted file mode 100644
index 08d226d..0000000
--- a/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.crunch.io.text.TextFileReaderFactory;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.writable.Writables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class CompositePathIterableIT {
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testCreate_FilePresent() throws IOException {
- String inputFilePath = tmpDir.copyResourceFileName("set1.txt");
- Configuration conf = new Configuration();
- LocalFileSystem local = FileSystem.getLocal(conf);
-
- Iterable<String> iterable = CompositePathIterable.create(local, new Path(inputFilePath),
- new TextFileReaderFactory<String>(Writables.strings()));
-
- assertEquals(Lists.newArrayList("b", "c", "a", "e"), Lists.newArrayList(iterable));
-
- }
-
- @Test
- public void testCreate_DirectoryPresentButNoFiles() throws IOException {
- Path emptyInputDir = tmpDir.getRootPath();
-
- Configuration conf = new Configuration();
- LocalFileSystem local = FileSystem.getLocal(conf);
-
- Iterable<String> iterable = CompositePathIterable.create(local, emptyInputDir,
- new TextFileReaderFactory<String>(Writables.strings()));
-
- assertTrue(Lists.newArrayList(iterable).isEmpty());
- }
-
- @Test(expected = IOException.class)
- public void testCreate_DirectoryNotPresent() throws IOException {
- File nonExistentDir = tmpDir.getFile("not-there");
-
- // Sanity check
- assertFalse(nonExistentDir.exists());
-
- Configuration conf = new Configuration();
- LocalFileSystem local = FileSystem.getLocal(conf);
-
- CompositePathIterable.create(local, new Path(nonExistentDir.getAbsolutePath()), new TextFileReaderFactory<String>(
- Writables.strings()));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java b/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java
deleted file mode 100644
index 52b8ff5..0000000
--- a/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.text.NLineFileSource;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.writable.Writables;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class NLineInputIT {
-
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testNLine() throws Exception {
- String urlsInputPath = tmpDir.copyResourceFileName("urls.txt");
- Configuration conf = new Configuration(tmpDir.getDefaultConfiguration());
- conf.setInt("io.sort.mb", 10);
- Pipeline pipeline = new MRPipeline(NLineInputIT.class, conf);
- PCollection<String> urls = pipeline.read(new NLineFileSource<String>(urlsInputPath,
- Writables.strings(), 2));
- assertEquals(new Integer(2),
- urls.parallelDo(new LineCountFn(), Avros.ints()).max().getValue());
- }
-
- private static class LineCountFn extends DoFn<String, Integer> {
-
- private int lineCount = 0;
-
- @Override
- public void initialize() {
- this.lineCount = 0;
- }
-
- @Override
- public void process(String input, Emitter<Integer> emitter) {
- lineCount++;
- }
-
- @Override
- public void cleanup(Emitter<Integer> emitter) {
- emitter.emit(lineCount);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java b/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java
deleted file mode 100644
index bddc0b5..0000000
--- a/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import static org.apache.crunch.types.writable.Writables.*;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Set;
-
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.text.TextFileTableSource;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableSet;
-
-/**
- *
- */
-public class TextFileTableIT {
-
- @Rule
- public TemporaryPath tmpDir = TemporaryPaths.create();
-
- @Test
- public void testTextFileTable() throws Exception {
- String urlsFile = tmpDir.copyResourceFileName("urls.txt");
- Pipeline pipeline = new MRPipeline(TextFileTableIT.class, tmpDir.getDefaultConfiguration());
- PTable<String, String> urls = pipeline.read(
- new TextFileTableSource<String, String>(urlsFile, tableOf(strings(), strings())));
- Set<Pair<String, Long>> cnts = ImmutableSet.copyOf(urls.keys().count().materialize());
- assertEquals(ImmutableSet.of(Pair.of("www.A.com", 4L), Pair.of("www.B.com", 2L),
- Pair.of("www.C.com", 1L), Pair.of("www.D.com", 1L), Pair.of("www.E.com", 1L),
- Pair.of("www.F.com", 2L)), cnts);
- }
-}