You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/01/28 07:17:43 UTC
crunch git commit: CRUNCH-492: Add create methods to Scrunch Pipeline APIs
Repository: crunch
Updated Branches:
refs/heads/master 006cd72a3 -> cbb1b7e75
CRUNCH-492: Add create methods to Scrunch Pipeline APIs
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/cbb1b7e7
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/cbb1b7e7
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/cbb1b7e7
Branch: refs/heads/master
Commit: cbb1b7e757a7302b6b656ceaf1170033435df23c
Parents: 006cd72
Author: Josh Wills <jw...@apache.org>
Authored: Tue Jan 27 22:08:13 2015 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Jan 27 22:08:13 2015 -0800
----------------------------------------------------------------------
.../apache/crunch/scrunch/DeepCopyTest.scala | 36 ++------------
.../org/apache/crunch/scrunch/PTableTest.scala | 7 +--
.../apache/crunch/scrunch/PipelineLike.scala | 51 ++++++++++++++++++++
3 files changed, 57 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/cbb1b7e7/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
index 25febc2..1659bf0 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
@@ -57,11 +57,9 @@ class DeepCopyTest extends CrunchSuite {
val ones = Seq(BBRec(bb1, Array(bb4, bb2)), BBRec(bb2, Array(bb1, bb3)))
val twos = Seq(BBRec(bb3, Array(bb1, bb2)), BBRec(bb4, Array(bb3, bb4)))
- writeCollection(new Path(prefix + "/ones"), ones)
- writeCollection(new Path(prefix + "/twos"), twos)
- val oneF = pipe.read(from.avroFile(prefix + "/ones", Avros.reflects[BBRec]))
- val twoF = pipe.read(from.avroFile(prefix + "/twos", Avros.reflects[BBRec]))
+ val oneF = pipe.create(ones, Avros.reflects[BBRec])
+ val twoF = pipe.create(twos, Avros.reflects[BBRec])
val m = oneF.flatMap(getIterator(_)).leftJoin(twoF.flatMap(getIterator(_)))
.keys
@@ -77,13 +75,9 @@ class DeepCopyTest extends CrunchSuite {
val twos = Seq(Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6), Rec2(1, "b", 0.7), Rec2(2, "c", 9.9))
val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6))
- writeCollection(new Path(prefix + "/ones"), ones)
- writeCollection(new Path(prefix + "/twos"), twos)
- writeCollection(new Path(prefix + "/threes"), threes)
-
- val oneF = pipe.read(from.avroFile(prefix + "/ones", A.reflects(classOf[Rec1])))
- val twoF = pipe.read(from.avroFile(prefix + "/twos", A.reflects(classOf[Rec2])))
- val threeF = pipe.read(from.avroFile(prefix + "/threes", A.reflects(classOf[Rec3])))
+ val oneF = pipe.create(ones, A.reflects(classOf[Rec1]))
+ val twoF = pipe.create(twos, A.reflects(classOf[Rec2]))
+ val threeF = pipe.create(threes, A.reflects(classOf[Rec3]))
val res = (oneF.by(_.k)
cogroup
(twoF.by(_.k2)
@@ -102,24 +96,4 @@ class DeepCopyTest extends CrunchSuite {
assertEquals(res.map(_._2.toSet), Seq(e12, e22))
pipe.done()
}
-
- private def writeCollection(path: Path, records: Iterable[_ <: AnyRef]) {
- writeAvroFile(path.getFileSystem(new Configuration()).create(path, true), records)
- }
-
- @SuppressWarnings(Array("rawtypes", "unchecked"))
- private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]) {
- val r: AnyRef = records.iterator.next()
- val factory = new ScalaReflectDataFactory()
- val schema = factory.getData().getSchema(r.getClass)
- val writer = factory.getWriter[T](schema)
- val dataFileWriter = new DataFileWriter(writer)
- dataFileWriter.create(schema, outputStream)
-
- for (record <- records) {
- dataFileWriter.append(record)
- }
- dataFileWriter.close()
- outputStream.close()
- }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/cbb1b7e7/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala
index bae1b4e..150774d 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala
@@ -17,8 +17,6 @@
*/
package org.apache.crunch.scrunch
-import org.apache.crunch.io.{From => from, To => to}
-
import _root_.org.junit.Assert._
import _root_.org.junit.Test
@@ -34,10 +32,7 @@ class PTableTest extends CrunchSuite {
*/
private def tensCollection: PCollection[Int] = {
val pipeline = Pipeline.mapReduce[PTableTest](tempDir.getDefaultConfiguration)
- val input = tempDir.copyResourceFileName("tens.txt")
- pipeline.read(from.textFile(input)).map { line =>
- Integer.parseInt(line)
- }
+ pipeline.create(List.fill(100)(10), Avros.ints)
}
/**
http://git-wip-us.apache.org/repos/asf/crunch/blob/cbb1b7e7/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
index 08b4697..e948904 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
@@ -23,6 +23,8 @@ import org.apache.crunch.{Pipeline => JPipeline, _}
import org.apache.crunch.scrunch.interpreter.InterpreterRunner
import org.apache.crunch.types.{PTableType, PType}
+import scala.collection.JavaConversions.asJavaCollection
+
trait PipelineLike {
def jpipeline: JPipeline
@@ -56,6 +58,16 @@ trait PipelineLike {
def read[T](source: Source[T]): PCollection[T] = new PCollection(jpipeline.read(source))
/**
+ * Reads a source into a [[org.apache.crunch.scrunch.PCollection]]
+ *
+ * @param source The source to read from.
+ * @param named A short name to use for the returned PCollection.
+ * @tparam T The type of the values being read.
+ * @return A PCollection containing data read from the specified source.
+ */
+ def read[T](source: Source[T], named: String): PCollection[T] = new PCollection(jpipeline.read(source, named))
+
+ /**
* Reads a source into a [[org.apache.crunch.scrunch.PTable]]
*
* @param source The source to read from.
@@ -66,6 +78,17 @@ trait PipelineLike {
def read[K, V](source: TableSource[K, V]): PTable[K, V] = new PTable(jpipeline.read(source))
/**
+ * Reads a source into a [[org.apache.crunch.scrunch.PTable]]
+ *
+ * @param source The source to read from.
+ * @param named A short name to use for the returned PTable.
+ * @tparam K The type of the keys being read.
+ * @tparam V The type of the values being read.
+ * @return A PTable containing data read from the specified source.
+ */
+ def read[K, V](source: TableSource[K, V], named: String): PTable[K, V] = new PTable(jpipeline.read(source, named))
+
+ /**
* Writes a parallel collection to a target.
*
* @param collection The collection to write.
@@ -114,6 +137,34 @@ trait PipelineLike {
def emptyPTable[K, V](pt: PTableType[K, V]) = new PTable[K, V](jpipeline.emptyPTable(pt))
/**
+ * Creates a new PCollection from the given elements.
+ */
+ def create[T](elements: Iterable[T], pt: PType[T]) = {
+ new PCollection[T](jpipeline.create(asJavaCollection(elements), pt))
+ }
+
+ /**
+ * Creates a new PCollection from the given elements, using the given CreateOptions.
+ */
+ def create[T](elements: Iterable[T], pt: PType[T], options: CreateOptions) = {
+ new PCollection[T](jpipeline.create(asJavaCollection(elements), pt, options))
+ }
+
+ /**
+ * Creates a new PTable from the given elements.
+ */
+ def create[K, V](elements: Iterable[(K, V)], pt: PTableType[K, V]) = {
+ new PTable[K, V](jpipeline.create(asJavaCollection(elements.map(t => Pair.of(t._1, t._2))), pt))
+ }
+
+ /**
+ * Creates a new PTable from the given elements, using the given CreateOptions.
+ */
+ def create[K, V](elements: Iterable[(K, V)], pt: PTableType[K, V], options: CreateOptions) = {
+ new PTable[K, V](jpipeline.create(asJavaCollection(elements.map(t => Pair.of(t._1, t._2))), pt, options))
+ }
+
+ /**
* Adds the given {@code SeqDoFn} to the pipeline execution and returns its output.
*/
def sequentialDo[Output](seqDoFn: PipelineCallable[Output]) = jpipeline.sequentialDo(seqDoFn)