You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/06/17 00:08:08 UTC

git commit: CRUNCH-417: Avro/Scrunch integration improvements and tests

Repository: crunch
Updated Branches:
  refs/heads/master 9898ee92f -> 2ef8c1678


CRUNCH-417: Avro/Scrunch integration improvements and tests


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2ef8c167
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2ef8c167
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2ef8c167

Branch: refs/heads/master
Commit: 2ef8c1678369fd84705a24ac99d35be7a744fada
Parents: 9898ee9
Author: Josh Wills <jw...@apache.org>
Authored: Fri Jun 13 20:40:29 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Jun 16 08:14:59 2014 -0700

----------------------------------------------------------------------
 .../types/avro/SafeAvroSerialization.java       |  2 +-
 .../crunch/scrunch/AvroReflectionTest.scala     | 55 ++++++++++++++++++++
 .../apache/crunch/scrunch/DeepCopyTest.scala    | 43 +++++++++++++--
 .../crunch/scrunch/ScalaSafeReflectData.java    |  6 +++
 .../scrunch/ScalaSafeReflectDatumReader.java    | 24 ++++-----
 .../org/apache/crunch/scrunch/PTypeFamily.scala | 12 ++++-
 6 files changed, 125 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/2ef8c167/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
index 9205056..59af994 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
@@ -58,7 +58,7 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW
 
     DatumReader<T> datumReader = null;
     if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) {
-      datumReader = AvroMode.REFLECT.getReader(schema);
+      datumReader = AvroMode.REFLECT.withFactoryFromConfiguration(conf).getReader(schema);
     } else {
       datumReader = AvroMode.fromShuffleConfiguration(conf).getReader(schema);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/2ef8c167/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/AvroReflectionTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/AvroReflectionTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/AvroReflectionTest.scala
new file mode 100644
index 0000000..047df5d
--- /dev/null
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/AvroReflectionTest.scala
@@ -0,0 +1,55 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.crunch.scrunch
+
+import java.nio.ByteBuffer
+import org.junit.Test
+import org.apache.crunch.types.PType
+
+class AvroRecord1(var ba: Array[Byte], var bbl: Array[ByteBuffer]) {
+  def this() { this(null, null) }
+}
+
+class AvroReflectionTest extends CrunchSuite {
+
+  def assertEquals[T](t: T, pt: PType[T]) = {
+    t.equals(pt.getInputMapFn().map(pt.getOutputMapFn().map(t)))
+  }
+
+  @Test def testAvroRecord1 {
+    val pt = Avros.reflects[AvroRecord1]
+    val r = new AvroRecord1(Array[Byte](127),
+      Array[ByteBuffer](ByteBuffer.wrap(Array[Byte](4, 13, 12)), ByteBuffer.wrap(Array[Byte](13, 14, 10))))
+    assertEquals(r, pt)
+  }
+
+  @Test def runAvroRecord1 {
+    val shakes = tempDir.copyResourceFileName("shakes.txt")
+    val p = Pipeline.mapReduce(classOf[AvroReflectionTest], tempDir.getDefaultConfiguration)
+    p.read(From.textFile(shakes, Avros.strings))
+      .map(x => new AvroRecord1(Array[Byte](1), Array[ByteBuffer](ByteBuffer.wrap(Array[Byte](2)))))
+      .by(x => 1)
+      .groupByKey(1)
+      .ungroup()
+      .write(To.avroFile(tempDir.getFileName("out")))
+    p.done()
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/2ef8c167/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
index 816993b..25febc2 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
@@ -25,14 +25,51 @@ import org.apache.hadoop.conf.Configuration
 
 import _root_.org.junit.Assert._
 import _root_.org.junit.Test
+import java.nio.ByteBuffer
 
 case class Rec1(var k: Int, var v: String) { def this() = this(0, "") }
 case class Rec2(var k: Int, var k2: String, var v: Double) { def this() = this(0, "", 0.0) }
 case class Rec3(var k2: String, var v:Int) { def this() = this("", 0)}
 
+case class BBRec(var k: ByteBuffer, var ll: Array[ByteBuffer]) { def this() = this(null, null) }
+
+object DeepCopyTest {
+  def getIterator(bbr: BBRec) = new Iterator[(ByteBuffer, ByteBuffer)] {
+    val nested = bbr.ll.iterator
+
+    def hasNext() = nested.hasNext
+
+    def next() = (bbr.k, nested.next)
+  }
+}
+
 class DeepCopyTest extends CrunchSuite {
+  import DeepCopyTest._
+
   lazy val pipe = Pipeline.mapReduce[DeepCopyTest](tempDir.getDefaultConfiguration)
 
+  @Test def runDeepCopyBB {
+    val prefix = tempDir.getFileName("bytebuffers")
+    val bb1 = ByteBuffer.wrap(Array[Byte](1, 2))
+    val bb2 = ByteBuffer.wrap(Array[Byte](3, 4))
+    val bb3 = ByteBuffer.wrap(Array[Byte](5, 6))
+    val bb4 = ByteBuffer.wrap(Array[Byte](7, 8))
+
+    val ones = Seq(BBRec(bb1, Array(bb4, bb2)), BBRec(bb2, Array(bb1, bb3)))
+    val twos = Seq(BBRec(bb3, Array(bb1, bb2)), BBRec(bb4, Array(bb3, bb4)))
+    writeCollection(new Path(prefix + "/ones"), ones)
+    writeCollection(new Path(prefix + "/twos"), twos)
+
+    val oneF = pipe.read(from.avroFile(prefix + "/ones", Avros.reflects[BBRec]))
+    val twoF = pipe.read(from.avroFile(prefix + "/twos", Avros.reflects[BBRec]))
+
+    val m = oneF.flatMap(getIterator(_)).leftJoin(twoF.flatMap(getIterator(_)))
+      .keys
+      .materialize
+    assert(m.size == 4)
+    pipe.done()
+  }
+
   @Test def runDeepCopy {
     val prefix = tempDir.getFileName("isolation")
 
@@ -73,9 +110,9 @@ class DeepCopyTest extends CrunchSuite {
   @SuppressWarnings(Array("rawtypes", "unchecked"))
   private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]) {
     val r: AnyRef = records.iterator.next()
-    val schema = new ScalaReflectDataFactory().getData.getSchema(r.getClass)
-
-    val writer = new ScalaReflectDataFactory().getWriter[T](schema)
+    val factory = new ScalaReflectDataFactory()
+    val schema = factory.getData().getSchema(r.getClass)
+    val writer = factory.getWriter[T](schema)
     val dataFileWriter = new DataFileWriter(writer)
     dataFileWriter.create(schema, outputStream)
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/2ef8c167/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
index 6885f3e..fabc8ff 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
@@ -22,6 +22,7 @@ import java.lang.reflect.GenericArrayType;
 import java.lang.reflect.Modifier;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -133,11 +134,16 @@ public class ScalaSafeReflectData extends ReflectData.AllowNull {
           return result;
         }
         Schema result = Schema.createArray(createSchema(component, names));
+        result.addProp(CLASS_PROP, c.getName());
+        result.addProp(ELEMENT_PROP, component.getName());
         setElement(result, component);
         return result;
       }
       if (CharSequence.class.isAssignableFrom(c))            // String
         return Schema.create(Schema.Type.STRING);
+      if (ByteBuffer.class.isAssignableFrom(c)) {
+        return Schema.create(Schema.Type.BYTES);
+      }
       String fullName = c.getName();
       Schema schema = names.get(fullName);
       if (schema == null) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/2ef8c167/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
index 0db545e..c03d17f 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
@@ -83,32 +83,32 @@ public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> {
   private static scala.collection.Iterable toIter(Object array) {
     return JavaConversions.collectionAsScalaIterable((Collection) array);
   }
-  
+
   @Override
   @SuppressWarnings("unchecked")
   protected Object newArray(Object old, int size, Schema schema) {
-    Class collectionClass =
-        ScalaSafeReflectData.getClassProp(schema, ScalaSafeReflectData.CLASS_PROP);
-    Class elementClass =
-        ScalaSafeReflectData.getClassProp(schema, ScalaSafeReflectData.ELEMENT_PROP);
-
+    Class collectionClass = ScalaSafeReflectData.getClassProp(schema,
+        ScalaSafeReflectData.CLASS_PROP);
+    Class elementClass = ScalaSafeReflectData.getClassProp(schema,
+        ScalaSafeReflectData.ELEMENT_PROP);
     if (collectionClass == null && elementClass == null)
-      return super.newArray(old, size, schema);   // use specific/generic
-
+      return super.newArray(old, size, schema); // use specific/generic
     ScalaSafeReflectData data = ScalaSafeReflectData.getInstance();
+
     if (collectionClass != null && !collectionClass.isArray()) {
       if (old instanceof Collection) {
         ((Collection)old).clear();
         return old;
       }
       if (scala.collection.Iterable.class.isAssignableFrom(collectionClass) ||
-          collectionClass.isAssignableFrom(ArrayList.class))
-        return new ArrayList();
+          collectionClass.isAssignableFrom(ArrayList.class)) {
+        return Lists.newArrayList();
+      }
       return data.newInstance(collectionClass, schema);
     }
-
-    if (elementClass == null)
+    if (elementClass == null) {
       elementClass = data.getClass(schema.getElementType());
+    }
     return Array.newInstance(elementClass, size);
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/2ef8c167/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
index aadb026..bd03f6f 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
@@ -36,7 +36,15 @@ class TMapFn[S, T](val f: S => T, val pt: Option[PType[S]] = None, var init: Boo
     }
   }
 
-  override def map(input: S) = if (init) f(pt.get.getDetachedValue(input)) else f(input)
+  override def map(input: S): T = {
+    if (input == null) {
+      return null.asInstanceOf[T]
+    } else if (init) {
+      return f(pt.get.getDetachedValue(input))
+    } else {
+      return f(input)
+    }
+  }
 }
 
 trait PTypeFamily {
@@ -174,6 +182,8 @@ object Avros extends PTypeFamily {
   override def writables[T <: Writable : ClassTag] = CAvros.writables(
     implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
 
+  override def records[T: ClassTag] = reflects()(implicitly[ClassTag[T]])
+
   def specifics[T <: SpecificRecord : ClassTag]() = {
     CAvros.specifics(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
   }