You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/10 21:16:15 UTC
git commit: CRUNCH-193: Working GenericArrayWritable impl
Updated Branches:
refs/heads/master 3e513cfab -> 222dd76ac
CRUNCH-193: Working GenericArrayWritable impl
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/222dd76a
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/222dd76a
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/222dd76a
Branch: refs/heads/master
Commit: 222dd76ac2be2cdc786feeb9e75f86713ff8c559
Parents: 3e513cf
Author: Josh Wills <jw...@apache.org>
Authored: Mon Apr 8 21:05:39 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Apr 10 11:08:37 2013 -0700
----------------------------------------------------------------------
.../it/java/org/apache/crunch/lib/CogroupIT.java | 159 +++++----------
.../src/it/java/org/apache/crunch/test/Tests.java | 42 ++++
.../org/apache/crunch/lib/CogroupITData/src1.txt | 4 +
.../org/apache/crunch/lib/CogroupITData/src2.txt | 4 +
.../types/writable/GenericArrayWritable.java | 38 ++--
.../types/writable/GenericArrayWritableTest.java | 70 +++++++
6 files changed, 192 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
index af3329f..4b28da7 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
@@ -17,157 +17,96 @@
*/
package org.apache.crunch.lib;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
-import java.io.File;
import java.io.IOException;
-import java.nio.charset.Charset;
import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.util.Map;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
-import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.fn.Aggregators;
-import org.apache.crunch.fn.MapKeysFn;
-import org.apache.crunch.fn.MapValuesFn;
import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.From;
-import org.apache.crunch.test.StringWrapper;
-import org.apache.crunch.test.StringWrapper.StringToStringWrapperMapFn;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.test.Tests;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
+import com.google.common.collect.ImmutableMap;
+
public class CogroupIT {
@Rule
public TemporaryPath tmpDir = TemporaryPaths.create();
+ private MRPipeline pipeline;
+ private PCollection<String> lines1;
+ private PCollection<String> lines2;
- private static class WordSplit extends DoFn<String, Pair<String, Long>> {
- @Override
- public void process(String input, Emitter<Pair<String, Long>> emitter) {
- for (String word : Splitter.on(' ').split(input)) {
- emitter.emit(Pair.of(word, 1L));
- }
- }
+
+ @Before
+ public void setUp() throws IOException {
+ pipeline = new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration());
+ lines1 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt")));
+ lines2 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt")));
}
- public static PTable<String, Long> join(PCollection<String> w1, PCollection<String> w2, PTypeFamily ptf) {
- PTableType<String, Long> ntt = ptf.tableOf(ptf.strings(), ptf.longs());
- PTable<String, Long> ws1 = w1.parallelDo("ws1", new WordSplit(), ntt);
- PTable<String, Long> ws2 = w2.parallelDo("ws2", new WordSplit(), ntt);
- PTable<String, Pair<Collection<Long>, Collection<Long>>> cg = Cogroup.cogroup(ws1, ws2);
- PTable<String, Long> sums = cg.parallelDo("wc",
- new MapValuesFn<String, Pair<Collection<Long>, Collection<Long>>, Long>() {
- @Override
- public Long map(Pair<Collection<Long>, Collection<Long>> v) {
- long sum = 0L;
- for (Long value : v.first()) {
- sum += value;
- }
- for (Long value : v.second()) {
- sum += value;
- }
- return sum;
- }
- }, ntt);
- return sums.parallelDo("firstletters", new MapKeysFn<String, String, Long>() {
- @Override
- public String map(String k1) {
- if (k1.length() > 0) {
- return k1.substring(0, 1).toLowerCase();
- } else {
- return "";
- }
- }
- }, ntt).groupByKey().combineValues(Aggregators.SUM_LONGS());
+ @After
+ public void tearDown() {
+ pipeline.done();
}
@Test
- public void testWritableJoin() throws Exception {
- run(new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+ public void testCogroupWritables() {
+ runCogroup(WritableTypeFamily.getInstance());
}
@Test
- public void testAvroJoin() throws Exception {
- run(new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
+ public void testCogroupAvro() {
+ runCogroup(AvroTypeFamily.getInstance());
}
- public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
- String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
- String maughamInputPath = tmpDir.copyResourceFileName("maugham.txt");
- File output = tmpDir.getFile("output");
+ public void runCogroup(PTypeFamily ptf) {
+ PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
- PCollection<String> shakespeare = pipeline.read(From.textFile(shakesInputPath));
- PCollection<String> maugham = pipeline.read(From.textFile(maughamInputPath));
- pipeline.writeTextFile(join(shakespeare, maugham, typeFamily), output.getAbsolutePath());
- pipeline.done();
+ PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt);
+ PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt);
- File outputFile = new File(output, "part-r-00000");
- List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
- boolean passed = false;
- for (String line : lines) {
- if (line.equals("[j,705]")) {
- passed = true;
- break;
- }
- }
- assertTrue(passed);
+ PTable<String, Pair<Collection<String>, Collection<String>>> cg = Cogroup.cogroup(kv1, kv2);
+
+ Map<String, Pair<Collection<String>, Collection<String>>> actual = cg.materializeToMap();
+
+ Map<String, Pair<Collection<String>, Collection<String>>> expected = ImmutableMap.of(
+ "a", Pair.of(coll("1-1", "1-4"), coll()),
+ "b", Pair.of(coll("1-2"), coll("2-1")),
+ "c", Pair.of(coll("1-3"), coll("2-2", "2-3")),
+ "d", Pair.of(coll(), coll("2-4"))
+ );
+
+ assertThat(actual, is(expected));
}
-
- static class ConstantMapFn extends MapFn<StringWrapper, StringWrapper> {
+
+ private static class KeyValueSplit extends DoFn<String, Pair<String, String>> {
@Override
- public StringWrapper map(StringWrapper input) {
- return StringWrapper.wrap("key");
+ public void process(String input, Emitter<Pair<String, String>> emitter) {
+ String[] fields = input.split(",");
+ emitter.emit(Pair.of(fields[0], fields[1]));
}
-
}
-
- @Test
- public void testCogroup_CheckObjectResultOnRichObjects() throws IOException {
- Pipeline pipeline = new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration());
- PTable<StringWrapper, StringWrapper> tableA = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"))
- .parallelDo(new StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class))
- .by(new ConstantMapFn(), Avros.reflects(StringWrapper.class));
- PTable<StringWrapper, StringWrapper> tableB = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
- .parallelDo(new StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class))
- .by(new ConstantMapFn(), Avros.reflects(StringWrapper.class));
-
- List<String> set1Values = Lists.newArrayList();
- List<String> set2Values = Lists.newArrayList();
- PTable<StringWrapper, Pair<Collection<StringWrapper>, Collection<StringWrapper>>> cogroup = Cogroup.cogroup(tableA, tableB);
- for (Pair<StringWrapper, Pair<Collection<StringWrapper>, Collection<StringWrapper>>> entry : cogroup.materialize()) {
- for (StringWrapper stringWrapper : entry.second().first()) {
- set1Values.add(stringWrapper.getValue());
- }
- for (StringWrapper stringWrapper : entry.second().second()) {
- set2Values.add(stringWrapper.getValue());
- }
- }
-
- Collections.sort(set1Values);
- Collections.sort(set2Values);
-
- assertEquals(ImmutableList.of("a", "b", "c", "e"), set1Values);
- assertEquals(ImmutableList.of("a", "c", "d"), set2Values);
-
+
+ private static Collection<String> coll(String... values) {
+ return ImmutableList.copyOf(values);
}
+
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/it/java/org/apache/crunch/test/Tests.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/test/Tests.java b/crunch/src/it/java/org/apache/crunch/test/Tests.java
index 4c979af..e381c1a 100644
--- a/crunch/src/it/java/org/apache/crunch/test/Tests.java
+++ b/crunch/src/it/java/org/apache/crunch/test/Tests.java
@@ -17,16 +17,21 @@
*/
package org.apache.crunch.test;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.io.IOException;
import java.util.Collection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.hadoop.io.Writable;
import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.ImmutableList;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
import com.google.common.io.Resources;
@@ -79,4 +84,41 @@ public final class Tests {
new Object[][] { { MemPipeline.getInstance() }, { new MRPipeline(testCase) }
});
}
+
+ /**
+ * Serialize the given Writable into a byte array.
+ *
+ * @param value The instance to serialize
+ * @return The serialized data
+ */
+ public static byte[] serialize(Writable value) {
+ checkNotNull(value);
+ try {
+ ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ value.write(out);
+ return out.toByteArray();
+ } catch (IOException e) {
+ throw new IllegalStateException("cannot serialize", e);
+ }
+ }
+
+ /**
+ * Serialize the src Writable into a byte array, then deserialize it into dest.
+ * @param src The instance to serialize
+ * @param dest The instance to deserialize into
+ * @return dest, for convenience
+ */
+ public static <T extends Writable> T roundtrip(Writable src, T dest) {
+ checkNotNull(src);
+ checkNotNull(dest);
+ checkArgument(src != dest, "src and dest may not be the same instance");
+
+ try {
+ byte[] data = serialize(src);
+ dest.readFields(ByteStreams.newDataInput(data));
+ } catch (IOException e) {
+ throw new IllegalStateException("cannot deserialize", e);
+ }
+ return dest;
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src1.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src1.txt b/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src1.txt
new file mode 100644
index 0000000..9f38eb9
--- /dev/null
+++ b/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src1.txt
@@ -0,0 +1,4 @@
+a,1-1
+b,1-2
+c,1-3
+a,1-4
http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src2.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src2.txt b/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src2.txt
new file mode 100644
index 0000000..ed9524e
--- /dev/null
+++ b/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src2.txt
@@ -0,0 +1,4 @@
+b,2-1
+c,2-2
+c,2-3
+d,2-4
http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java b/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
index 79c93be..8b54008 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
@@ -29,6 +29,12 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableUtils;
+/**
+ * A {@link Writable} for marshalling/unmarshalling Collections. Note that
+ * element order is <em>undefined</em>!
+ *
+ * @param <T> The value type
+ */
class GenericArrayWritable<T> implements Writable {
private Writable[] values;
private Class<? extends Writable> valueClass;
@@ -58,7 +64,7 @@ class GenericArrayWritable<T> implements Writable {
}
String valueType = Text.readString(in);
setValueType(valueType);
- for (int i = 0; i < values.length; i++) {
+ for (int i = 0; i < values.length - nulls; i++) {
Writable value = WritableFactories.newInstance(valueClass);
value.readFields(in); // read a value
values[i] = value; // store it in values
@@ -80,21 +86,23 @@ class GenericArrayWritable<T> implements Writable {
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, values.length);
- int nulls = 0;
- for (int i = 0; i < values.length; i++) {
- if (values[i] == null) {
- nulls++;
- }
- }
- WritableUtils.writeVInt(out, nulls);
- if (values.length - nulls > 0) {
- if (valueClass == null) {
- throw new IllegalStateException("Value class not set by constructor or read");
- }
- Text.writeString(out, valueClass.getName());
+ if (values.length > 0) {
+ int nulls = 0;
for (int i = 0; i < values.length; i++) {
- if (values[i] != null) {
- values[i].write(out);
+ if (values[i] == null) {
+ nulls++;
+ }
+ }
+ WritableUtils.writeVInt(out, nulls);
+ if (values.length - nulls > 0) {
+ if (valueClass == null) {
+ throw new IllegalStateException("Value class not set by constructor or read");
+ }
+ Text.writeString(out, valueClass.getName());
+ for (int i = 0; i < values.length; i++) {
+ if (values[i] != null) {
+ values[i].write(out);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
new file mode 100644
index 0000000..c807a90
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+
+import org.apache.crunch.test.Tests;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+
+public class GenericArrayWritableTest {
+
+ @Test
+ public void testEmpty() {
+ GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class);
+ src.set(new Text[0]);
+
+ GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>());
+
+ assertThat(dest.get().length, is(0));
+ }
+
+ @Test
+ public void testNonEmpty() {
+ GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class);
+ src.set(new Text[] { new Text("foo"), new Text("bar") });
+
+ GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>());
+
+ assertThat(src.get(), not(sameInstance(dest.get())));
+ assertThat(dest.get().length, is(2));
+ assertThat(Arrays.asList(dest.get()), hasItems((Writable) new Text("foo"), new Text("bar")));
+ }
+
+ @Test
+ public void testNulls() {
+ GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class);
+ src.set(new Text[] { new Text("a"), null, new Text("b") });
+
+ GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>());
+
+ assertThat(src.get(), not(sameInstance(dest.get())));
+ assertThat(dest.get().length, is(3));
+ assertThat(Arrays.asList(dest.get()), hasItems((Writable) new Text("a"), new Text("b"), null));
+ }
+
+}