You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/11/21 21:58:45 UTC

git commit: CRUNCH-301: Clever deep copies in Scrunch code

Updated Branches:
  refs/heads/master 12dea675b -> fb172fd84


CRUNCH-301: Clever deep copies in Scrunch code


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fb172fd8
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fb172fd8
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fb172fd8

Branch: refs/heads/master
Commit: fb172fd84e35b4a81c4d344961259acd05f936e8
Parents: 12dea67
Author: Josh Wills <jw...@apache.org>
Authored: Wed Nov 20 18:26:38 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Nov 21 12:57:24 2013 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/crunch/DoFn.java   | 19 ++++-
 .../org/apache/crunch/types/avro/AvroType.java  |  7 ++
 .../crunch/types/writable/WritableType.java     |  2 +
 .../apache/crunch/scrunch/DeepCopyTest.scala    | 89 ++++++++++++++++++++
 .../org/apache/crunch/scrunch/PTypeFamily.scala | 13 ++-
 5 files changed, 126 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/fb172fd8/crunch-core/src/main/java/org/apache/crunch/DoFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/DoFn.java b/crunch-core/src/main/java/org/apache/crunch/DoFn.java
index a052d09..c2ed35d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/DoFn.java
+++ b/crunch-core/src/main/java/org/apache/crunch/DoFn.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
  */
 public abstract class DoFn<S, T> implements Serializable {
   private transient TaskInputOutputContext<?, ?, ?, ?> context;
+  private transient Configuration conf; // set via setConfiguration(); consulted before context in getConfiguration()

   /**
    * Configure this DoFn. Subclasses may override this method to modify the
@@ -106,6 +107,16 @@ public abstract class DoFn<S, T> implements Serializable {
   }

   /**
+   * Called while initializing a {@link org.apache.crunch.types.PType} that relies on this instance.
+   * A non-null {@code conf} then takes precedence over the task context in {@link #getConfiguration()}.
+   *
+   * @param conf The configuration for the {@code PType} being initialized
+   */
+  public void setConfiguration(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
    * Returns an estimate of how applying this function to a {@link PCollection}
    * will cause it to change in side. The optimizer uses these estimates to
    * decide where to break up dependent MR jobs into separate Map and Reduce
@@ -137,7 +148,12 @@ public abstract class DoFn<S, T> implements Serializable {
   }

   protected Configuration getConfiguration() {
-    return context.getConfiguration();
+    // An explicitly supplied configuration (see setConfiguration) wins over the task context's;
+    // with neither available there is nothing to hand back.
+    if (conf == null && context == null) {
+      return null;
+    }
+    return conf != null ? conf : context.getConfiguration();
   }

   /**

http://git-wip-us.apache.org/repos/asf/crunch/blob/fb172fd8/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
index a92b0d0..aea4951 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -167,7 +167,14 @@ public class AvroType<T> implements PType<T> {
 
   @Override
   public void initialize(Configuration conf) {
+    baseInputMapFn.setConfiguration(conf);  // before initialize(), so getConfiguration() returns conf there
+    baseInputMapFn.initialize();
+    baseOutputMapFn.setConfiguration(conf); // same ordering for the output MapFn
+    baseOutputMapFn.initialize();
     deepCopier.initialize(conf);
+    for (PType ptype : subTypes) {          // give every PType in subTypes the same conf
+      ptype.initialize(conf);
+    }
     initialized = true;
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/fb172fd8/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java
index 734946c..a7a9968 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java
@@ -106,7 +106,9 @@ public class WritableType<T, W extends Writable> implements PType<T> {

   @Override
   public void initialize(Configuration conf) {
+    this.inputFn.setConfiguration(conf);  // before initialize(), so getConfiguration() returns conf there
     this.inputFn.initialize();
+    this.outputFn.setConfiguration(conf); // likewise for outputFn, before its own initialize()
     this.outputFn.initialize();
     for (PType subType : subTypes) {
       subType.initialize(conf);

http://git-wip-us.apache.org/repos/asf/crunch/blob/fb172fd8/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
new file mode 100644
index 0000000..441a9c6
--- /dev/null
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch
+
+import org.apache.crunch.impl.mr.MRPipeline
+import org.apache.crunch.io.{From => from, To => to}
+import org.apache.crunch.types.avro.{Avros => A}
+import org.apache.avro.file.DataFileWriter
+import org.apache.hadoop.fs.{Path, FSDataOutputStream}
+import org.apache.hadoop.conf.Configuration
+
+import _root_.org.junit.Assert._
+import _root_.org.junit.Test
+
+// Avro reflect test records. The mutable fields and no-arg constructors are presumably what
+// Avro's reflect-based reading needs to instantiate and populate them.
+case class Rec1(var k: Int, var v: String) { def this() = this(0, "") }
+case class Rec2(var k: Int, var k2: String, var v: Double) { def this() = this(0, "", 0.0) }
+case class Rec3(var k2: String, var v: Int) { def this() = this("", 0) }
+
+/**
+ * Integration test for CRUNCH-301: cogrouping the values of an inner join over Avro reflect
+ * records must yield exactly the expected grouped pairs.
+ */
+class DeepCopyTest extends CrunchSuite {
+  lazy val pipe = Pipeline.mapReduce[DeepCopyTest](tempDir.getDefaultConfiguration)
+
+  @Test def runDeepCopy {
+    val prefix = tempDir.getFileName("isolation")
+
+    val ones = Seq(Rec1(1, "hello"), Rec1(1, "tjena"), Rec1(2, "goodbye"))
+    val twos = Seq(Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6), Rec2(1, "b", 0.7), Rec2(2, "c", 9.9))
+    val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6))
+
+    writeCollection(new Path(prefix + "/ones"), ones)
+    writeCollection(new Path(prefix + "/twos"), twos)
+    writeCollection(new Path(prefix + "/threes"), threes)
+
+    try {
+      val oneF = pipe.read(from.avroFile(prefix + "/ones", A.reflects(classOf[Rec1])))
+      val twoF = pipe.read(from.avroFile(prefix + "/twos", A.reflects(classOf[Rec2])))
+      val threeF = pipe.read(from.avroFile(prefix + "/threes", A.reflects(classOf[Rec3])))
+      // Join twos and threes on k2, key the joined pairs by the Rec2's k, then cogroup with ones on k.
+      val res = (oneF.by(_.k)
+        cogroup
+        (twoF.by(_.k2)
+           innerJoin threeF.by(_.k2))
+          .values()
+          .by(_._1.k))
+        .values()
+        .materialize
+        .toList
+
+      // Expected results vs. actual
+      val e12 = Seq((Rec2(1, "a", 0.4), Rec3("a", 4)), (Rec2(1, "a", 0.5), Rec3("a", 4)), (Rec2(1, "b", 0.6), Rec3("b", 5)),
+          (Rec2(1, "b", 0.7), Rec3("b", 5)))
+      val e22 = Seq((Rec2(2, "c", 9.9), Rec3("c", 6)))
+      assertEquals(2, res.size)
+      // JUnit's contract is assertEquals(expected, actual); keeping that order keeps failure messages accurate.
+      assertEquals(Seq(e12, e22), res.map(_._2.toList))
+    } finally {
+      pipe.done()
+    }
+  }
+
+  /** Creates (overwriting any existing file) `path` and writes `records` to it as an Avro data file. */
+  private def writeCollection(path: Path, records: Iterable[_ <: AnyRef]) {
+    writeAvroFile(path.getFileSystem(new Configuration()).create(path, true), records)
+  }
+
+  /**
+   * Writes the non-empty `records` to `outputStream` as an Avro data file, using the reflect schema
+   * of the first record's class. The writer and the stream are closed even if writing fails.
+   */
+  private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]) {
+    val factory = new ScalaReflectDataFactory()
+    val schema = factory.getReflectData.getSchema(records.head.getClass)
+    val writer = factory.getWriter[T](schema)
+    val dataFileWriter = new DataFileWriter(writer)
+    try {
+      dataFileWriter.create(schema, outputStream)
+      for (record <- records) {
+        dataFileWriter.append(record)
+      }
+    } finally {
+      dataFileWriter.close()
+      outputStream.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/fb172fd8/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
index 40f2070..ea69d4f 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
@@ -25,8 +25,35 @@ import java.lang.{Long => JLong, Double => JDouble, Integer => JInt, Float => JF
 import java.util.{Collection => JCollection}
 import scala.collection.JavaConversions._

-class TMapFn[S, T](f: S => T) extends MapFn[S, T] {
-  override def map(input: S) = f(input)
+/**
+ * A [[MapFn]] that applies the Scala function `f` to each input.
+ *
+ * If `pt` is given and a Configuration is available when `initialize()` runs, each input is
+ * first replaced by `pt.getDetachedValue(input)`, so `f` sees a copy detached from the
+ * (possibly reused) input object rather than the object itself.
+ *
+ * @param f    the function to apply
+ * @param pt   optional PType of the input, used to make the detached copies
+ * @param init whether inputs are currently being detached; recomputed by `initialize()`
+ */
+class TMapFn[S, T](val f: S => T, val pt: Option[PType[S]] = None, var init: Boolean = false) extends MapFn[S, T] {
+  override def initialize() {
+    // Detach only with both a PType to copy with and a Configuration to initialize it. `init` is
+    // recomputed on every call so a value carried over from an earlier initialize() (e.g. one made
+    // before this fn was serialized) is never trusted on its own.
+    init = (pt, Option(getConfiguration())) match {
+      case (Some(ptype), Some(conf)) =>
+        ptype.initialize(conf)
+        true
+      case _ =>
+        false
+    }
+  }
+
+  override def map(input: S): T = pt match {
+    case Some(ptype) if init => f(ptype.getDetachedValue(input))
+    case _ => f(input)
+  }
 }

 trait PTypeFamily {
@@ -40,7 +67,7 @@ trait PTypeFamily {
   def records[T: ClassManifest] = ptf.records(classManifest[T].erasure)

   def derived[S, T](cls: java.lang.Class[T], in: S => T, out: T => S, pt: PType[S]) = {
-    ptf.derived(cls, new TMapFn[S, T](in), new TMapFn[T, S](out), pt)
+    ptf.derived(cls, new TMapFn[S, T](in, Some(pt)), new TMapFn[T, S](out), pt)
   }

   val longs = {