You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/06/17 00:08:08 UTC
git commit: CRUNCH-417: Avro/Scrunch integration improvements and tests
Repository: crunch
Updated Branches:
refs/heads/master 9898ee92f -> 2ef8c1678
CRUNCH-417: Avro/Scrunch integration improvements and tests
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2ef8c167
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2ef8c167
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2ef8c167
Branch: refs/heads/master
Commit: 2ef8c1678369fd84705a24ac99d35be7a744fada
Parents: 9898ee9
Author: Josh Wills <jw...@apache.org>
Authored: Fri Jun 13 20:40:29 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Jun 16 08:14:59 2014 -0700
----------------------------------------------------------------------
.../types/avro/SafeAvroSerialization.java | 2 +-
.../crunch/scrunch/AvroReflectionTest.scala | 55 ++++++++++++++++++++
.../apache/crunch/scrunch/DeepCopyTest.scala | 43 +++++++++++++--
.../crunch/scrunch/ScalaSafeReflectData.java | 6 +++
.../scrunch/ScalaSafeReflectDatumReader.java | 24 ++++-----
.../org/apache/crunch/scrunch/PTypeFamily.scala | 12 ++++-
6 files changed, 125 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/2ef8c167/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
index 9205056..59af994 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
@@ -58,7 +58,7 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW
DatumReader<T> datumReader = null;
if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) {
- datumReader = AvroMode.REFLECT.getReader(schema);
+ datumReader = AvroMode.REFLECT.withFactoryFromConfiguration(conf).getReader(schema);
} else {
datumReader = AvroMode.fromShuffleConfiguration(conf).getReader(schema);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/2ef8c167/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/AvroReflectionTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/AvroReflectionTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/AvroReflectionTest.scala
new file mode 100644
index 0000000..047df5d
--- /dev/null
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/AvroReflectionTest.scala
@@ -0,0 +1,55 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.crunch.scrunch
+
+import java.nio.ByteBuffer
+import org.junit.Test
+import org.apache.crunch.types.PType
+
+class AvroRecord1(var ba: Array[Byte], var bbl: Array[ByteBuffer]) {
+  def this() = this(null, null) // no-arg constructor: both fields start out null
+}
+
+class AvroReflectionTest extends CrunchSuite {
+  // Fails unless t equals the result of pt's output map followed by pt's input map.
+  def assertEquals[T](t: T, pt: PType[T]) = {
+    assert(t.equals(pt.getInputMapFn().map(pt.getOutputMapFn().map(t))))
+  }
+  // NOTE(review): AvroRecord1 defines no equals, so the check below compares references -- confirm intent.
+  @Test def testAvroRecord1 {
+    val pt = Avros.reflects[AvroRecord1]
+    val r = new AvroRecord1(Array[Byte](127),
+      Array[ByteBuffer](ByteBuffer.wrap(Array[Byte](4, 13, 12)), ByteBuffer.wrap(Array[Byte](13, 14, 10))))
+    assertEquals(r, pt)
+  }
+  // Maps each text line to an AvroRecord1, groups under the constant key 1, and writes the ungrouped result as Avro.
+  @Test def runAvroRecord1 {
+    val shakes = tempDir.copyResourceFileName("shakes.txt")
+    val p = Pipeline.mapReduce(classOf[AvroReflectionTest], tempDir.getDefaultConfiguration)
+    p.read(From.textFile(shakes, Avros.strings))
+      .map(x => new AvroRecord1(Array[Byte](1), Array[ByteBuffer](ByteBuffer.wrap(Array[Byte](2)))))
+      .by(x => 1)
+      .groupByKey(1)
+      .ungroup()
+      .write(To.avroFile(tempDir.getFileName("out")))
+    p.done()
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/2ef8c167/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
index 816993b..25febc2 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
@@ -25,14 +25,51 @@ import org.apache.hadoop.conf.Configuration
import _root_.org.junit.Assert._
import _root_.org.junit.Test
+import java.nio.ByteBuffer
case class Rec1(var k: Int, var v: String) { def this() = this(0, "") }
case class Rec2(var k: Int, var k2: String, var v: Double) { def this() = this(0, "", 0.0) }
case class Rec3(var k2: String, var v:Int) { def this() = this("", 0)}
+case class BBRec(var k: ByteBuffer, var ll: Array[ByteBuffer]) { def this() = this(null, null) }
+
+object DeepCopyTest {
+  // Pairs bbr.k with each element of bbr.ll, in array order.
+  def getIterator(bbr: BBRec) = new Iterator[(ByteBuffer, ByteBuffer)] {
+    val elems = bbr.ll.iterator
+    // Exhausted exactly when the underlying array iterator is.
+    def hasNext() = elems.hasNext
+    def next() = (bbr.k, elems.next())
+  }
+}
+
class DeepCopyTest extends CrunchSuite {
+ import DeepCopyTest._
+
lazy val pipe = Pipeline.mapReduce[DeepCopyTest](tempDir.getDefaultConfiguration)
+  @Test def runDeepCopyBB {
+    // Scratch locations for the two Avro inputs.
+    val base = tempDir.getFileName("bytebuffers")
+    val onesPath = base + "/ones"
+    val twosPath = base + "/twos"
+
+    // Four two-byte buffers reused across both record sets.
+    def buf(a: Byte, b: Byte) = ByteBuffer.wrap(Array[Byte](a, b))
+    val (bb1, bb2, bb3, bb4) = (buf(1, 2), buf(3, 4), buf(5, 6), buf(7, 8))
+    writeCollection(new Path(onesPath), Seq(BBRec(bb1, Array(bb4, bb2)), BBRec(bb2, Array(bb1, bb3))))
+    writeCollection(new Path(twosPath), Seq(BBRec(bb3, Array(bb1, bb2)), BBRec(bb4, Array(bb3, bb4))))
+
+    // Read each file back and expand every record into (k, element) pairs.
+    val left = pipe.read(from.avroFile(onesPath, Avros.reflects[BBRec])).flatMap(getIterator(_))
+    val right = pipe.read(from.avroFile(twosPath, Avros.reflects[BBRec])).flatMap(getIterator(_))
+
+    // Left-join the pair collections and check how many keys materialize.
+    val joinedKeys = left.leftJoin(right).keys.materialize
+    assert(joinedKeys.size == 4)
+    pipe.done()
+  }
+
@Test def runDeepCopy {
val prefix = tempDir.getFileName("isolation")
@@ -73,9 +110,9 @@ class DeepCopyTest extends CrunchSuite {
@SuppressWarnings(Array("rawtypes", "unchecked"))
private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]) {
val r: AnyRef = records.iterator.next()
- val schema = new ScalaReflectDataFactory().getData.getSchema(r.getClass)
-
- val writer = new ScalaReflectDataFactory().getWriter[T](schema)
+ val factory = new ScalaReflectDataFactory()
+ val schema = factory.getData().getSchema(r.getClass)
+ val writer = factory.getWriter[T](schema)
val dataFileWriter = new DataFileWriter(writer)
dataFileWriter.create(schema, outputStream)
http://git-wip-us.apache.org/repos/asf/crunch/blob/2ef8c167/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
index 6885f3e..fabc8ff 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
@@ -22,6 +22,7 @@ import java.lang.reflect.GenericArrayType;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
@@ -133,11 +134,16 @@ public class ScalaSafeReflectData extends ReflectData.AllowNull {
return result;
}
Schema result = Schema.createArray(createSchema(component, names));
+ result.addProp(CLASS_PROP, c.getName());
+ result.addProp(ELEMENT_PROP, component.getName());
setElement(result, component);
return result;
}
if (CharSequence.class.isAssignableFrom(c)) // String
return Schema.create(Schema.Type.STRING);
+ if (ByteBuffer.class.isAssignableFrom(c)) {
+ return Schema.create(Schema.Type.BYTES);
+ }
String fullName = c.getName();
Schema schema = names.get(fullName);
if (schema == null) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/2ef8c167/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
index 0db545e..c03d17f 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
@@ -83,32 +83,32 @@ public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> {
private static scala.collection.Iterable toIter(Object array) {
return JavaConversions.collectionAsScalaIterable((Collection) array);
}
-
+
@Override
@SuppressWarnings("unchecked")
protected Object newArray(Object old, int size, Schema schema) {
- Class collectionClass =
- ScalaSafeReflectData.getClassProp(schema, ScalaSafeReflectData.CLASS_PROP);
- Class elementClass =
- ScalaSafeReflectData.getClassProp(schema, ScalaSafeReflectData.ELEMENT_PROP);
-
+ Class collectionClass = ScalaSafeReflectData.getClassProp(schema,
+ ScalaSafeReflectData.CLASS_PROP);
+ Class elementClass = ScalaSafeReflectData.getClassProp(schema,
+ ScalaSafeReflectData.ELEMENT_PROP);
if (collectionClass == null && elementClass == null)
- return super.newArray(old, size, schema); // use specific/generic
-
+ return super.newArray(old, size, schema); // use specific/generic
ScalaSafeReflectData data = ScalaSafeReflectData.getInstance();
+
if (collectionClass != null && !collectionClass.isArray()) {
if (old instanceof Collection) {
((Collection)old).clear();
return old;
}
if (scala.collection.Iterable.class.isAssignableFrom(collectionClass) ||
- collectionClass.isAssignableFrom(ArrayList.class))
- return new ArrayList();
+ collectionClass.isAssignableFrom(ArrayList.class)) {
+ return Lists.newArrayList();
+ }
return data.newInstance(collectionClass, schema);
}
-
- if (elementClass == null)
+ if (elementClass == null) {
elementClass = data.getClass(schema.getElementType());
+ }
return Array.newInstance(elementClass, size);
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/2ef8c167/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
index aadb026..bd03f6f 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
@@ -36,7 +36,15 @@ class TMapFn[S, T](val f: S => T, val pt: Option[PType[S]] = None, var init: Boo
}
}
- override def map(input: S) = if (init) f(pt.get.getDetachedValue(input)) else f(input)
+  override def map(input: S): T = {
+    // A null input short-circuits to null without invoking f.
+    if (input == null) {
+      null.asInstanceOf[T]
+    } else {
+      // With init set, f receives the value returned by the PType's getDetachedValue.
+      f(if (init) pt.get.getDetachedValue(input) else input)
+    }
+  }
}
trait PTypeFamily {
@@ -174,6 +182,8 @@ object Avros extends PTypeFamily {
override def writables[T <: Writable : ClassTag] = CAvros.writables(
implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
+  override def records[T: ClassTag] = reflects[T]() // delegates to reflects, reusing T's ClassTag
+
def specifics[T <: SpecificRecord : ClassTag]() = {
CAvros.specifics(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
}