You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by ch...@apache.org on 2014/04/04 15:00:22 UTC
git commit: CRUNCH-368: Introduce TupleWritable.Comparator, which can compare two TupleWritable without deserialization
Repository: crunch
Updated Branches:
refs/heads/master 3f86cf9e6 -> a6430bbb1
CRUNCH-368: Introduce TupleWritable.Comparator, which can compare two TupleWritable without deserialization
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a6430bbb
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a6430bbb
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a6430bbb
Branch: refs/heads/master
Commit: a6430bbb18d617a32a3c8230ec9f51f4030cceb8
Parents: 3f86cf9
Author: Chao Shi <ch...@apache.org>
Authored: Tue Mar 25 22:31:56 2014 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Wed Apr 2 13:24:48 2014 +0800
----------------------------------------------------------------------
.../crunch/types/writable/TupleWritable.java | 155 +++++++++++++++----
.../types/writable/TupleWritableTest.java | 100 ++++++++++++
2 files changed, 229 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/a6430bbb/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
index 1362132..12b2fb9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
@@ -27,15 +27,26 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
/**
- * A straight copy of the TupleWritable implementation in the join package,
- * added here because of its package visibility restrictions.
- *
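+ * A serialization format for {@link org.apache.crunch.Tuple}.
+ *
+ * <pre>
+ * tuple_writable ::= card field*
+ * card ::= vint
+ * field ::= code [body_size body]
+ * code ::= vint
+ * body_size ::= vint
+ * body ::= byte[]
+ * </pre>
+ *
+ * <p>{@code card} is the number of fields. {@code code} identifies the field's Writable type; a
+ * code of 0 marks an absent (null) field, which is followed by neither {@code body_size} nor
+ * {@code body}. {@code body_size} is the length in bytes of {@code body}, which holds the field
+ * value exactly as written by its own {@link Writable#write}. The length prefix is what allows
+ * {@link TupleWritable.Comparator} to compare serialized tuples field by field without
+ * deserializing them.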
*/
public class TupleWritable extends Configured implements WritableComparable<TupleWritable> {
@@ -164,17 +175,18 @@ public class TupleWritable extends Configured implements WritableComparable<Tupl
}
/**
- * Writes each Writable to <code>out</code>. TupleWritable format:
- * {@code
- * <count><type1><type2>...<typen><obj1><obj2>...<objn>
- * }
+ * Writes this tuple to <code>out</code>. First the number of fields is written as a vint. Then,
+ * for each field, its type code is written as a vint. A field with a non-zero code is followed
+ * by the vint byte length of its serialized value and then those bytes.
*/
public void write(DataOutput out) throws IOException {
+ DataOutputBuffer tmp = new DataOutputBuffer();
WritableUtils.writeVInt(out, values.length);
for (int i = 0; i < values.length; ++i) {
WritableUtils.writeVInt(out, written[i]);
if (written[i] != 0) {
- values[i].write(out);
+ // Serialize into a scratch buffer first so the body can be length-prefixed. The prefix lets
+ // TupleWritable.Comparator compare or skip a field's bytes without deserializing it.
+ tmp.reset();
+ values[i].write(tmp);
+ WritableUtils.writeVInt(out, tmp.getLength());
+ out.write(tmp.getData(), 0, tmp.getLength());
}
}
}
@@ -190,6 +202,7 @@ public class TupleWritable extends Configured implements WritableComparable<Tupl
written[i] = WritableUtils.readVInt(in);
if (written[i] != 0) {
values[i] = getWritable(written[i], getConf());
+ WritableUtils.readVInt(in); // skip "bodySize"
values[i].readFields(in);
}
}
@@ -213,30 +226,120 @@ public class TupleWritable extends Configured implements WritableComparable<Tupl
}
+ /**
+ * Compares two tuples field by field over their common prefix. If all common fields are equal,
+ * the tuple with fewer fields sorts first. Within one field position:
+ * <ul>
+ * <li>two absent fields are equal, and an absent field sorts before a present one;</li>
+ * <li>present fields of different Writable types are ordered by their type codes;</li>
+ * <li>present fields of the same type are ordered by {@link WritableComparable#compareTo} when
+ * both values implement it. Otherwise they are ordered by hash code, which is an arbitrary order
+ * that is only stable if the type overrides {@code hashCode}.</li>
+ * </ul>
+ */
@Override
- public int compareTo(TupleWritable o) {
- for (int i = 0; i < values.length; ++i) {
- if (has(i) && !o.has(i)) {
+ public int compareTo(TupleWritable that) {
+ int commonSize = Math.min(this.size(), that.size());
+ for (int i = 0; i < commonSize; i++) {
+ if (!this.has(i) && !that.has(i)) {
+ continue;
+ }
+ if (this.has(i) && !that.has(i)) {
return 1;
- } else if (!has(i) && o.has(i)) {
+ }
+ if (!this.has(i) && that.has(i)) {
return -1;
+ }
+ if (this.written[i] != that.written[i]) {
+ // Different Writable types: order by type code. Integer.compare cannot overflow.
+ return Integer.compare(this.written[i], that.written[i]);
+ }
+ Writable v1 = this.values[i];
+ Writable v2 = that.values[i];
+ int cmp;
+ if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
+ cmp = ((WritableComparable) v1).compareTo(v2);
} else {
- Writable v1 = values[i];
- Writable v2 = o.values[i];
- if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
- if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
- int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
- if (cmp != 0) {
- return cmp;
- }
- } else {
- int cmp = v1.hashCode() - v2.hashCode();
- if (cmp != 0) {
- return cmp;
- }
+ // No natural ordering: fall back to hash codes. Subtracting two arbitrary hash codes can
+ // overflow and flip the sign, breaking the Comparable contract, so use Integer.compare.
+ cmp = Integer.compare(v1.hashCode(), v2.hashCode());
+ }
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+ // All common fields are equal: the tuple with fewer fields sorts first.
+ return Integer.compare(this.size(), that.size());
+ }
+
+ /**
+ * A raw {@link WritableComparator} for {@link TupleWritable}. It compares two serialized tuples
+ * field by field, using each field's length prefix to hand the field's bytes to that type's own
+ * raw comparator instead of deserializing the whole tuple.
+ *
+ * <p>The ordering rules mirror {@link TupleWritable#compareTo}:
+ * <ul>
+ * <li>fields are compared over the common prefix;</li>
+ * <li>an absent field sorts before a present one;</li>
+ * <li>fields of different Writable types are ordered by type code;</li>
+ * <li>if all common fields are equal, the tuple with fewer fields sorts first.</li>
+ * </ul>
+ * Only the sign of a result is meaningful.
+ *
+ * <p>The raw {@code compare} allocates its input buffers per call, so the single shared instance
+ * carries no per-comparison state between calls.
+ */
+ public static class Comparator extends WritableComparator {
+
+ private static final Comparator INSTANCE = new Comparator();
+
+ /** Returns the shared instance; this is the instance registered via WritableComparator.define. */
+ public static Comparator getInstance() {
+ return INSTANCE;
+ }
+
+ private Comparator() {
+ super(TupleWritable.class);
+ }
+
+ /**
+ * Compares two serialized tuples, given as byte ranges.
+ *
+ * @throws CrunchRuntimeException if either byte range cannot be read as a TupleWritable
+ */
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ // Fresh buffers per call: one shared instance serves every comparison.
+ DataInputBuffer buffer1 = new DataInputBuffer();
+ DataInputBuffer buffer2 = new DataInputBuffer();
+
+ try {
+ buffer1.reset(b1, s1, l1);
+ buffer2.reset(b2, s2, l2);
+
+ int card1 = WritableUtils.readVInt(buffer1);
+ int card2 = WritableUtils.readVInt(buffer2);
+ int minCard = Math.min(card1, card2);
+
+ for (int i = 0; i < minCard; i++) {
+ int cmp = compareField(buffer1, buffer2);
+ if (cmp != 0) {
+ return cmp;
}
}
+ // All common fields are equal: the tuple with fewer fields sorts first.
+ return Integer.compare(card1, card2);
+ } catch (IOException e) {
+ throw new CrunchRuntimeException(e);
}
}
- return values.length - o.values.length;
+
+ /**
+ * Compares the next field of each buffer.
+ *
+ * <p>When the fields compare equal, both buffers are left positioned at the start of the
+ * following field. Otherwise their positions are unspecified.
+ *
+ * @throws IOException if a buffer is truncated, a type code is unknown, or a field body does
+ * not match its declared size
+ */
+ private int compareField(DataInputBuffer buffer1, DataInputBuffer buffer2) throws IOException {
+ int written1 = WritableUtils.readVInt(buffer1);
+ int written2 = WritableUtils.readVInt(buffer2);
+ boolean hasValue1 = (written1 != 0);
+ boolean hasValue2 = (written2 != 0);
+ if (!hasValue1 && !hasValue2) {
+ return 0;
+ }
+ if (hasValue1 && !hasValue2) {
+ return 1;
+ }
+ if (!hasValue1 && hasValue2) {
+ return -1;
+ }
+
+ // Both sides have a value.
+ if (written1 != written2) {
+ return Integer.compare(written1, written2);
+ }
+ int bodySize1 = WritableUtils.readVInt(buffer1);
+ int bodySize2 = WritableUtils.readVInt(buffer2);
+ // Remember where each body ends, so both buffers are realigned on the next field regardless
+ // of how many bytes the type-specific comparison below consumes.
+ int bodyEnd1 = buffer1.getPosition() + bodySize1;
+ int bodyEnd2 = buffer2.getPosition() + bodySize2;
+ Class<? extends Writable> clazz = Writables.WRITABLE_CODES.get(written1);
+ if (clazz == null) {
+ throw new IOException("Unknown Writable code: " + written1);
+ }
+ int cmp;
+ if (WritableComparable.class.isAssignableFrom(clazz)) {
+ cmp = WritableComparator.get(clazz.asSubclass(WritableComparable.class)).compare(
+ buffer1.getData(), buffer1.getPosition(), bodySize1,
+ buffer2.getData(), buffer2.getPosition(), bodySize2);
+ } else {
+ // Fall back to deserialization and hash codes, mirroring TupleWritable#compareTo.
+ // Integer.compare avoids the sign flip that subtracting hash codes can cause on overflow.
+ Writable w1 = ReflectionUtils.newInstance(clazz, null);
+ Writable w2 = ReflectionUtils.newInstance(clazz, null);
+ w1.readFields(buffer1);
+ w2.readFields(buffer2);
+ cmp = Integer.compare(w1.hashCode(), w2.hashCode());
+ }
+ skipTo(buffer1, bodyEnd1);
+ skipTo(buffer2, bodyEnd2);
+ return cmp;
+ }
+
+ /**
+ * Advances {@code buffer} to the absolute position {@code end}.
+ *
+ * @throws IOException if the buffer has already read past {@code end} or cannot skip far enough
+ */
+ private static void skipTo(DataInputBuffer buffer, int end) throws IOException {
+ int remaining = end - buffer.getPosition();
+ if (remaining < 0) {
+ throw new IOException("Field body overran its declared size by " + (-remaining) + " bytes");
+ }
+ WritableUtils.skipFully(buffer, remaining);
+ }
+ }
+
+ static {
+ // Register the raw comparator with Hadoop when this class is loaded. After that,
+ // WritableComparator.get(TupleWritable.class) returns it, and serialized tuples can be compared
+ // directly from their bytes instead of being deserialized first.
+ WritableComparator.define(TupleWritable.class, Comparator.getInstance());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/a6430bbb/crunch-core/src/test/java/org/apache/crunch/types/writable/TupleWritableTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/TupleWritableTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/TupleWritableTest.java
new file mode 100644
index 0000000..ab9ee88
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/TupleWritableTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TupleWritableTest {
+
+ /** A tuple with present and absent fields survives a write/readFields round trip. */
+ @Test
+ public void testSerialization() throws IOException {
+ TupleWritable t1 = new TupleWritable(
+ new Writable[] { new IntWritable(10), null, new Text("hello"), new Text("world") });
+ TupleWritable t2 = new TupleWritable();
+ t2.readFields(new DataInputStream(new ByteArrayInputStream(WritableUtils.toByteArray(t1))));
+ assertTrue(t2.has(0));
+ assertEquals(new IntWritable(10), t2.get(0));
+ assertFalse(t2.has(1));
+ assertNull(t2.get(1));
+ assertTrue(t2.has(2));
+ assertEquals(new Text("hello"), t2.get(2));
+ assertTrue(t2.has(3));
+ assertEquals(new Text("world"), t2.get(3));
+ }
+
+ /** The object path and the raw byte path agree on the ordering of tuple pairs. */
+ @Test
+ public void testCompare() throws IOException {
+ doTestCompare(
+ new TupleWritable(new Writable[] { new IntWritable(1) }),
+ new TupleWritable(new Writable[] { new IntWritable(2) }),
+ -1);
+ doTestCompare(
+ new TupleWritable(new Writable[] { new IntWritable(1) }),
+ new TupleWritable(new Writable[] { new IntWritable(1) }),
+ 0);
+ doTestCompare(
+ new TupleWritable(new Writable[] { new IntWritable(1), new IntWritable(1) }),
+ new TupleWritable(new Writable[] { new IntWritable(1), new IntWritable(2) }),
+ -1);
+ doTestCompare(
+ new TupleWritable(new Writable[] { new IntWritable(1), new IntWritable(2) }),
+ new TupleWritable(new Writable[] { new IntWritable(1), new IntWritable(2) }),
+ 0);
+ doTestCompare(
+ new TupleWritable(new Writable[] { null }),
+ new TupleWritable(new Writable[] { new IntWritable(1) }),
+ -1);
+ doTestCompare(
+ new TupleWritable(new Writable[] { null }),
+ new TupleWritable(new Writable[] { null }),
+ 0); // two absent fields are equal
+ doTestCompare(
+ new TupleWritable(new Writable[] { new IntWritable(1) }),
+ new TupleWritable(new Writable[] { new Text("1") }),
+ 1); // code for IntWritable is larger than code for Text
+ doTestCompare(
+ new TupleWritable(new Writable[] { new Text("apple") }),
+ new TupleWritable(new Writable[] { new Text("banana") }),
+ -1); // exercises Text's own raw comparator on the byte path
+ doTestCompare(
+ new TupleWritable(new Writable[] { new IntWritable(1) }),
+ new TupleWritable(new Writable[] { new IntWritable(1), new IntWritable(2) }),
+ -1); // shorter is less
+ doTestCompare(
+ new TupleWritable(new Writable[] { new IntWritable(2) }),
+ new TupleWritable(new Writable[] { new IntWritable(1), new IntWritable(2) }),
+ 1); // an earlier differing field takes precedence over size
+ }
+
+ /**
+ * Asserts that {@code t1} compares to {@code t2} with the sign of {@code result}, through both
+ * the object path and the raw byte path, and that swapping the arguments negates the sign.
+ * Only signs are compared because a comparator's contract fixes the sign, not the magnitude.
+ */
+ private void doTestCompare(TupleWritable t1, TupleWritable t2, int result) throws IOException {
+ TupleWritable.Comparator comparator = TupleWritable.Comparator.getInstance();
+
+ // test comparing objects
+ assertEquals(result, Integer.signum(comparator.compare(t1, t2)));
+ assertEquals(-result, Integer.signum(comparator.compare(t2, t1)));
+
+ // test comparing buffers
+ DataOutputBuffer buffer1 = new DataOutputBuffer();
+ DataOutputBuffer buffer2 = new DataOutputBuffer();
+ t1.write(buffer1);
+ t2.write(buffer2);
+ assertEquals(result, Integer.signum(compareBuffers(comparator, buffer1, buffer2)));
+ assertEquals(-result, Integer.signum(compareBuffers(comparator, buffer2, buffer1)));
+ }
+
+ /** Compares the full contents of two serialized buffers with the raw comparator. */
+ private static int compareBuffers(TupleWritable.Comparator comparator,
+ DataOutputBuffer left, DataOutputBuffer right) {
+ return comparator.compare(
+ left.getData(), 0, left.getLength(),
+ right.getData(), 0, right.getLength());
+ }
+}