You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/11/21 21:58:45 UTC
git commit: CRUNCH-301: Clever deep copies in Scrunch code
Updated Branches:
refs/heads/master 12dea675b -> fb172fd84
CRUNCH-301: Clever deep copies in Scrunch code
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fb172fd8
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fb172fd8
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fb172fd8
Branch: refs/heads/master
Commit: fb172fd84e35b4a81c4d344961259acd05f936e8
Parents: 12dea67
Author: Josh Wills <jw...@apache.org>
Authored: Wed Nov 20 18:26:38 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Nov 21 12:57:24 2013 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/crunch/DoFn.java | 19 ++++-
.../org/apache/crunch/types/avro/AvroType.java | 7 ++
.../crunch/types/writable/WritableType.java | 2 +
.../apache/crunch/scrunch/DeepCopyTest.scala | 89 ++++++++++++++++++++
.../org/apache/crunch/scrunch/PTypeFamily.scala | 13 ++-
5 files changed, 126 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/fb172fd8/crunch-core/src/main/java/org/apache/crunch/DoFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/DoFn.java b/crunch-core/src/main/java/org/apache/crunch/DoFn.java
index a052d09..c2ed35d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/DoFn.java
+++ b/crunch-core/src/main/java/org/apache/crunch/DoFn.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
*/
public abstract class DoFn<S, T> implements Serializable {
private transient TaskInputOutputContext<?, ?, ?, ?> context;
+ private transient Configuration conf;
/**
* Configure this DoFn. Subclasses may override this method to modify the
@@ -106,6 +107,16 @@ public abstract class DoFn<S, T> implements Serializable {
}
/**
+ * Called during the setup of an initialized {@link org.apache.crunch.types.PType} that
+ * relies on this instance.
+ *
+ * @param conf The configuration for the {@code PType} being initialized
+ */
+ public void setConfiguration(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
* Returns an estimate of how applying this function to a {@link PCollection}
* will cause it to change in side. The optimizer uses these estimates to
* decide where to break up dependent MR jobs into separate Map and Reduce
@@ -137,7 +148,13 @@ public abstract class DoFn<S, T> implements Serializable {
}
protected Configuration getConfiguration() {
- return context.getConfiguration();
+ if (conf != null) {
+ return conf;
+ } else if (context != null) {
+ return context.getConfiguration();
+ } else {
+ return null;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/crunch/blob/fb172fd8/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
index a92b0d0..aea4951 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -167,7 +167,14 @@ public class AvroType<T> implements PType<T> {
@Override
public void initialize(Configuration conf) {
+ baseInputMapFn.setConfiguration(conf);
+ baseInputMapFn.initialize();
+ baseOutputMapFn.setConfiguration(conf);
+ baseOutputMapFn.initialize();
deepCopier.initialize(conf);
+ for (PType ptype : subTypes) {
+ ptype.initialize(conf);
+ }
initialized = true;
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/fb172fd8/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java
index 734946c..a7a9968 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java
@@ -106,7 +106,9 @@ public class WritableType<T, W extends Writable> implements PType<T> {
@Override
public void initialize(Configuration conf) {
+ this.inputFn.setConfiguration(conf);
this.inputFn.initialize();
+ this.outputFn.setConfiguration(conf); // outputFn (not inputFn again) must get the conf before outputFn.initialize() runs
this.outputFn.initialize();
for (PType subType : subTypes) {
subType.initialize(conf);
http://git-wip-us.apache.org/repos/asf/crunch/blob/fb172fd8/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
new file mode 100644
index 0000000..441a9c6
--- /dev/null
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch
+
+import org.apache.crunch.impl.mr.MRPipeline
+import org.apache.crunch.io.{From => from, To => to}
+import org.apache.crunch.types.avro.{Avros => A}
+import org.apache.avro.file.DataFileWriter
+import org.apache.hadoop.fs.{Path, FSDataOutputStream}
+import org.apache.hadoop.conf.Configuration
+
+import _root_.org.junit.Assert._
+import _root_.org.junit.Test
+
+case class Rec1(var k: Int, var v: String) { def this() = this(0, "") } // test record keyed by k; NOTE(review): var fields + no-arg ctor presumably needed for Avro reflection — confirm
+case class Rec2(var k: Int, var k2: String, var v: Double) { def this() = this(0, "", 0.0) } // test record carrying both keys: k (used for the cogroup) and k2 (used for the join)
+case class Rec3(var k2: String, var v:Int) { def this() = this("", 0)} // test record keyed by k2
+
+/**
+ * Integration test that writes three small Avro files, reads them back through reflection-based
+ * PTypes, cogroups Rec1 (by k) with the inner join of Rec2 and Rec3 (by k2, re-keyed by Rec2.k),
+ * and checks the materialized joined pairs for each key.
+ */
+class DeepCopyTest extends CrunchSuite {
+  lazy val pipe = Pipeline.mapReduce[DeepCopyTest](tempDir.getDefaultConfiguration)
+
+  @Test def runDeepCopy: Unit = {
+    val prefix = tempDir.getFileName("isolation")
+
+    val ones = Seq(Rec1(1, "hello"), Rec1(1, "tjena"), Rec1(2, "goodbye"))
+    val twos = Seq(Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6), Rec2(1, "b", 0.7), Rec2(2, "c", 9.9))
+    val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6))
+
+    writeCollection(new Path(prefix + "/ones"), ones)
+    writeCollection(new Path(prefix + "/twos"), twos)
+    writeCollection(new Path(prefix + "/threes"), threes)
+
+    // Run the pipeline inside try/finally so done() is still called when an assertion fails.
+    try {
+      val oneF = pipe.read(from.avroFile(prefix + "/ones", A.reflects(classOf[Rec1])))
+      val twoF = pipe.read(from.avroFile(prefix + "/twos", A.reflects(classOf[Rec2])))
+      val threeF = pipe.read(from.avroFile(prefix + "/threes", A.reflects(classOf[Rec3])))
+      val res = (oneF.by(_.k)
+        cogroup
+        (twoF.by(_.k2)
+          innerJoin threeF.by(_.k2))
+        .values()
+        .by(_._1.k))
+        .values()
+        .materialize
+        .toList
+
+      // Expected (Rec2, Rec3) pairs for key 1 and key 2 respectively.
+      val e12 = Seq((Rec2(1, "a", 0.4), Rec3("a", 4)), (Rec2(1, "a", 0.5), Rec3("a", 4)), (Rec2(1, "b", 0.6), Rec3("b", 5)),
+        (Rec2(1, "b", 0.7), Rec3("b", 5)))
+      val e22 = Seq((Rec2(2, "c", 9.9),Rec3("c", 6)))
+      assertEquals(2, res.size)
+      // JUnit's contract is assertEquals(expected, actual); keeping that order makes failure messages accurate.
+      assertEquals(Seq(e12, e22), res.map(_._2.toList))
+    } finally {
+      pipe.done()
+    }
+  }
+
+  /** Creates (overwriting) the file at `path` and writes `records` to it as an Avro data file. */
+  private def writeCollection(path: Path, records: Iterable[_ <: AnyRef]): Unit = {
+    writeAvroFile(path.getFileSystem(new Configuration()).create(path, true), records)
+  }
+
+  /**
+   * Writes `records` to `outputStream` as an Avro data file, using the schema reflected from the
+   * class of the first record. The writer and the stream are closed even if writing fails.
+   *
+   * @throws IllegalArgumentException if `records` is empty, since no schema can be derived
+   */
+  @SuppressWarnings(Array("rawtypes", "unchecked"))
+  private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]): Unit = {
+    require(records.nonEmpty, "cannot derive an Avro schema from an empty record collection")
+    val first: AnyRef = records.head
+    val factory = new ScalaReflectDataFactory()
+    val schema = factory.getReflectData.getSchema(first.getClass)
+
+    val writer = factory.getWriter[T](schema)
+    val dataFileWriter = new DataFileWriter(writer)
+    try {
+      dataFileWriter.create(schema, outputStream)
+      for (record <- records) {
+        dataFileWriter.append(record)
+      }
+    } finally {
+      // Close the stream even if closing the Avro writer itself throws.
+      try {
+        dataFileWriter.close()
+      } finally {
+        outputStream.close()
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/fb172fd8/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
index 40f2070..ea69d4f 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
@@ -25,8 +25,15 @@ import java.lang.{Long => JLong, Double => JDouble, Integer => JInt, Float => JF
import java.util.{Collection => JCollection}
import scala.collection.JavaConversions._
-class TMapFn[S, T](f: S => T) extends MapFn[S, T] {
- override def map(input: S) = f(input)
+/**
+ * MapFn adapter around a Scala function. When an input PType is supplied and a Configuration is
+ * available at initialize() time, the PType is initialized and each input is passed through
+ * getDetachedValue before f is applied; otherwise f is applied to the input unchanged.
+ */
+class TMapFn[S, T](val f: S => T, val pt: Option[PType[S]] = None, var init: Boolean = false) extends MapFn[S, T] {
+  override def initialize() {
+    // getConfiguration() may return null; Option(...) turns that into None so nothing is initialized.
+    for (ptype <- pt; conf <- Option(getConfiguration())) {
+      ptype.initialize(conf)
+      init = true
+    }
+  }
+
+  // Matching on pt (rather than calling pt.get) keeps map() from throwing when init is true but no PType was given.
+  override def map(input: S) = pt match {
+    case Some(ptype) if init => f(ptype.getDetachedValue(input))
+    case _ => f(input)
+  }
}
trait PTypeFamily {
@@ -40,7 +47,7 @@ trait PTypeFamily {
def records[T: ClassManifest] = ptf.records(classManifest[T].erasure)
def derived[S, T](cls: java.lang.Class[T], in: S => T, out: T => S, pt: PType[S]) = {
- ptf.derived(cls, new TMapFn[S, T](in), new TMapFn[T, S](out), pt)
+ ptf.derived(cls, new TMapFn[S, T](in, Some(pt)), new TMapFn[T, S](out), pt)
}
val longs = {