Posted to commits@crunch.apache.org by jw...@apache.org on 2015/01/28 07:17:43 UTC

crunch git commit: CRUNCH-492: Add create methods to Scrunch Pipeline APIs

Repository: crunch
Updated Branches:
  refs/heads/master 006cd72a3 -> cbb1b7e75


CRUNCH-492: Add create methods to Scrunch Pipeline APIs


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/cbb1b7e7
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/cbb1b7e7
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/cbb1b7e7

Branch: refs/heads/master
Commit: cbb1b7e757a7302b6b656ceaf1170033435df23c
Parents: 006cd72
Author: Josh Wills <jw...@apache.org>
Authored: Tue Jan 27 22:08:13 2015 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Jan 27 22:08:13 2015 -0800

----------------------------------------------------------------------
 .../apache/crunch/scrunch/DeepCopyTest.scala    | 36 ++------------
 .../org/apache/crunch/scrunch/PTableTest.scala  |  7 +--
 .../apache/crunch/scrunch/PipelineLike.scala    | 51 ++++++++++++++++++++
 3 files changed, 57 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
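For readers unfamiliar with the Scrunch side of CRUNCH-492: the commit adds create(...) methods to the Scrunch Pipeline APIs so that small in-memory collections can become PCollections without first being staged as files. A minimal sketch of the basic call, assuming a placeholder jar-locator class and a default Hadoop configuration (only the create and Avros.ints signatures come from the diff below):

    import org.apache.crunch.scrunch.{Avros, Pipeline}
    import org.apache.hadoop.conf.Configuration

    class ExampleJob  // placeholder class, used only to locate the job jar

    // Build a Scrunch MapReduce pipeline, as the integration tests below do.
    val pipe = Pipeline.mapReduce[ExampleJob](new Configuration())

    // New in this commit: materialize an in-memory Seq directly as a PCollection.
    val tens = pipe.create(Seq(10, 20, 30), Avros.ints)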


http://git-wip-us.apache.org/repos/asf/crunch/blob/cbb1b7e7/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
index 25febc2..1659bf0 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
@@ -57,11 +57,9 @@ class DeepCopyTest extends CrunchSuite {
 
     val ones = Seq(BBRec(bb1, Array(bb4, bb2)), BBRec(bb2, Array(bb1, bb3)))
     val twos = Seq(BBRec(bb3, Array(bb1, bb2)), BBRec(bb4, Array(bb3, bb4)))
-    writeCollection(new Path(prefix + "/ones"), ones)
-    writeCollection(new Path(prefix + "/twos"), twos)
 
-    val oneF = pipe.read(from.avroFile(prefix + "/ones", Avros.reflects[BBRec]))
-    val twoF = pipe.read(from.avroFile(prefix + "/twos", Avros.reflects[BBRec]))
+    val oneF = pipe.create(ones, Avros.reflects[BBRec])
+    val twoF = pipe.create(twos, Avros.reflects[BBRec])
 
     val m = oneF.flatMap(getIterator(_)).leftJoin(twoF.flatMap(getIterator(_)))
       .keys
@@ -77,13 +75,9 @@ class DeepCopyTest extends CrunchSuite {
     val twos = Seq(Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6), Rec2(1, "b", 0.7), Rec2(2, "c", 9.9))
     val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6))
 
-    writeCollection(new Path(prefix + "/ones"), ones)
-    writeCollection(new Path(prefix + "/twos"), twos)
-    writeCollection(new Path(prefix + "/threes"), threes)
-
-    val oneF = pipe.read(from.avroFile(prefix + "/ones", A.reflects(classOf[Rec1])))
-    val twoF = pipe.read(from.avroFile(prefix + "/twos", A.reflects(classOf[Rec2])))
-    val threeF = pipe.read(from.avroFile(prefix + "/threes", A.reflects(classOf[Rec3])))
+    val oneF = pipe.create(ones, A.reflects(classOf[Rec1]))
+    val twoF = pipe.create(twos, A.reflects(classOf[Rec2]))
+    val threeF = pipe.create(threes, A.reflects(classOf[Rec3]))
     val res = (oneF.by(_.k)
       cogroup
       (twoF.by(_.k2)
@@ -102,24 +96,4 @@ class DeepCopyTest extends CrunchSuite {
     assertEquals(res.map(_._2.toSet), Seq(e12, e22))
     pipe.done()
   }
-
-  private def writeCollection(path: Path, records: Iterable[_ <: AnyRef]) {
-    writeAvroFile(path.getFileSystem(new Configuration()).create(path, true), records)
-  }
-
-  @SuppressWarnings(Array("rawtypes", "unchecked"))
-  private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]) {
-    val r: AnyRef = records.iterator.next()
-    val factory = new ScalaReflectDataFactory()
-    val schema = factory.getData().getSchema(r.getClass)
-    val writer = factory.getWriter[T](schema)
-    val dataFileWriter = new DataFileWriter(writer)
-    dataFileWriter.create(schema, outputStream)
-
-    for (record <- records) {
-      dataFileWriter.append(record)
-    }
-    dataFileWriter.close()
-    outputStream.close()
-  }
 }
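The hunk above drops the hand-rolled writeCollection/writeAvroFile helpers and builds the test inputs in memory instead. A rough sketch of that pattern, with a hypothetical record type standing in for the test's BBRec/Rec classes (the cogroup shape is illustrative only):

    import org.apache.crunch.scrunch.{Avros, Pipeline}
    import org.apache.hadoop.conf.Configuration

    case class Node(id: Int, label: String)  // hypothetical record type

    val pipe  = Pipeline.mapReduce[Node](new Configuration())
    val left  = pipe.create(Seq(Node(1, "a"), Node(2, "b")), Avros.reflects(classOf[Node]))
    val right = pipe.create(Seq(Node(1, "x"), Node(3, "y")), Avros.reflects(classOf[Node]))

    // Key both sides and cogroup them, much as the deep-copy test does with its reflect records.
    val grouped = left.by(_.id).cogroup(right.by(_.id))
    pipe.done()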

http://git-wip-us.apache.org/repos/asf/crunch/blob/cbb1b7e7/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala
index bae1b4e..150774d 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala
@@ -17,8 +17,6 @@
  */
 package org.apache.crunch.scrunch
 
-import org.apache.crunch.io.{From => from, To => to}
-
 import _root_.org.junit.Assert._
 import _root_.org.junit.Test
 
@@ -34,10 +32,7 @@ class PTableTest extends CrunchSuite {
    */
   private def tensCollection: PCollection[Int] = {
     val pipeline = Pipeline.mapReduce[PTableTest](tempDir.getDefaultConfiguration)
-    val input = tempDir.copyResourceFileName("tens.txt")
-    pipeline.read(from.textFile(input)).map { line =>
-      Integer.parseInt(line)
-    }
+    pipeline.create(List.fill(100)(10), Avros.ints)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/crunch/blob/cbb1b7e7/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
index 08b4697..e948904 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
@@ -23,6 +23,8 @@ import org.apache.crunch.{Pipeline => JPipeline, _}
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner
 import org.apache.crunch.types.{PTableType, PType}
 
+import scala.collection.JavaConversions.asJavaCollection
+
 trait PipelineLike {
   def jpipeline: JPipeline
 
@@ -56,6 +58,16 @@ trait PipelineLike {
   def read[T](source: Source[T]): PCollection[T] = new PCollection(jpipeline.read(source))
 
   /**
+   * Reads a source into a [[org.apache.crunch.scrunch.PCollection]]
+   *
+   * @param source The source to read from.
+   * @param named A short name to use for the returned PCollection.
+   * @tparam T The type of the values being read.
+   * @return A PCollection containing data read from the specified source.
+   */
+  def read[T](source: Source[T], named: String): PCollection[T] = new PCollection(jpipeline.read(source, named))
+
+  /**
    * Reads a source into a [[org.apache.crunch.scrunch.PTable]]
    *
    * @param source The source to read from.
@@ -66,6 +78,17 @@ trait PipelineLike {
   def read[K, V](source: TableSource[K, V]): PTable[K, V] = new PTable(jpipeline.read(source))
 
   /**
+   * Reads a source into a [[org.apache.crunch.scrunch.PTable]]
+   *
+   * @param source The source to read from.
+   * @param named A short name to use for the returned PTable.
+   * @tparam K The type of the keys being read.
+   * @tparam V The type of the values being read.
+   * @return A PTable containing data read from the specified source.
+   */
+  def read[K, V](source: TableSource[K, V], named: String): PTable[K, V] = new PTable(jpipeline.read(source, named))
+
+  /**
    * Writes a parallel collection to a target.
    *
    * @param collection The collection to write.
@@ -114,6 +137,34 @@ trait PipelineLike {
   def emptyPTable[K, V](pt: PTableType[K, V]) = new PTable[K, V](jpipeline.emptyPTable(pt))
 
   /**
+   * Creates a new PCollection from the given elements.
+   */
+  def create[T](elements: Iterable[T], pt: PType[T]) = {
+    new PCollection[T](jpipeline.create(asJavaCollection(elements), pt))
+  }
+
+  /**
+   * Creates a new PCollection from the given elements.
+   */
+  def create[T](elements: Iterable[T], pt: PType[T], options: CreateOptions) = {
+    new PCollection[T](jpipeline.create(asJavaCollection(elements), pt, options))
+  }
+
+  /**
+   *  Creates a new PTable from the given elements.
+   */
+  def create[K, V](elements: Iterable[(K, V)], pt: PTableType[K, V]) = {
+    new PTable[K, V](jpipeline.create(asJavaCollection(elements.map(t => Pair.of(t._1, t._2))), pt))
+  }
+
+  /**
+   *  Creates a new PTable from the given elements.
+   */
+  def create[K, V](elements: Iterable[(K, V)], pt: PTableType[K, V], options: CreateOptions) = {
+    new PTable[K, V](jpipeline.create(asJavaCollection(elements.map(t => Pair.of(t._1, t._2))), pt, options))
+  }
+
+  /**
    * Adds the given {@code SeqDoFn} to the pipeline execution and returns its output.
    */
   def sequentialDo[Output](seqDoFn: PipelineCallable[Output]) = jpipeline.sequentialDo(seqDoFn)
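Taken together with the new named read(...) overloads in PipelineLike.scala, callers can now label sources and build small lookup tables inline. A hedged usage sketch: only the read(source, named) and create(elements, ptype) signatures come from the diff above; the text-file path, the tuple data, and the Avros.tableOf/Avros.strings helpers are assumptions for illustration:

    import org.apache.crunch.io.From
    import org.apache.crunch.scrunch.{Avros, Pipeline}
    import org.apache.hadoop.conf.Configuration

    class ExampleJob  // placeholder class, used only to locate the job jar

    val pipe = Pipeline.mapReduce[ExampleJob](new Configuration())

    // New named-read overload: the short name is passed through to the underlying Crunch plan.
    val lines = pipe.read(From.textFile("/tmp/example.txt"), "example-lines")

    // New PTable create overload: Scala tuples are converted to Crunch Pairs by PipelineLike.
    val lookup = pipe.create(Seq((1, "one"), (2, "two")),
                             Avros.tableOf(Avros.ints, Avros.strings))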