You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/08/22 08:26:48 UTC
[8/9] git commit: CRUNCH-16: Updates so that Avro 1.5.4 will work
with the latest and greatest Crunch stuff
CRUNCH-16: Updates so that Avro 1.5.4 will work with the latest and greatest Crunch stuff
Signed-off-by: Josh Wills <jw...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/b05f1836
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/b05f1836
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/b05f1836
Branch: refs/heads/master
Commit: b05f1836cb4ee4c9bf34b202d57a536f251f82bc
Parents: 6531106
Author: jwills <jw...@apache.org>
Authored: Sat Aug 18 14:52:19 2012 -0700
Committer: Josh Wills <jw...@cloudera.com>
Committed: Tue Aug 21 18:54:29 2012 -0700
----------------------------------------------------------------------
.../scrunch/ScalaSafeReflectDatumReader.java | 3 +-
.../src/it/java/org/apache/crunch/lib/SortIT.java | 1 +
.../apache/crunch/lib/SpecificAvroGroupByIT.java | 16 ------
.../java/org/apache/crunch/types/avro/Avros.java | 43 ++++++++++++++-
4 files changed, 44 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b05f1836/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
index 75db945..80f265c 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.hadoop.util.ReflectionUtils;
import scala.collection.JavaConversions;
@@ -111,7 +112,7 @@ public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> {
collectionClass.isAssignableFrom(ArrayList.class)) {
return new ArrayList();
}
- return data.newInstance(collectionClass, schema);
+ return ReflectionUtils.newInstance(collectionClass, null);
}
Class elementClass = ScalaSafeReflectData.getClassProp(schema,
ScalaSafeReflectData.ELEMENT_PROP);
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b05f1836/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
index dc025a7..4a22a51 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
@@ -150,6 +150,7 @@ public class SortIT implements Serializable {
@Test
public void testAvroReflectSortPair() throws IOException {
Pipeline pipeline = new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration());
+ pipeline.enableDebug();
PCollection<Pair<String, StringWrapper>> sorted = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
.parallelDo(new MapFn<String, Pair<String, StringWrapper>>() {
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b05f1836/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java b/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
index 7b61813..5167dec 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
@@ -26,7 +26,6 @@ import java.io.Serializable;
import java.util.List;
import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.mapred.AvroJob;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
@@ -70,22 +69,7 @@ public class SpecificAvroGroupByIT implements Serializable {
@Test
public void testGrouByWithSpecificAvroType() throws Exception {
-
- MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class, tmpDir.getDefaultConfiguration());
-
- testSpecificAvro(pipeline);
- }
-
- @Test
- public void testGrouByOnSpecificAvroButReflectionDatumReader() throws Exception {
MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class, tmpDir.getDefaultConfiguration());
-
- // https://issues.apache.org/jira/browse/AVRO-1046 resolves
- // the ClassCastException when reading specific Avro types with
- // ReflectDatumReader
-
- pipeline.getConfiguration().setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true);
-
testSpecificAvro(pipeline);
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b05f1836/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
index 9f4a21d..24391ed 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -628,11 +628,50 @@ public class Avros {
@Override
public int hashCode() {
- return ReflectData.get().hashCode(this, getSchema());
+ return reflectAwareHashCode(this, getSchema()); // CRUNCH-16: replaces ReflectData.get().hashCode(this, getSchema()) for Avro 1.5.4 support
}
-
}
+ /* Schema-driven hash code for datum o under schema s; record fields are read reflectively via ReflectData.getField.
+ * Called from hashCode() in place of ReflectData.get().hashCode. TODO: Remove this once we no longer have to support 1.5.4.
+ */
+ private static int reflectAwareHashCode(Object o, Schema s) {
+ if (o == null) return 0; // incomplete datum: a null value hashes to 0
+ int hashCode = 1; // seed for the 31*h + x accumulation done in hashCodeAdd
+ switch (s.getType()) {
+ case RECORD: // combine the hashes of every field not marked order=IGNORE
+ for (Schema.Field f : s.getFields()) {
+ if (f.order() == Schema.Field.Order.IGNORE) // ignored fields do not contribute
+ continue;
+ hashCode = hashCodeAdd(hashCode,
+ ReflectData.get().getField(o, f.name(), f.pos()), f.schema()); // field value fetched reflectively by name/position
+ }
+ return hashCode;
+ case ARRAY: // combine the hashes of the elements in iteration order
+ Collection<?> a = (Collection<?>)o; // NOTE(review): throws ClassCastException if the datum is a Java array rather than a Collection; confirm reflect-mapped types never reach here as arrays
+ Schema elementType = s.getElementType();
+ for (Object e : a)
+ hashCode = hashCodeAdd(hashCode, e, elementType);
+ return hashCode;
+ case UNION: // hash with the branch schema chosen by ReflectData.resolveUnion
+ return reflectAwareHashCode(
+ o, s.getTypes().get(ReflectData.get().resolveUnion(s, o)));
+ case ENUM: // an enum symbol hashes to its ordinal in the schema
+ return s.getEnumOrdinal(o.toString());
+ case NULL:
+ return 0;
+ case STRING: // normalize any CharSequence to Utf8 so the hash comes from Utf8.hashCode
+ return (o instanceof Utf8 ? o : new Utf8(o.toString())).hashCode();
+ default: // all other types (including MAP and BYTES, which have no case here) use the value's own hashCode
+ return o.hashCode();
+ }
+ }
+
+ /** Folds the schema-driven hash of o into an accumulated hash: returns 31 * hashCode + reflectAwareHashCode(o, s). */
+ private static int hashCodeAdd(int hashCode, Object o, Schema s) {
+ return 31*hashCode + reflectAwareHashCode(o, s);
+ }
+
private Avros() {
}
}