You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/08/22 08:26:48 UTC

[9/9] git commit: Add dynamic Avro capabilities checking

Add dynamic Avro capabilities checking

Add dynamic checking of Avro capabilities so that the system can adapt
to the version of Avro in use. The capability checked by this change
is whether the ReflectDatumReader can read SpecificData that includes
an array.

Signed-off-by: Josh Wills <jw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/5d9b6986
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/5d9b6986
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/5d9b6986

Branch: refs/heads/master
Commit: 5d9b69869c39e9200829378280f871a35d6dd910
Parents: dfe70ca
Author: Gabriel Reid <ga...@gmail.com>
Authored: Tue Aug 21 21:25:21 2012 +0200
Committer: Josh Wills <jw...@cloudera.com>
Committed: Tue Aug 21 18:54:29 2012 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/io/avro/AvroReflectIT.java   |   68 ++++++++--
 .../apache/crunch/types/avro/AvroCapabilities.java |  106 +++++++++++++++
 .../java/org/apache/crunch/types/avro/Avros.java   |   39 +++---
 3 files changed, 184 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5d9b6986/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
index 7c93756..93e15c0 100644
--- a/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
+++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
@@ -21,15 +21,20 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.test.Person;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.avro.Avros;
+import org.junit.Assume;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -87,30 +92,71 @@ public class AvroReflectIT implements Serializable {
     }
 
   }
+
   @Rule
   public transient TemporaryPath tmpDir = TemporaryPaths.create();
 
   @Test
   public void testReflection() throws IOException {
     Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<StringWrapper> stringWrapperCollection = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"))
-        .parallelDo(new MapFn<String, StringWrapper>() {
+    PCollection<StringWrapper> stringWrapperCollection = pipeline.readTextFile(
+        tmpDir.copyResourceFileName("set1.txt")).parallelDo(new MapFn<String, StringWrapper>() {
 
-          @Override
-          public StringWrapper map(String input) {
-            StringWrapper stringWrapper = new StringWrapper();
-            stringWrapper.setValue(input);
-            return stringWrapper;
-          }
-        }, Avros.reflects(StringWrapper.class));
+      @Override
+      public StringWrapper map(String input) {
+        StringWrapper stringWrapper = new StringWrapper();
+        stringWrapper.setValue(input);
+        return stringWrapper;
+      }
+    }, Avros.reflects(StringWrapper.class));
 
     List<StringWrapper> stringWrappers = Lists.newArrayList(stringWrapperCollection.materialize());
 
     pipeline.done();
 
-    assertEquals(Lists.newArrayList(new StringWrapper("b"), new StringWrapper("c"), new StringWrapper("a"),
-        new StringWrapper("e")), stringWrappers);
+    assertEquals(Lists.newArrayList(new StringWrapper("b"), new StringWrapper("c"),
+        new StringWrapper("a"), new StringWrapper("e")), stringWrappers);
 
   }
 
+  // Verify that running with a combination of reflect and specific schema
+  // doesn't crash
+  @Test
+  public void testCombinationOfReflectionAndSpecific() throws IOException {
+    Assume.assumeTrue(Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS);
+    Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Pair<StringWrapper, Person>> hybridPairCollection = pipeline.readTextFile(
+        tmpDir.copyResourceFileName("set1.txt")).parallelDo(
+        new MapFn<String, Pair<StringWrapper, Person>>() {
+
+          @Override
+          public Pair<StringWrapper, Person> map(String input) {
+            Person person = new Person();
+            person.name = input;
+            person.age = 42;
+            person.siblingnames = Lists.<CharSequence> newArrayList(input);
+
+            return Pair.of(new StringWrapper(input), person);
+          }
+        }, Avros.pairs(Avros.reflects(StringWrapper.class), Avros.records(Person.class)));
+
+    PCollection<Pair<String, Long>> countCollection = Aggregate.count(hybridPairCollection)
+        .parallelDo(
+        new MapFn<Pair<Pair<StringWrapper, Person>, Long>, Pair<String, Long>>() {
+
+          @Override
+          public Pair<String, Long> map(Pair<Pair<StringWrapper, Person>, Long> input) {
+            return Pair.of(input.first().first().getValue(), input.second());
+          }
+        }, Avros.pairs(Avros.strings(), Avros.longs()));
+
+    List<Pair<String, Long>> materialized = Lists
+        .newArrayList(countCollection.materialize());
+    List<Pair<String, Long>> expected = Lists.newArrayList(Pair.of("a", 1L), Pair.of("b", 1L),
+        Pair.of("c", 1L), Pair.of("e", 1L));
+    Collections.sort(materialized);
+
+    assertEquals(expected, materialized);
+    pipeline.done();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5d9b6986/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java
new file mode 100644
index 0000000..cc1636c
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Determines the capabilities of the Avro version that is currently being used.
+ */
+class AvroCapabilities {
+
+  public static class Record extends org.apache.avro.specific.SpecificRecordBase implements
+      org.apache.avro.specific.SpecificRecord {
+    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser()
+        .parse("{\"type\":\"record\",\"name\":\"Record\",\"namespace\":\"org.apache.crunch.types.avro\",\"fields\":[{\"name\":\"subrecords\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}");
+    @Deprecated
+    public java.util.List<java.lang.CharSequence> subrecords;
+
+    public java.lang.Object get(int field$) {
+      switch (field$) {
+      case 0:
+        return subrecords;
+      default:
+        throw new org.apache.avro.AvroRuntimeException("Bad index");
+      }
+    }
+
+    // Used by DatumReader. Applications should not call.
+    @SuppressWarnings(value = "unchecked")
+    public void put(int field$, java.lang.Object value$) {
+      switch (field$) {
+      case 0:
+        subrecords = (java.util.List<java.lang.CharSequence>) value$;
+        break;
+      default:
+        throw new org.apache.avro.AvroRuntimeException("Bad index");
+      }
+    }
+
+    @Override
+    public Schema getSchema() {
+      return SCHEMA$;
+    }
+  }
+
+  /**
+   * Determine if the current Avro version can use the ReflectDatumReader to
+   * read SpecificData that includes an array. The inability to do this was a
+   * bug that was fixed in Avro 1.7.0.
+   * 
+   * @return true if SpecificData can be properly read using a
+   *         ReflectDatumReader
+   */
+  static boolean canDecodeSpecificSchemaWithReflectDatumReader() {
+    ReflectDatumReader<Record> datumReader = new ReflectDatumReader(Record.SCHEMA$);
+    ReflectDatumWriter<Record> datumWriter = new ReflectDatumWriter(Record.SCHEMA$);
+
+    Record record = new Record();
+    record.subrecords = Lists.<CharSequence> newArrayList("a", "b");
+
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
+
+    try {
+      datumWriter.write(record, encoder);
+      encoder.flush();
+      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
+          byteArrayOutputStream.toByteArray(), null);
+      datumReader.read(record, decoder);
+    } catch (IOException ioe) {
+      throw new RuntimeException("Error performing specific schema test", ioe);
+    } catch (ClassCastException cce) {
+      // This indicates that we're using a pre-1.7.0 version of Avro, as the
+      // ReflectDatumReader in those versions could not correctly handle an
+      // array in a SpecificData value
+      return false;
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5d9b6986/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
index b3a9b7a..969af1d 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -71,13 +71,18 @@ import com.google.common.collect.Maps;
 public class Avros {
 
   /**
-   * Older versions of Avro (i.e., before 1.7.0) do not support schemas that are composed of
-   * a mix of specific and reflection-based schemas. This bit controls whether or not we
-   * allow Crunch jobs to be created that involve mixing specific and reflection-based schemas
-   * and can be overridden by the client developer.
+   * Older versions of Avro (i.e., before 1.7.0) do not support schemas that are
+   * composed of a mix of specific and reflection-based schemas. This flag
+   * indicates whether Crunch jobs that mix specific and reflection-based
+   * schemas may be created. It is set once, when this class is initialized,
+   * by probing the capabilities of the Avro library in use.
    */
-  public static boolean CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS = false;
-  
+  public static final boolean CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS;
+
+  static {
+    CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS = AvroCapabilities.canDecodeSpecificSchemaWithReflectDatumReader();
+  }
+
   /**
    * The instance we use for generating reflected schemas. May be modified by
    * clients (e.g., Scrunch.)
@@ -106,11 +111,10 @@ public class Avros {
           + " Please consider turning your reflection-based type into an avro-generated"
           + " type and using that generated type instead."
           + " If the version of Avro you are using is 1.7.0 or greater, you can enable"
-          + " combined schemas by setting the Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS"
-          + " field to 'true'.");
+          + " combined schemas by setting the Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS" + " field to 'true'.");
     }
   }
-  
+
   public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() {
     @Override
     public String map(CharSequence input) {
@@ -224,7 +228,7 @@ public class Avros {
     Schema schema = t.getSchema();
     return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroSpecificDeepCopier<T>(clazz, schema));
   }
-  
+
   public static final <T> AvroType<T> reflects(Class<T> clazz) {
     Schema schema = REFLECT_DATA_FACTORY.getReflectData().getSchema(clazz);
     return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroReflectDeepCopier<T>(clazz, schema));
@@ -663,26 +667,25 @@ public class Avros {
    * TODO: Remove this once we no longer have to support 1.5.4.
    */
   private static int reflectAwareHashCode(Object o, Schema s) {
-    if (o == null) return 0;                      // incomplete datum
+    if (o == null)
+      return 0; // incomplete datum
     int hashCode = 1;
     switch (s.getType()) {
     case RECORD:
       for (Schema.Field f : s.getFields()) {
         if (f.order() == Schema.Field.Order.IGNORE)
           continue;
-        hashCode = hashCodeAdd(hashCode,
-            ReflectData.get().getField(o, f.name(), f.pos()), f.schema());
+        hashCode = hashCodeAdd(hashCode, ReflectData.get().getField(o, f.name(), f.pos()), f.schema());
       }
       return hashCode;
     case ARRAY:
-      Collection<?> a = (Collection<?>)o;
+      Collection<?> a = (Collection<?>) o;
       Schema elementType = s.getElementType();
       for (Object e : a)
         hashCode = hashCodeAdd(hashCode, e, elementType);
       return hashCode;
     case UNION:
-      return reflectAwareHashCode(
-          o, s.getTypes().get(ReflectData.get().resolveUnion(s, o)));
+      return reflectAwareHashCode(o, s.getTypes().get(ReflectData.get().resolveUnion(s, o)));
     case ENUM:
       return s.getEnumOrdinal(o.toString());
     case NULL:
@@ -696,9 +699,9 @@ public class Avros {
 
   /** Add the hash code for an object into an accumulated hash code. */
   private static int hashCodeAdd(int hashCode, Object o, Schema s) {
-    return 31*hashCode + reflectAwareHashCode(o, s);
+    return 31 * hashCode + reflectAwareHashCode(o, s);
   }
-  
+
   private Avros() {
   }
 }