You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/01/04 03:09:37 UTC

[3/5] git commit: CRUNCH-314: Separate shuffle and bundle AvroMode configuration.

CRUNCH-314: Separate shuffle and bundle AvroMode configuration.

This adds an integration test, AvroModeIT, that reproduces the behavior
described in CRUNCH-314. The fix separates the AvroMode configuration
paths: configure(FormatBundle) remains for sources and targets, while the
new configureShuffle(Configuration), which stores the mode under its own
crunch.avro.shuffle.mode property, is used by SafeAvroSerialization and
AvroGroupedTableType.

AvroDeepCopier and Avros also called configure(Configuration) only to set
the reflect factory; those calls now use the more specific configureFactory.

This also changes the default AvroMode (used when no mode property is set)
from GENERIC to REFLECT, because REFLECT is the most general mode.

Signed-off-by: Josh Wills <jw...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9d7b9e46
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9d7b9e46
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9d7b9e46

Branch: refs/heads/apache-crunch-0.8
Commit: 9d7b9e4615d8aaf60babebb898a9165c8119d284
Parents: f65176f
Author: Ryan Blue <rb...@cloudera.com>
Authored: Fri Dec 20 18:12:48 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Jan 3 17:38:42 2014 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/io/avro/AvroModeIT.java   | 144 +++++++++++++++++++
 crunch-core/src/it/resources/strings-100.avro   | Bin 0 -> 451 bytes
 .../crunch/types/avro/AvroDeepCopier.java       |   5 +-
 .../crunch/types/avro/AvroGroupedTableType.java |   2 +-
 .../org/apache/crunch/types/avro/AvroMode.java  |  25 +++-
 .../org/apache/crunch/types/avro/Avros.java     |   2 +-
 .../types/avro/SafeAvroSerialization.java       |   4 +-
 pom.xml                                         |   1 +
 8 files changed, 171 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java
new file mode 100644
index 0000000..ff66fd7
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.io.avro;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Random;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.crunch.Aggregator;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.AvroMode;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AvroModeIT implements Serializable {
+
+  public static final Schema GENERIC_SCHEMA = new Schema.Parser().parse("{\n" +
+      "  \"name\": \"mystring\",\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"fields\": [\n" +
+      "    { \"name\": \"text\", \"type\": \"string\" }\n" +
+      "  ]\n" +
+      "}");
+
+  static final class FloatArray {
+    private final float[] values;
+    FloatArray() {
+      this(null);
+    }
+    FloatArray(float[] values) {
+      this.values = values;
+    }
+    float[] getValues() {
+      return values;
+    }
+  }
+
+  public static AvroType<float[]> FLOAT_ARRAY = Avros.derived(float[].class,
+      new MapFn<FloatArray, float[]>() {
+        @Override
+        public float[] map(FloatArray input) {
+          return input.getValues();
+        }
+      },
+      new MapFn<float[], FloatArray>() {
+        @Override
+        public FloatArray map(float[] input) {
+          return new FloatArray(input);
+        }
+      }, Avros.reflects(FloatArray.class));
+
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testGenericReflectConflict() throws IOException {
+    final Random rand = new Random();
+    rand.setSeed(12345);
+    Configuration conf = new Configuration();
+    Pipeline pipeline = new MRPipeline(AvroModeIT.class, conf);
+    Source<GenericData.Record> source = From.avroFile(
+        tmpDir.copyResourceFileName("strings-100.avro"),
+        Avros.generics(GENERIC_SCHEMA));
+    PTable<Long, float[]> mapPhase = pipeline
+        .read(source)
+        .parallelDo(new DoFn<GenericData.Record, Pair<Long, float[]>>() {
+          @Override
+          public void process(GenericData.Record input, Emitter<Pair<Long, float[]>> emitter) {
+            emitter.emit(Pair.of(
+                Long.valueOf(input.get("text").toString().length()),
+                new float[] {rand.nextFloat(), rand.nextFloat()}));
+          }
+        }, Avros.tableOf(Avros.longs(), FLOAT_ARRAY));
+
+    PTable<Long, float[]> result = mapPhase
+        .groupByKey()
+        .combineValues(new Aggregator<float[]>() {
+          float[] accumulator = null;
+
+          @Override
+          public Iterable<float[]> results() {
+            return ImmutableList.of(accumulator);
+          }
+
+          @Override
+          public void initialize(Configuration conf) {
+          }
+
+          @Override
+          public void reset() {
+            this.accumulator = null;
+          }
+
+          @Override
+          public void update(float[] value) {
+            if (accumulator == null) {
+              accumulator = Arrays.copyOf(value, 2);
+            } else {
+              for (int i = 0; i < value.length; i += 1) {
+                accumulator[i] += value[i];
+              }
+            }
+          }
+        });
+
+    pipeline.writeTextFile(result, tmpDir.getFileName("unused"));
+    Assert.assertTrue("Should succeed", pipeline.done().succeeded());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/it/resources/strings-100.avro
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/strings-100.avro b/crunch-core/src/it/resources/strings-100.avro
new file mode 100755
index 0000000..c968b97
Binary files /dev/null and b/crunch-core/src/it/resources/strings-100.avro differ

http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index 9e4b0a1..21dae45 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -152,13 +152,14 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
 
     @Override
     protected DatumReader<T> createDatumReader(Configuration conf) {
-      AvroMode.REFLECT.configure(conf);
+      AvroMode.REFLECT.configureFactory(conf);
       return AvroMode.REFLECT.getReader(getSchema());
     }
 
     @Override
     protected DatumWriter<T> createDatumWriter(Configuration conf) {
-      return AvroMode.fromConfiguration(conf).getWriter(getSchema());
+      AvroMode.REFLECT.setFromConfiguration(conf);
+      return AvroMode.REFLECT.getWriter(getSchema());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
index 62e6db4..a97f917 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
@@ -101,7 +101,7 @@ class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
       options.configure(job);
     }
 
-    AvroMode.fromType(att).configure(conf);
+    AvroMode.fromType(att).configureShuffle(conf);
 
     Collection<String> serializations = job.getConfiguration().getStringCollection(
         "io.serializations");

http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
index 77eece1..e2646cd 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
@@ -40,9 +40,16 @@ public enum AvroMode implements ReaderWriterFactory {
   GENERIC ("crunch.genericfactory");
 
   public static final String AVRO_MODE_PROPERTY = "crunch.avro.mode";
+  public static final String AVRO_SHUFFLE_MODE_PROPERTY = "crunch.avro.shuffle.mode";
 
   public static AvroMode fromConfiguration(Configuration conf) {
-    AvroMode mode = conf.getEnum(AVRO_MODE_PROPERTY, GENERIC);
+    AvroMode mode = conf.getEnum(AVRO_MODE_PROPERTY, REFLECT);
+    mode.setFromConfiguration(conf);
+    return mode;
+  }
+
+  public static AvroMode fromShuffleConfiguration(Configuration conf) {
+    AvroMode mode = conf.getEnum(AVRO_SHUFFLE_MODE_PROPERTY, REFLECT);
     mode.setFromConfiguration(conf);
     return mode;
   }
@@ -137,11 +144,9 @@ public enum AvroMode implements ReaderWriterFactory {
     }
   }
 
-  public void configure(Configuration conf) {
-    conf.setEnum(AVRO_MODE_PROPERTY, this);
-    if (factory != null) {
-      conf.setClass(propName, factory.getClass(), ReaderWriterFactory.class);
-    }
+  public void configureShuffle(Configuration conf) {
+    conf.setEnum(AVRO_SHUFFLE_MODE_PROPERTY, this);
+    configureFactory(conf);
   }
 
   public void configure(FormatBundle bundle) {
@@ -151,8 +156,16 @@ public enum AvroMode implements ReaderWriterFactory {
     }
   }
 
+  public void configureFactory(Configuration conf) {
+    if (factory != null) {
+      conf.setClass(propName, factory.getClass(), ReaderWriterFactory.class);
+    }
+  }
+
   @SuppressWarnings("unchecked")
   void setFromConfiguration(Configuration conf) {
+    // although the shuffle and input/output use different properties for mode,
+    // this is shared - only one ReaderWriterFactory can be used.
     Class<?> factoryClass = conf.getClass(propName, this.getClass());
     if (factoryClass != this.getClass()) {
       this.factory = (ReaderWriterFactory)

http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 3d6b04f..2cf63e8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -116,7 +116,7 @@ public class Avros {
   @Deprecated
   public static void configureReflectDataFactory(Configuration conf) {
     AvroMode.REFLECT.override(REFLECT_DATA_FACTORY);
-    AvroMode.REFLECT.configure(conf);
+    AvroMode.REFLECT.configureFactory(conf);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
index f56991e..9205056 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
@@ -60,7 +60,7 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW
     if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) {
       datumReader = AvroMode.REFLECT.getReader(schema);
     } else {
-      datumReader = AvroMode.fromConfiguration(conf).getReader(schema);
+      datumReader = AvroMode.fromShuffleConfiguration(conf).getReader(schema);
     }
     return new AvroWrapperDeserializer(datumReader, isKey);
   }
@@ -105,7 +105,7 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW
     Schema schema = isFinalOutput ? AvroJob.getOutputSchema(conf) : (AvroKey.class.isAssignableFrom(c) ? Pair
         .getKeySchema(AvroJob.getMapOutputSchema(conf)) : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf)));
 
-    ReaderWriterFactory factory = AvroMode.fromConfiguration(conf);
+    ReaderWriterFactory factory = AvroMode.fromShuffleConfiguration(conf);
     DatumWriter<T> writer = factory.getWriter(schema);
     return new AvroWrapperSerializer(writer);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2694c50..98d7378 100644
--- a/pom.xml
+++ b/pom.xml
@@ -629,6 +629,7 @@ under the License.
             <exclude>.idea/**</exclude>
             <exclude>**/resources/*.txt</exclude>
             <exclude>**/resources/**/*.txt</exclude>
+            <exclude>**/resources/*.avro</exclude>
             <exclude>**/goal.txt</exclude>
             <exclude>**/target/generated-test-sources/**</exclude>
             <exclude>**/scripts/scrunch</exclude>