Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/10 21:16:15 UTC

git commit: CRUNCH-193: Working GenericArrayWritable impl

Updated Branches:
  refs/heads/master 3e513cfab -> 222dd76ac


CRUNCH-193: Working GenericArrayWritable impl


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/222dd76a
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/222dd76a
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/222dd76a

Branch: refs/heads/master
Commit: 222dd76ac2be2cdc786feeb9e75f86713ff8c559
Parents: 3e513cf
Author: Josh Wills <jw...@apache.org>
Authored: Mon Apr 8 21:05:39 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Apr 10 11:08:37 2013 -0700

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/lib/CogroupIT.java   |  159 +++++----------
 .../src/it/java/org/apache/crunch/test/Tests.java  |   42 ++++
 .../org/apache/crunch/lib/CogroupITData/src1.txt   |    4 +
 .../org/apache/crunch/lib/CogroupITData/src2.txt   |    4 +
 .../types/writable/GenericArrayWritable.java       |   38 ++--
 .../types/writable/GenericArrayWritableTest.java   |   70 +++++++
 6 files changed, 192 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
index af3329f..4b28da7 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
@@ -17,157 +17,96 @@
  */
 package org.apache.crunch.lib;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
 
-import java.io.File;
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.util.Map;
 
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
-import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.fn.Aggregators;
-import org.apache.crunch.fn.MapKeysFn;
-import org.apache.crunch.fn.MapValuesFn;
 import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.From;
-import org.apache.crunch.test.StringWrapper;
-import org.apache.crunch.test.StringWrapper.StringToStringWrapperMapFn;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.test.Tests;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
-import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
+import com.google.common.collect.ImmutableMap;
+
 
 public class CogroupIT {
   @Rule
   public TemporaryPath tmpDir = TemporaryPaths.create();
+  private MRPipeline pipeline;
+  private PCollection<String> lines1;
+  private PCollection<String> lines2;
 
-  private static class WordSplit extends DoFn<String, Pair<String, Long>> {
-    @Override
-    public void process(String input, Emitter<Pair<String, Long>> emitter) {
-      for (String word : Splitter.on(' ').split(input)) {
-        emitter.emit(Pair.of(word, 1L));
-      }
-    }
+
+  @Before
+  public void setUp() throws IOException {
+    pipeline = new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration());
+    lines1 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt")));
+    lines2 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt")));
   }
 
-  public static PTable<String, Long> join(PCollection<String> w1, PCollection<String> w2, PTypeFamily ptf) {
-    PTableType<String, Long> ntt = ptf.tableOf(ptf.strings(), ptf.longs());
-    PTable<String, Long> ws1 = w1.parallelDo("ws1", new WordSplit(), ntt);
-    PTable<String, Long> ws2 = w2.parallelDo("ws2", new WordSplit(), ntt);
-    PTable<String, Pair<Collection<Long>, Collection<Long>>> cg = Cogroup.cogroup(ws1, ws2);
-    PTable<String, Long> sums = cg.parallelDo("wc",
-        new MapValuesFn<String, Pair<Collection<Long>, Collection<Long>>, Long>() {
-          @Override
-          public Long map(Pair<Collection<Long>, Collection<Long>> v) {
-            long sum = 0L;
-            for (Long value : v.first()) {
-              sum += value;
-            }
-            for (Long value : v.second()) {
-              sum += value;
-            }
-            return sum;
-          }
-        }, ntt);
-    return sums.parallelDo("firstletters", new MapKeysFn<String, String, Long>() {
-      @Override
-      public String map(String k1) {
-        if (k1.length() > 0) {
-          return k1.substring(0, 1).toLowerCase();
-        } else {
-          return "";
-        }
-      }
-    }, ntt).groupByKey().combineValues(Aggregators.SUM_LONGS());
+  @After
+  public void tearDown() {
+    pipeline.done();
   }
 
   @Test
-  public void testWritableJoin() throws Exception {
-    run(new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+  public void testCogroupWritables() {
+    runCogroup(WritableTypeFamily.getInstance());
   }
 
   @Test
-  public void testAvroJoin() throws Exception {
-    run(new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
+  public void testCogroupAvro() {
+    runCogroup(AvroTypeFamily.getInstance());
   }
 
-  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
-    String maughamInputPath = tmpDir.copyResourceFileName("maugham.txt");
-    File output = tmpDir.getFile("output");
+  public void runCogroup(PTypeFamily ptf) {
+    PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
 
-    PCollection<String> shakespeare = pipeline.read(From.textFile(shakesInputPath));
-    PCollection<String> maugham = pipeline.read(From.textFile(maughamInputPath));
-    pipeline.writeTextFile(join(shakespeare, maugham, typeFamily), output.getAbsolutePath());
-    pipeline.done();
+    PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt);
+    PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt);
 
-    File outputFile = new File(output, "part-r-00000");
-    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
-    boolean passed = false;
-    for (String line : lines) {
-      if (line.equals("[j,705]")) {
-        passed = true;
-        break;
-      }
-    }
-    assertTrue(passed);
+    PTable<String, Pair<Collection<String>, Collection<String>>> cg = Cogroup.cogroup(kv1, kv2);
+
+    Map<String, Pair<Collection<String>, Collection<String>>> actual = cg.materializeToMap();
+
+    Map<String, Pair<Collection<String>, Collection<String>>> expected = ImmutableMap.of(
+        "a", Pair.of(coll("1-1", "1-4"), coll()),
+        "b", Pair.of(coll("1-2"), coll("2-1")),
+        "c", Pair.of(coll("1-3"), coll("2-2", "2-3")),
+        "d", Pair.of(coll(), coll("2-4"))
+    );
+
+    assertThat(actual, is(expected));
   }
-  
-  static class ConstantMapFn extends MapFn<StringWrapper, StringWrapper> {
 
+
+  private static class KeyValueSplit extends DoFn<String, Pair<String, String>> {
     @Override
-    public StringWrapper map(StringWrapper input) {
-      return StringWrapper.wrap("key");
+    public void process(String input, Emitter<Pair<String, String>> emitter) {
+      String[] fields = input.split(",");
+      emitter.emit(Pair.of(fields[0], fields[1]));
     }
-    
   }
-  
-  @Test
-  public void testCogroup_CheckObjectResultOnRichObjects() throws IOException {
-    Pipeline pipeline = new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration());
-    PTable<StringWrapper, StringWrapper> tableA = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"))
-      .parallelDo(new StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class))
-      .by(new ConstantMapFn(), Avros.reflects(StringWrapper.class));
-    PTable<StringWrapper, StringWrapper> tableB = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
-        .parallelDo(new StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class))
-        .by(new ConstantMapFn(), Avros.reflects(StringWrapper.class));
-    
-    List<String> set1Values = Lists.newArrayList();
-    List<String> set2Values = Lists.newArrayList();
-    PTable<StringWrapper, Pair<Collection<StringWrapper>, Collection<StringWrapper>>> cogroup = Cogroup.cogroup(tableA, tableB);
-    for (Pair<StringWrapper, Pair<Collection<StringWrapper>, Collection<StringWrapper>>> entry : cogroup.materialize()) {
-      for (StringWrapper stringWrapper : entry.second().first()) {
-        set1Values.add(stringWrapper.getValue());
-      }
-      for (StringWrapper stringWrapper : entry.second().second()) {
-        set2Values.add(stringWrapper.getValue());
-      }
-    }
-    
-    Collections.sort(set1Values);
-    Collections.sort(set2Values);
-    
-    assertEquals(ImmutableList.of("a", "b", "c", "e"), set1Values);
-    assertEquals(ImmutableList.of("a", "c", "d"), set2Values);
-    
+
+  private static Collection<String> coll(String... values) {
+    return ImmutableList.copyOf(values);
   }
+  
 }
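
The reworked test resolves its inputs with Tests.resource(this, "src1.txt") and the data now lives
under the new CogroupITData/ resource directory added later in this commit. The Tests.resource helper
itself is not part of this diff; as a hypothetical sketch only (name and body assumed, not taken from
the Crunch source), the kind of mapping it performs would look like:

    // Hypothetical illustration only, not the actual Tests.resource implementation:
    // map a test instance plus a file name to a classpath resource path such as
    // "org/apache/crunch/lib/CogroupITData/src1.txt".
    static String resourceFor(Object testCase, String fileName) {
      return testCase.getClass().getName().replace('.', '/') + "Data/" + fileName;
    }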

http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/it/java/org/apache/crunch/test/Tests.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/test/Tests.java b/crunch/src/it/java/org/apache/crunch/test/Tests.java
index 4c979af..e381c1a 100644
--- a/crunch/src/it/java/org/apache/crunch/test/Tests.java
+++ b/crunch/src/it/java/org/apache/crunch/test/Tests.java
@@ -17,16 +17,21 @@
  */
 package org.apache.crunch.test;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.io.IOException;
 import java.util.Collection;
 
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.hadoop.io.Writable;
 import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
 import com.google.common.io.Resources;
 
 
@@ -79,4 +84,41 @@ public final class Tests {
         new Object[][] { { MemPipeline.getInstance() }, { new MRPipeline(testCase) }
     });
   }
+
+  /**
+   * Serialize the given Writable into a byte array.
+   *
+   * @param value The instance to serialize
+   * @return The serialized data
+   */
+  public static byte[] serialize(Writable value) {
+    checkNotNull(value);
+    try {
+      ByteArrayDataOutput out = ByteStreams.newDataOutput();
+      value.write(out);
+      return out.toByteArray();
+    } catch (IOException e) {
+      throw new IllegalStateException("cannot serialize", e);
+    }
+  }
+
+  /**
+   * Serialize the src Writable into a byte array, then deserialize it into dest.
+   * @param src The instance to serialize
+   * @param dest The instance to deserialize into
+   * @return dest, for convenience
+   */
+  public static <T extends Writable> T roundtrip(Writable src, T dest) {
+    checkNotNull(src);
+    checkNotNull(dest);
+    checkArgument(src != dest, "src and dest may not be the same instance");
+
+    try {
+      byte[] data = serialize(src);
+      dest.readFields(ByteStreams.newDataInput(data));
+    } catch (IOException e) {
+      throw new IllegalStateException("cannot deserialize", e);
+    }
+    return dest;
+  }
 }
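
A quick usage sketch of the two new helpers (GenericArrayWritableTest below uses roundtrip the same
way); Text is just a convenient stand-in for any Writable, and the surrounding class is illustrative:

    import org.apache.crunch.test.Tests;
    import org.apache.hadoop.io.Text;

    public class TestsHelpersExample {
      public static void main(String[] args) {
        // serialize(): Writable -> byte[] via write(DataOutput).
        byte[] bytes = Tests.serialize(new Text("hello"));
        System.out.println(bytes.length + " bytes");

        // roundtrip(): serialize src, then readFields() the bytes into a fresh dest.
        Text src = new Text("hello");
        Text copy = Tests.roundtrip(src, new Text());
        System.out.println(src.equals(copy) && src != copy);   // true: equal value, distinct instance
      }
    }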

http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src1.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src1.txt b/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src1.txt
new file mode 100644
index 0000000..9f38eb9
--- /dev/null
+++ b/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src1.txt
@@ -0,0 +1,4 @@
+a,1-1
+b,1-2
+c,1-3
+a,1-4

http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src2.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src2.txt b/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src2.txt
new file mode 100644
index 0000000..ed9524e
--- /dev/null
+++ b/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src2.txt
@@ -0,0 +1,4 @@
+b,2-1
+c,2-2
+c,2-3
+d,2-4

http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java b/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
index 79c93be..8b54008 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
@@ -29,6 +29,12 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableUtils;
 
+/**
+ * A {@link Writable} for marshalling/unmarshalling Collections. Note that
+ * element order is <em>undefined</em>!
+ *
+ * @param <T> The value type
+ */
 class GenericArrayWritable<T> implements Writable {
   private Writable[] values;
   private Class<? extends Writable> valueClass;
@@ -58,7 +64,7 @@ class GenericArrayWritable<T> implements Writable {
       }
       String valueType = Text.readString(in);
       setValueType(valueType);
-      for (int i = 0; i < values.length; i++) {
+      for (int i = 0; i < values.length - nulls; i++) {
         Writable value = WritableFactories.newInstance(valueClass);
         value.readFields(in); // read a value
         values[i] = value; // store it in values
@@ -80,21 +86,23 @@ class GenericArrayWritable<T> implements Writable {
 
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeVInt(out, values.length);
-    int nulls = 0;
-    for (int i = 0; i < values.length; i++) {
-      if (values[i] == null) {
-        nulls++;
-      }
-    }
-    WritableUtils.writeVInt(out, nulls);
-    if (values.length - nulls > 0) {
-      if (valueClass == null) {
-        throw new IllegalStateException("Value class not set by constructor or read");
-      }
-      Text.writeString(out, valueClass.getName());
+    if (values.length > 0) {
+      int nulls = 0;
       for (int i = 0; i < values.length; i++) {
-        if (values[i] != null) {
-          values[i].write(out);
+        if (values[i] == null) {
+          nulls++;
+        }
+      }
+      WritableUtils.writeVInt(out, nulls);
+      if (values.length - nulls > 0) {
+        if (valueClass == null) {
+          throw new IllegalStateException("Value class not set by constructor or read");
+        }
+        Text.writeString(out, valueClass.getName());
+        for (int i = 0; i < values.length; i++) {
+          if (values[i] != null) {
+            values[i].write(out);
+          }
         }
       }
     }
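
The substantive change is that write() and readFields() now agree on how many elements are actually on
the wire: only non-null elements are written, so the reader must consume exactly length - nulls of them
(the one-line loop fix above), and an empty array is written as nothing more than its length. Below is a
minimal, self-contained sketch of that format using the same Hadoop primitives (WritableUtils,
Text.writeString, WritableFactories); it is an illustration of the format implied by the diff, not the
Crunch class itself:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableFactories;
    import org.apache.hadoop.io.WritableUtils;

    // Illustrative sketch only: the wire format implied by the diff above.
    final class ArrayWireFormatSketch {

      static void writeArray(Writable[] values, Class<? extends Writable> valueClass, DataOutput out)
          throws IOException {
        WritableUtils.writeVInt(out, values.length);      // total length, nulls included
        if (values.length > 0) {
          int nulls = 0;
          for (Writable v : values) {
            if (v == null) {
              nulls++;
            }
          }
          WritableUtils.writeVInt(out, nulls);            // number of null slots
          if (values.length - nulls > 0) {
            Text.writeString(out, valueClass.getName());  // element class, written once
            for (Writable v : values) {
              if (v != null) {
                v.write(out);                             // only non-null elements are written
              }
            }
          }
        }
      }

      static Writable[] readArray(DataInput in) throws IOException {
        Writable[] values = new Writable[WritableUtils.readVInt(in)];
        if (values.length > 0) {
          int nulls = WritableUtils.readVInt(in);
          if (values.length - nulls > 0) {
            Class<? extends Writable> valueClass;
            try {
              valueClass = Class.forName(Text.readString(in)).asSubclass(Writable.class);
            } catch (ClassNotFoundException e) {
              throw new IOException(e);
            }
            // The fix: read back exactly (length - nulls) values, not length.
            for (int i = 0; i < values.length - nulls; i++) {
              Writable value = WritableFactories.newInstance(valueClass);
              value.readFields(in);
              values[i] = value;                          // trailing slots stay null
            }
          }
        }
        return values;
      }
    }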

http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
new file mode 100644
index 0000000..c807a90
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+
+import org.apache.crunch.test.Tests;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+
+public class GenericArrayWritableTest {
+
+  @Test
+  public void testEmpty() {
+    GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class);
+    src.set(new Text[0]);
+
+    GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>());
+
+    assertThat(dest.get().length, is(0));
+  }
+
+  @Test
+  public void testNonEmpty() {
+    GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class);
+    src.set(new Text[] { new Text("foo"), new Text("bar") });
+
+    GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>());
+
+    assertThat(src.get(), not(sameInstance(dest.get())));
+    assertThat(dest.get().length, is(2));
+    assertThat(Arrays.asList(dest.get()), hasItems((Writable) new Text("foo"), new Text("bar")));
+  }
+
+  @Test
+  public void testNulls() {
+    GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class);
+    src.set(new Text[] { new Text("a"), null, new Text("b") });
+
+    GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>());
+
+    assertThat(src.get(), not(sameInstance(dest.get())));
+    assertThat(dest.get().length, is(3));
+    assertThat(Arrays.asList(dest.get()), hasItems((Writable) new Text("a"), new Text("b"), null));
+  }
+
+}
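
For reference, and assuming the usual Maven setup of the crunch module (an assumption; the build wiring
is not shown in this commit), the new unit test and the reworked integration test would typically be
invoked with something like:

    mvn -pl crunch test -Dtest=GenericArrayWritableTest    # Surefire unit test
    mvn -pl crunch verify -Dit.test=CogroupIT              # Failsafe integration test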