You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by ch...@apache.org on 2014/04/04 15:00:22 UTC

git commit: CRUNCH-368: Introduce TupleWritable.Comparator, which can compare two TupleWritables without deserialization

Repository: crunch
Updated Branches:
  refs/heads/master 3f86cf9e6 -> a6430bbb1


CRUNCH-368: Introduce TupleWritable.Comparator, which can compare two TupleWritables without deserialization


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a6430bbb
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a6430bbb
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a6430bbb

Branch: refs/heads/master
Commit: a6430bbb18d617a32a3c8230ec9f51f4030cceb8
Parents: 3f86cf9
Author: Chao Shi <ch...@apache.org>
Authored: Tue Mar 25 22:31:56 2014 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Wed Apr 2 13:24:48 2014 +0800

----------------------------------------------------------------------
 .../crunch/types/writable/TupleWritable.java    | 155 +++++++++++++++----
 .../types/writable/TupleWritableTest.java       | 100 ++++++++++++
 2 files changed, 229 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/a6430bbb/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
index 1362132..12b2fb9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
@@ -27,15 +27,26 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
- * A straight copy of the TupleWritable implementation in the join package,
- * added here because of its package visibility restrictions.
- * 
+ * A serialization format for {@link org.apache.crunch.Tuple}.
+ *
+ * <pre>
+ *   tuple_writable ::= card field*    (exactly card fields)
+ *   card ::= vint                     (number of fields)
+ *   field ::= code [body_size body]   (body omitted when code == 0, i.e. the field is absent)
+ *   code ::= vint                     (identifies the field's Writable class)
+ *   body_size ::= vint                (length of body in bytes)
+ *   body ::= byte[]                   (bytes produced by the field's Writable.write)
+ * </pre>
  */
 public class TupleWritable extends Configured implements WritableComparable<TupleWritable> {
 
@@ -164,17 +175,18 @@ public class TupleWritable extends Configured implements WritableComparable<Tupl
   }
 
   /**
-   * Writes each Writable to <code>out</code>. TupleWritable format:
-   * {@code
-   *  <count><type1><type2>...<typen><obj1><obj2>...<objn>
-   * }
+   * Writes to <code>out</code>: the field count, then per field its code and (if non-zero) sized body.
    */
   public void write(DataOutput out) throws IOException {
+    DataOutputBuffer tmp = new DataOutputBuffer(); // stages each body so its size can be written first
     WritableUtils.writeVInt(out, values.length);
     for (int i = 0; i < values.length; ++i) {
       WritableUtils.writeVInt(out, written[i]);
       if (written[i] != 0) {
-        values[i].write(out);
+        tmp.reset();
+        values[i].write(tmp);
+        WritableUtils.writeVInt(out, tmp.getLength());
+        out.write(tmp.getData(), 0, tmp.getLength());
       }
     }
   }
@@ -190,6 +202,7 @@ public class TupleWritable extends Configured implements WritableComparable<Tupl
       written[i] = WritableUtils.readVInt(in);
       if (written[i] != 0) {
         values[i] = getWritable(written[i], getConf());
+        WritableUtils.readVInt(in); // skip body_size; the body is read directly by readFields below
         values[i].readFields(in);
       }
     }
@@ -213,30 +226,120 @@ public class TupleWritable extends Configured implements WritableComparable<Tupl
   }
 
   @Override
-  public int compareTo(TupleWritable o) {
-    for (int i = 0; i < values.length; ++i) {
-      if (has(i) && !o.has(i)) {
+  public int compareTo(TupleWritable that) {
+    for (int i = 0; i < Math.min(this.size(), that.size()); i++) {
+      if (!this.has(i) && !that.has(i)) {
+        continue;
+      }
+      if (this.has(i) && !that.has(i)) {
         return 1;
-      } else if (!has(i) && o.has(i)) {
+      }
+      if (!this.has(i) && that.has(i)) {
         return -1;
+      }
+      if (this.written[i] != that.written[i]) {
+        return Integer.compare(this.written[i], that.written[i]); // different types: order by code
+      }
+      Writable v1 = this.values[i];
+      Writable v2 = that.values[i];
+      int cmp;
+      if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
+        cmp = ((WritableComparable) v1).compareTo(v2);
       } else {
-        Writable v1 = values[i];
-        Writable v2 = o.values[i];
-        if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
-          if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
-            int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
-            if (cmp != 0) {
-              return cmp;
-            }
-          } else {
-            int cmp = v1.hashCode() - v2.hashCode();
-            if (cmp != 0) {
-              return cmp;
-            }
+        cmp = Integer.compare(v1.hashCode(), v2.hashCode()); // subtracting hash codes can overflow
+      }
+      if (cmp != 0) {
+        return cmp;
+      }
+    }
+    return Integer.compare(this.size(), that.size()); // equal prefix: shorter tuple sorts first
+  }
+
+  public static class Comparator extends WritableComparator {
+
+    private static final Comparator INSTANCE = new Comparator();
+
+    public static Comparator getInstance() {
+      return INSTANCE;
+    }
+
+    private Comparator() {
+      super(TupleWritable.class);
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      DataInputBuffer buffer1 = new DataInputBuffer(); // per-call buffers: INSTANCE is a shared singleton
+      DataInputBuffer buffer2 = new DataInputBuffer();
+
+      try {
+        buffer1.reset(b1, s1, l1);
+        buffer2.reset(b2, s2, l2);
+
+        int card1 = WritableUtils.readVInt(buffer1);
+        int card2 = WritableUtils.readVInt(buffer2);
+        int minCard = Math.min(card1, card2);
+
+        for (int i = 0; i < minCard; i++) {
+          int cmp = compareField(buffer1, buffer2);
+          if (cmp != 0) {
+            return cmp;
           }
         }
+        return Integer.compare(card1, card2); // equal prefix: shorter tuple sorts first
+      } catch (IOException e) {
+        throw new CrunchRuntimeException(e);
       }
     }
-    return values.length - o.values.length;
+
+    private int compareField(DataInputBuffer buffer1, DataInputBuffer buffer2) throws IOException {
+      int written1 = WritableUtils.readVInt(buffer1);
+      int written2 = WritableUtils.readVInt(buffer2);
+      boolean hasValue1 = (written1 != 0);
+      boolean hasValue2 = (written2 != 0);
+      if (!hasValue1 && !hasValue2) {
+        return 0;
+      }
+      if (hasValue1 && !hasValue2) {
+        return 1;
+      }
+      if (!hasValue1 && hasValue2) {
+        return -1;
+      }
+
+      // both sides have a value: order by type code first, then by body (mirrors compareTo)
+      if (written1 != written2) {
+        return Integer.compare(written1, written2);
+      }
+      int bodySize1 = WritableUtils.readVInt(buffer1);
+      int bodySize2 = WritableUtils.readVInt(buffer2);
+      Class<? extends Writable> clazz = Writables.WRITABLE_CODES.get(written1);
+      if (WritableComparable.class.isAssignableFrom(clazz)) {
+        int cmp = WritableComparator.get(clazz.asSubclass(WritableComparable.class)).compare(
+            buffer1.getData(), buffer1.getPosition(), bodySize1,
+            buffer2.getData(), buffer2.getPosition(), bodySize2);
+        buffer1.skip(bodySize1); // advance past the bodies the delegate comparator just read
+        buffer2.skip(bodySize2);
+        return cmp;
+      } else {
+        // fallback to deserialization; hash-code ordering mirrors compareTo
+        Writable w1 = ReflectionUtils.newInstance(clazz, null);
+        Writable w2 = ReflectionUtils.newInstance(clazz, null);
+        w1.readFields(buffer1);
+        w2.readFields(buffer2);
+        return Integer.compare(w1.hashCode(), w2.hashCode()); // subtracting hash codes can overflow
+      }
+    }
+
+    @Override
+    public int compare(WritableComparable a, WritableComparable b) {
+      return super.compare(a, b);
+    }
+  }
+
+  static {
+    // Register the comparator to Hadoop. It will be used to perform fast comparison over buffers
+    // without any deserialization overhead.
+    WritableComparator.define(TupleWritable.class, Comparator.getInstance());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/a6430bbb/crunch-core/src/test/java/org/apache/crunch/types/writable/TupleWritableTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/TupleWritableTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/TupleWritableTest.java
new file mode 100644
index 0000000..ab9ee88
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/TupleWritableTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TupleWritableTest {
+
+  @Test
+  public void testSerialization() throws IOException {
+    TupleWritable t1 = new TupleWritable(
+        new Writable[] { new IntWritable(10), null, new Text("hello"), new Text("world") });
+    TupleWritable t2 = new TupleWritable();
+    t2.readFields(new DataInputStream(new ByteArrayInputStream(WritableUtils.toByteArray(t1))));
+    assertTrue(t2.has(0));
+    assertEquals(new IntWritable(10), t2.get(0));
+    assertFalse(t2.has(1));
+    assertNull(t2.get(1));
+    assertTrue(t2.has(2));
+    assertEquals(new Text("hello"), t2.get(2));
+    assertTrue(t2.has(3));
+    assertEquals(new Text("world"), t2.get(3));
+  }
+
+  @Test
+  public void testCompare() throws IOException {
+    doTestCompare(
+        new TupleWritable(new Writable[] { new IntWritable(1) }),
+        new TupleWritable(new Writable[] { new IntWritable(2) }),
+        -1);
+    doTestCompare(
+        new TupleWritable(new Writable[] { new IntWritable(1) }),
+        new TupleWritable(new Writable[] { new IntWritable(1) }),
+        0);
+    doTestCompare(
+        new TupleWritable(new Writable[] { new IntWritable(1), new IntWritable(1) }),
+        new TupleWritable(new Writable[] { new IntWritable(1), new IntWritable(2) }),
+        -1);
+    doTestCompare(
+        new TupleWritable(new Writable[] { new IntWritable(1), new IntWritable(2) }),
+        new TupleWritable(new Writable[] { new IntWritable(1), new IntWritable(2) }),
+        0);
+    doTestCompare(
+        new TupleWritable(new Writable[] { null  }),
+        new TupleWritable(new Writable[] { new IntWritable(1) }),
+        -1);
+    doTestCompare(
+        new TupleWritable(new Writable[] { new IntWritable(1) }),
+        new TupleWritable(new Writable[] { new Text("1") }),
+        1); // code for IntWritable is larger than code for Text
+    doTestCompare(
+        new TupleWritable(new Writable[] { new IntWritable(1) }),
+        new TupleWritable(new Writable[] { new IntWritable(1), new IntWritable(2) }),
+        -1); // shorter is less
+  }
+
+  private void doTestCompare(TupleWritable t1, TupleWritable t2, int expectedSign) throws IOException {
+    // test comparing objects; only the sign of a comparator result is meaningful
+    TupleWritable.Comparator comparator = TupleWritable.Comparator.getInstance();
+    assertEquals(expectedSign, Integer.signum(comparator.compare(t1, t2)));
+
+    // test comparing buffers; must yield the same sign as the object comparison
+    DataOutputBuffer buffer1 = new DataOutputBuffer();
+    DataOutputBuffer buffer2 = new DataOutputBuffer();
+    t1.write(buffer1);
+    t2.write(buffer2);
+    assertEquals(expectedSign, Integer.signum(comparator.compare(
+        buffer1.getData(), 0, buffer1.getLength(),
+        buffer2.getData(), 0, buffer2.getLength())));
+  }
+}