You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/08/22 08:26:48 UTC

[8/9] git commit: CRUNCH-16: Updates so that Avro 1.5.4 will work with the latest and greatest Crunch stuff

CRUNCH-16: Updates so that Avro 1.5.4 will work with the latest and greatest Crunch stuff

Signed-off-by: Josh Wills <jw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/b05f1836
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/b05f1836
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/b05f1836

Branch: refs/heads/master
Commit: b05f1836cb4ee4c9bf34b202d57a536f251f82bc
Parents: 6531106
Author: jwills <jw...@apache.org>
Authored: Sat Aug 18 14:52:19 2012 -0700
Committer: Josh Wills <jw...@cloudera.com>
Committed: Tue Aug 21 18:54:29 2012 -0700

----------------------------------------------------------------------
 .../scrunch/ScalaSafeReflectDatumReader.java       |    3 +-
 .../src/it/java/org/apache/crunch/lib/SortIT.java  |    1 +
 .../apache/crunch/lib/SpecificAvroGroupByIT.java   |   16 ------
 .../java/org/apache/crunch/types/avro/Avros.java   |   43 ++++++++++++++-
 4 files changed, 44 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b05f1836/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
index 75db945..80f265c 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.io.ResolvingDecoder;
 import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import scala.collection.JavaConversions;
 
@@ -111,7 +112,7 @@ public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> {
           collectionClass.isAssignableFrom(ArrayList.class)) {
         return new ArrayList();
       }
-      return data.newInstance(collectionClass, schema);
+      return ReflectionUtils.newInstance(collectionClass, null);
     }
     Class elementClass = ScalaSafeReflectData.getClassProp(schema,
         ScalaSafeReflectData.ELEMENT_PROP);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b05f1836/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
index dc025a7..4a22a51 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
@@ -150,6 +150,7 @@ public class SortIT implements Serializable {
   @Test
   public void testAvroReflectSortPair() throws IOException {
     Pipeline pipeline = new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration());
+    pipeline.enableDebug();
     PCollection<Pair<String, StringWrapper>> sorted = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
         .parallelDo(new MapFn<String, Pair<String, StringWrapper>>() {
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b05f1836/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java b/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
index 7b61813..5167dec 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
@@ -26,7 +26,6 @@ import java.io.Serializable;
 import java.util.List;
 
 import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.mapred.AvroJob;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
@@ -70,22 +69,7 @@ public class SpecificAvroGroupByIT implements Serializable {
 
   @Test
   public void testGrouByWithSpecificAvroType() throws Exception {
-
-    MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class, tmpDir.getDefaultConfiguration());
-
-    testSpecificAvro(pipeline);
-  }
-
-  @Test
-  public void testGrouByOnSpecificAvroButReflectionDatumReader() throws Exception {
     MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class, tmpDir.getDefaultConfiguration());
-
-    // https://issues.apache.org/jira/browse/AVRO-1046 resolves
-    // the ClassCastException when reading specific Avro types with
-    // ReflectDatumReader
-
-    pipeline.getConfiguration().setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true);
-
     testSpecificAvro(pipeline);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b05f1836/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
index 9f4a21d..24391ed 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -628,11 +628,50 @@ public class Avros {
 
     @Override
     public int hashCode() {
-      return ReflectData.get().hashCode(this, getSchema());
+      return reflectAwareHashCode(this, getSchema());
     }
-
   }
 
+  /* Schema-driven hash code: recurses through record fields, array elements and union branches.
+   * TODO: Remove this once we no longer have to support 1.5.4.
+   */
+  private static int reflectAwareHashCode(Object o, Schema s) {
+    if (o == null) return 0;                      // incomplete datum
+    int hashCode = 1;
+    switch (s.getType()) {
+    case RECORD:
+      for (Schema.Field f : s.getFields()) {
+        if (f.order() == Schema.Field.Order.IGNORE)
+          continue;
+        hashCode = hashCodeAdd(hashCode,
+            ReflectData.get().getField(o, f.name(), f.pos()), f.schema());
+      }
+      return hashCode;
+    case ARRAY:
+      Collection<?> a = (Collection<?>)o;
+      Schema elementType = s.getElementType();
+      for (Object e : a)
+        hashCode = hashCodeAdd(hashCode, e, elementType);
+      return hashCode;
+    case UNION:
+      return reflectAwareHashCode(
+          o, s.getTypes().get(ReflectData.get().resolveUnion(s, o)));
+    case ENUM:
+      return s.getEnumOrdinal(o.toString());
+    case NULL:
+      return 0;
+    case STRING:
+      return (o instanceof Utf8 ? o : new Utf8(o.toString())).hashCode();
+    default:
+      return o.hashCode();
+    }
+  }
+
+  /** Folds the schema-aware hash of o into the accumulated hashCode (31 * hashCode + hash of o). */
+  private static int hashCodeAdd(int hashCode, Object o, Schema s) {
+    return 31*hashCode + reflectAwareHashCode(o, s);
+  }
+  
   private Avros() {
   }
 }