You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/01/09 22:23:22 UTC

crunch git commit: CRUNCH-485: Support Avro-related byte equality etc. checks inside of crunch-spark

Repository: crunch
Updated Branches:
  refs/heads/master e3141009a -> 48e4e7941


CRUNCH-485: Support Avro-related byte equality etc. checks inside of crunch-spark


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/48e4e794
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/48e4e794
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/48e4e794

Branch: refs/heads/master
Commit: 48e4e79412b83013bea3e8a7723d4a7917f6ce49
Parents: e314100
Author: Josh Wills <jw...@apache.org>
Authored: Fri Jan 9 13:22:27 2015 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Jan 9 13:22:27 2015 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/impl/spark/ByteArray.java | 14 ++--
 .../crunch/impl/spark/ByteArrayHelper.java      | 84 ++++++++++++++++++++
 .../apache/crunch/impl/spark/IntByteArray.java  |  8 +-
 .../crunch/impl/spark/fn/MapOutputFunction.java |  4 +-
 .../spark/fn/PartitionedMapOutputFunction.java  |  2 +-
 .../crunch/impl/spark/serde/AvroSerDe.java      |  8 +-
 .../apache/crunch/impl/spark/serde/SerDe.java   |  3 +-
 .../crunch/impl/spark/serde/WritableSerDe.java  |  6 +-
 .../crunch/impl/spark/AvroByteArrayTest.java    | 55 +++++++++++++
 9 files changed, 163 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArray.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArray.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArray.java
index c86835c..db9e3c9 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArray.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArray.java
@@ -17,34 +17,32 @@
  */
 package org.apache.crunch.impl.spark;
 
-import com.google.common.primitives.UnsignedBytes;
-
 import java.io.Serializable;
-import java.util.Arrays;
 
 public class ByteArray implements Serializable, Comparable<ByteArray> {
 
   public final byte[] value;
+  protected final ByteArrayHelper helper;
 
-  public ByteArray(byte[] value) {
+  public ByteArray(byte[] value, ByteArrayHelper helper) {
     this.value = value;
+    this.helper = helper;
   }
 
   @Override
   public boolean equals(Object o) {
     if (o == null || getClass() != o.getClass()) return false;
     ByteArray byteArray = (ByteArray) o;
-    if (!Arrays.equals(value, byteArray.value)) return false;
-    return true;
+    return helper.equal(value, byteArray.value);
   }
 
   @Override
   public int hashCode() {
-    return value != null ? Arrays.hashCode(value) : 0;
+    return helper.hashCode(value);
   }
 
   @Override
   public int compareTo(ByteArray other) {
-    return UnsignedBytes.lexicographicalComparator().compare(value, other.value);
+    return helper.compare(value, other.value);
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArrayHelper.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArrayHelper.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArrayHelper.java
new file mode 100644
index 0000000..a87cb66
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArrayHelper.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark;
+
+import com.google.common.primitives.UnsignedBytes;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryData;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+public abstract class ByteArrayHelper implements Serializable {
+
+  public static final ByteArrayHelper WRITABLES = new ByteArrayHelper() {
+    @Override
+    boolean equal(byte[] left, byte[] right) {
+      return Arrays.equals(left, right);
+    }
+
+    @Override
+    int hashCode(byte[] value) {
+      return value != null ? Arrays.hashCode(value) : 0;
+    }
+
+    @Override
+    int compare(byte[] left, byte[] right) {
+      return UnsignedBytes.lexicographicalComparator().compare(left, right);
+    }
+  };
+
+  public static ByteArrayHelper forAvroSchema(Schema schema) {
+    return new AvroByteArrayHelper(schema);
+  }
+
+  abstract boolean equal(byte[] left, byte[] right);
+  abstract int hashCode(byte[] value);
+  abstract int compare(byte[] left, byte[] right); 
+
+  static class AvroByteArrayHelper extends ByteArrayHelper {
+    private String jsonSchema;
+    private transient Schema schema;
+
+    public AvroByteArrayHelper(Schema schema) {
+      this.jsonSchema = schema.toString();
+    }
+
+    private Schema getSchema() {
+      if (schema == null) {
+        schema = new Schema.Parser().parse(jsonSchema);
+      }
+      return schema;
+    }
+
+    @Override 
+    boolean equal(byte[] left, byte[] right) {
+      return compare(left, right) == 0;
+    }
+
+    @Override
+    int hashCode(byte[] value) {
+      return BinaryData.hashCode(value, 0, value.length, getSchema());
+    }
+
+    @Override
+    int compare(byte[] left, byte[] right) {
+      return BinaryData.compare(left, 0, right, 0, getSchema());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/IntByteArray.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/IntByteArray.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/IntByteArray.java
index 9af70ed..dfb9ed6 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/IntByteArray.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/IntByteArray.java
@@ -17,13 +17,11 @@
  */
 package org.apache.crunch.impl.spark;
 
-import java.io.Serializable;
-
-public class IntByteArray extends ByteArray implements Serializable {
+public class IntByteArray extends ByteArray {
   public final int partition;
 
-  public IntByteArray(int partition, byte[] bytes) {
-    super(bytes);
+  public IntByteArray(int partition, ByteArray delegate) {
+    super(delegate.value, delegate.helper);
     this.partition = partition;
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java
index b8cd7c6..080806c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java
@@ -36,7 +36,7 @@ public class MapOutputFunction<K, V> implements PairFunction<Pair<K, V>, ByteArr
   @Override
   public Tuple2<ByteArray, byte[]> call(Pair<K, V> p) throws Exception {
     return new Tuple2<ByteArray, byte[]>(
-        new ByteArray(keySerde.toBytes(p.first())),
-        valueSerde.toBytes(p.second()));
+        keySerde.toBytes(p.first()),
+        valueSerde.toBytes(p.second()).value);
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java
index e88217d..e104b27 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java
@@ -64,7 +64,7 @@ public class PartitionedMapOutputFunction<K, V> implements PairFunction<Pair<K,
     int partition = getPartitioner().getPartition(p.first(), p.second(), numPartitions);
     return new Tuple2<IntByteArray, byte[]>(
         new IntByteArray(partition, keySerde.toBytes(p.first())),
-        valueSerde.toBytes(p.second()));
+        valueSerde.toBytes(p.second()).value);
   }
 
   private Partitioner getPartitioner() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java
index f82ba8e..cef60e3 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java
@@ -30,6 +30,8 @@ import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.crunch.impl.spark.ByteArray;
+import org.apache.crunch.impl.spark.ByteArrayHelper;
 import org.apache.crunch.types.avro.AvroMode;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
@@ -44,6 +46,7 @@ public class AvroSerDe<T> implements SerDe<T> {
 
   private AvroType<T> avroType;
   private Map<String, String> modeProperties;
+  private ByteArrayHelper helper;
   private transient AvroMode mode;
   private transient DatumWriter<T> writer;
   private transient DatumReader<T> reader;
@@ -54,6 +57,7 @@ public class AvroSerDe<T> implements SerDe<T> {
     if (avroType.hasReflect() && avroType.hasSpecific()) {
       Avros.checkCombiningSpecificAndReflectionSchemas();
     }
+    this.helper = ByteArrayHelper.forAvroSchema(avroType.getSchema());
   }
 
   private AvroMode getMode() {
@@ -85,13 +89,13 @@ public class AvroSerDe<T> implements SerDe<T> {
   }
 
   @Override
-  public byte[] toBytes(T obj) throws Exception {
+  public ByteArray toBytes(T obj) throws Exception {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
     getWriter().write(obj, encoder);
     encoder.flush();
     out.close();
-    return out.toByteArray();
+    return new ByteArray(out.toByteArray(), helper);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java
index 887f656..354f348 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java
@@ -18,13 +18,14 @@
 package org.apache.crunch.impl.spark.serde;
 
 import com.google.common.base.Function;
+import org.apache.crunch.impl.spark.ByteArray;
 import org.apache.hadoop.conf.Configuration;
 
 import java.io.Serializable;
 
 public interface SerDe<T> extends Serializable {
 
-  byte[] toBytes(T obj) throws Exception;
+  ByteArray toBytes(T obj) throws Exception;
 
   T fromBytes(byte[] bytes);
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java
index d90007d..55f7404 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java
@@ -18,6 +18,8 @@
 package org.apache.crunch.impl.spark.serde;
 
 import com.google.common.base.Function;
+import org.apache.crunch.impl.spark.ByteArray;
+import org.apache.crunch.impl.spark.ByteArrayHelper;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -37,12 +39,12 @@ public class WritableSerDe implements SerDe<Writable> {
   }
 
   @Override
-  public byte[] toBytes(Writable obj) throws Exception {
+  public ByteArray toBytes(Writable obj) throws Exception {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream dos = new DataOutputStream(baos);
     obj.write(dos);
     dos.close();
-    return baos.toByteArray();
+    return new ByteArray(baos.toByteArray(), ByteArrayHelper.WRITABLES);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/test/java/org/apache/crunch/impl/spark/AvroByteArrayTest.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/test/java/org/apache/crunch/impl/spark/AvroByteArrayTest.java b/crunch-spark/src/test/java/org/apache/crunch/impl/spark/AvroByteArrayTest.java
new file mode 100644
index 0000000..ec6dd94
--- /dev/null
+++ b/crunch-spark/src/test/java/org/apache/crunch/impl/spark/AvroByteArrayTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark;
+
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Field.Order;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.crunch.impl.spark.serde.AvroSerDe;
+import org.apache.crunch.types.avro.Avros;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+public class AvroByteArrayTest {
+  @Test
+  public void fieldsWithIgnoredSortOrderAreNotUsedInEquals() throws Exception {
+    Schema mySchema = Schema.createRecord("foo", "", "", false);
+    mySchema.setFields(Lists.newArrayList(new Field("field1",
+        Schema.create(Type.STRING),
+        null,
+        JsonNodeFactory.instance.textNode(""),
+        Order.ASCENDING), new Field("field2",
+        Schema.create(Type.STRING),
+        null,
+        JsonNodeFactory.instance.textNode(""),
+        Order.IGNORE)));
+
+    GenericRecordBuilder myGRB = new GenericRecordBuilder(mySchema);
+    Record myRecord1 = myGRB.set("field1", "hello").set("field2", "world").build();
+    Record myRecord2 = myGRB.set("field1", "hello").set("field2", "there").build();
+    assertEquals(myRecord1, myRecord2);
+
+    AvroSerDe serde = new AvroSerDe(Avros.generics(mySchema), null);
+    assertEquals(serde.toBytes(myRecord1), serde.toBytes(myRecord2));
+  }
+}