You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/08/08 02:47:06 UTC
[9/10] CRUNCH-32: Clean up namespaces.
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/resources/urls.txt
----------------------------------------------------------------------
diff --git a/scrunch/src/it/resources/urls.txt b/scrunch/src/it/resources/urls.txt
deleted file mode 100644
index 827e711..0000000
--- a/scrunch/src/it/resources/urls.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-www.A.com www.B.com
-www.A.com www.C.com
-www.A.com www.D.com
-www.A.com www.E.com
-www.B.com www.D.com
-www.B.com www.E.com
-www.C.com www.D.com
-www.D.com www.B.com
-www.E.com www.A.com
-www.F.com www.B.com
-www.F.com www.C.com
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/CogroupTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/CogroupTest.scala b/scrunch/src/it/scala/org/apache/scrunch/CogroupTest.scala
deleted file mode 100644
index de20cd9..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/CogroupTest.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.io.{From => from}
-import org.apache.crunch.test.CrunchTestSupport
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-class CogroupTest extends CrunchTestSupport with JUnitSuite {
- lazy val pipeline = Pipeline.mapReduce[CogroupTest](tempDir.getDefaultConfiguration)
-
- def wordCount(fileName: String) = {
- pipeline.read(from.textFile(fileName))
- .flatMap(_.toLowerCase.split("\\W+")).count
- }
-
- @Test def cogroup {
- val shakespeare = tempDir.copyResourceFileName("shakes.txt")
- val maugham = tempDir.copyResourceFileName("maugham.txt")
- val diffs = wordCount(shakespeare).cogroup(wordCount(maugham))
- .map((k, v) => (k, (v._1.sum - v._2.sum))).materialize
- assert(diffs.exists(_ == ("the", -11390)))
- pipeline.done
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/JoinTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/JoinTest.scala b/scrunch/src/it/scala/org/apache/scrunch/JoinTest.scala
deleted file mode 100644
index 397ca65..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/JoinTest.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.io.{From => from, To => to}
-import org.apache.crunch.test.CrunchTestSupport
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-class JoinTest extends CrunchTestSupport with JUnitSuite {
- lazy val pipeline = Pipeline.mapReduce[CogroupTest](tempDir.getDefaultConfiguration)
-
- def wordCount(fileName: String) = {
- pipeline.read(from.textFile(fileName))
- .flatMap(_.toLowerCase.split("\\W+")).count
- }
-
- @Test def join {
- val shakespeare = tempDir.copyResourceFileName("shakes.txt")
- val maugham = tempDir.copyResourceFileName("maugham.txt")
- val output = tempDir.getFile("output")
- val filtered = wordCount(shakespeare).join(wordCount(maugham))
- .map((k, v) => (k, v._1 - v._2))
- .write(to.textFile(output.getAbsolutePath()))
- .filter((k, d) => d > 0).materialize
- assert(filtered.exists(_ == ("macbeth", 66)))
- pipeline.done
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/PageRankClassTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/PageRankClassTest.scala b/scrunch/src/it/scala/org/apache/scrunch/PageRankClassTest.scala
deleted file mode 100644
index d2822db..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/PageRankClassTest.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import Avros._
-
-import org.apache.crunch.{DoFn, Emitter, Pair => P}
-import org.apache.crunch.io.{From => from}
-import org.apache.crunch.test.CrunchTestSupport
-
-import scala.collection.mutable.HashMap
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Assert._
-import _root_.org.junit.Test
-
-case class PageRankData(pr: Float, oldpr: Float, urls: Array[String]) {
- def this() = this(0f, 0f, null)
-
- def scaledPageRank = pr / urls.length
-
- def next(newPageRank: Float) = new PageRankData(newPageRank, pr, urls)
-
- def delta = math.abs(pr - oldpr)
-}
-
-class CachingPageRankClassFn extends DoFn[P[String, PageRankData], P[String, Float]] {
- val cache = new HashMap[String, Float] {
- override def default(key: String) = 0f
- }
-
- override def process(input: P[String, PageRankData], emitFn: Emitter[P[String, Float]]) {
- val prd = input.second()
- if (prd.urls.length > 0) {
- val newpr = prd.pr / prd.urls.length
- prd.urls.foreach(url => cache.put(url, cache(url) + newpr))
- if (cache.size > 5000) {
- cleanup(emitFn)
- }
- }
- }
-
- override def cleanup(emitFn: Emitter[P[String, Float]]) {
- cache.foreach(kv => emitFn.emit(P.of(kv._1, kv._2)))
- cache.clear
- }
-}
-
-class PageRankClassTest extends CrunchTestSupport with JUnitSuite {
-
- lazy val pipeline = Pipeline.mapReduce[PageRankTest](tempDir.getDefaultConfiguration)
-
- def initialInput(fileName: String) = {
- pipeline.read(from.textFile(fileName))
- .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
- .groupByKey
- .map((url, links) => (url, PageRankData(1f, 0f, links.filter(x => x != null).toArray)))
- }
-
- def update(prev: PTable[String, PageRankData], d: Float) = {
- val outbound = prev.flatMap((url, prd) => {
- prd.urls.map(link => (link, prd.scaledPageRank))
- })
- cg(prev, outbound, d)
- }
-
- def cg(prev: PTable[String, PageRankData],
- out: PTable[String, Float], d: Float) = {
- prev.cogroup(out).map((url, v) => {
- val (p, o) = v
- val prd = p.head
- (url, prd.next((1 - d) + d * o.sum))
- })
- }
-
- def fastUpdate(prev: PTable[String, PageRankData], d: Float) = {
- val outbound = prev.parallelDo(new CachingPageRankClassFn(), tableOf(strings, floats))
- cg(prev, outbound, d)
- }
-
- @Test def testPageRank {
- pipeline.getConfiguration.set("crunch.debug", "true")
- var prev = initialInput(tempDir.copyResourceFileName("urls.txt"))
- var delta = 1.0f
- while (delta > 0.01f) {
- prev = update(prev, 0.5f)
- delta = prev.values.map(_.delta).max.materialize.head
- }
- assertEquals(0.0048, delta, 0.001)
- pipeline.done
- }
-
- def testFastPageRank {
- pipeline.getConfiguration.set("crunch.debug", "true")
- var prev = initialInput(tempDir.copyResourceFileName("urls.txt"))
- var delta = 1.0f
- while (delta > 0.01f) {
- prev = fastUpdate(prev, 0.5f)
- delta = prev.values.map(_.delta).max.materialize.head
- }
- assertEquals(0.0048, delta, 0.001)
- pipeline.done
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/PageRankTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/PageRankTest.scala b/scrunch/src/it/scala/org/apache/scrunch/PageRankTest.scala
deleted file mode 100644
index 9cf05e1..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/PageRankTest.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import Avros._
-
-import org.apache.crunch.{DoFn, Emitter, Pair => P}
-import org.apache.crunch.io.{From => from}
-import org.apache.crunch.test.CrunchTestSupport
-
-import scala.collection.mutable.HashMap
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Assert._
-import _root_.org.junit.Test
-
-class CachingPageRankFn extends DoFn[P[String, (Float, Float, List[String])], P[String, Float]] {
- val cache = new HashMap[String, Float] {
- override def default(key: String) = 0f
- }
-
- override def process(input: P[String, (Float, Float, List[String])], emitFn: Emitter[P[String, Float]]) {
- val (pr, oldpr, urls) = input.second()
- val newpr = pr / urls.size
- urls.foreach(url => cache.put(url, cache(url) + newpr))
- if (cache.size > 5000) {
- cleanup(emitFn)
- }
- }
-
- override def cleanup(emitFn: Emitter[P[String, Float]]) {
- cache.foreach(kv => emitFn.emit(P.of(kv._1, kv._2)))
- cache.clear
- }
-}
-
-class PageRankTest extends CrunchTestSupport with JUnitSuite {
- lazy val pipeline = Pipeline.mapReduce[PageRankTest](tempDir.getDefaultConfiguration)
-
- def initialInput(fileName: String) = {
- pipeline.read(from.textFile(fileName))
- .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
- .groupByKey
- .map((url, links) => (url, (1f, 0f, links.toList)))
- }
-
- def update(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
- val outbound = prev.flatMap((url, v) => {
- val (pr, oldpr, links) = v
- links.map(link => (link, pr / links.size))
- })
- cg(prev, outbound, d)
- }
-
- def cg(prev: PTable[String, (Float, Float, List[String])],
- out: PTable[String, Float], d: Float) = {
- prev.cogroup(out).map((url, v) => {
- val (p, o) = v
- val (pr, oldpr, links) = p.head
- (url, ((1 - d) + d * o.sum, pr, links))
- })
- }
-
- def fastUpdate(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
- val outbound = prev.parallelDo(new CachingPageRankFn(), tableOf(strings, floats))
- cg(prev, outbound, d)
- }
-
- @Test def testPageRank {
- var prev = initialInput(tempDir.copyResourceFileName("urls.txt"))
- var delta = 1.0f
- while (delta > 0.01f) {
- prev = update(prev, 0.5f)
- delta = prev.map((k, v) => math.abs(v._1 - v._2)).max.materialize.head
- }
- assertEquals(0.0048, delta, 0.001)
- pipeline.done
- }
-
- @Test def testFastPageRank {
- var prev = initialInput(tempDir.copyResourceFileName("urls.txt"))
- var delta = 1.0f
- while (delta > 0.01f) {
- prev = fastUpdate(prev, 0.5f)
- delta = prev.map((k, v) => math.abs(v._1 - v._2)).max.materialize.head
- }
- assertEquals(0.0048, delta, 0.001)
- pipeline.done
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/PipelineAppTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/PipelineAppTest.scala b/scrunch/src/it/scala/org/apache/scrunch/PipelineAppTest.scala
deleted file mode 100644
index 50d6fc1..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/PipelineAppTest.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.test.CrunchTestSupport
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-object WordCount extends PipelineApp {
-
- def wordSplit(line: String) = line.split("\\W+").filter(!_.isEmpty())
-
- def countWords(filename: String) = {
- val lines = read(from.textFile(filename))
- val words = lines.flatMap(wordSplit)
- words.count
- }
-
- val w1 = countWords(args(0))
- val w2 = countWords(args(1))
- cogroup(w1, w2).write(to.textFile(args(2)))
-}
-
-class PipelineAppTest extends CrunchTestSupport with JUnitSuite {
- @Test def run {
- val args = new Array[String](3)
- args(0) = tempDir.copyResourceFileName("shakes.txt")
- args(1) = tempDir.copyResourceFileName("maugham.txt")
- args(2) = tempDir.getFileName("output")
- tempDir.overridePathProperties(WordCount.configuration)
- WordCount.main(args)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/TopTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/TopTest.scala b/scrunch/src/it/scala/org/apache/scrunch/TopTest.scala
deleted file mode 100644
index f9db0e5..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/TopTest.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.io.{From => from, To => to}
-import org.apache.crunch.test.CrunchTestSupport
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-class TopTest extends CrunchTestSupport with JUnitSuite {
-
- @Test def topInMem {
- val ptable = Mem.tableOf(("foo", 17), ("bar", 29), ("baz", 1729))
- assert(ptable.top(1, true).materialize.head == ("baz", 1729))
- }
-
- @Test def top2 {
- val pipeline = Pipeline.mapReduce[TopTest](tempDir.getDefaultConfiguration)
- val input = tempDir.copyResourceFileName("shakes.txt")
-
- val wc = pipeline.read(from.textFile(input))
- .flatMap(_.toLowerCase.split("\\s+"))
- .filter(!_.isEmpty()).count
- assert(wc.top(10, true).materialize.exists(_ == ("is", 205)))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/UnionTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/UnionTest.scala b/scrunch/src/it/scala/org/apache/scrunch/UnionTest.scala
deleted file mode 100644
index dd0a651..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/UnionTest.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.io.{From => from}
-import org.apache.crunch.test.CrunchTestSupport
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-class UnionTest extends CrunchTestSupport with JUnitSuite {
- lazy val pipeline = Pipeline.mapReduce[UnionTest](tempDir.getDefaultConfiguration)
-
- def wordCount(col: PCollection[String]) = {
- col.flatMap(_.toLowerCase.split("\\W+")).count
- }
-
- @Test def testUnionCollection {
- val shakespeare = tempDir.copyResourceFileName("shakes.txt")
- val maugham = tempDir.copyResourceFileName("maugham.txt")
- val union = pipeline.read(from.textFile(shakespeare)).union(
- pipeline.read(from.textFile(maugham)))
- val wc = wordCount(union).materialize
- assert(wc.exists(_ == ("you", 3691)))
- pipeline.done
- }
-
- @Test def testUnionTable {
- val shakespeare = tempDir.copyResourceFileName("shakes.txt")
- val maugham = tempDir.copyResourceFileName("maugham.txt")
- val wcs = wordCount(pipeline.read(from.textFile(shakespeare)))
- val wcm = wordCount(pipeline.read(from.textFile(maugham)))
- val wc = wcs.union(wcm).groupByKey.combine(v => v.sum).materialize
- assert(wc.exists(_ == ("you", 3691)))
- pipeline.done
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/WordCountTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/WordCountTest.scala b/scrunch/src/it/scala/org/apache/scrunch/WordCountTest.scala
deleted file mode 100644
index 3edb08b..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/WordCountTest.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.io.{From => from, To => to}
-import org.apache.crunch.test.CrunchTestSupport
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-class WordCountTest extends CrunchTestSupport with JUnitSuite {
- @Test def wordCount {
- val pipeline = Pipeline.mapReduce[WordCountTest](tempDir.getDefaultConfiguration)
- val input = tempDir.copyResourceFileName("shakes.txt")
- val wordCountOut = tempDir.getFileName("output")
-
- val fcc = pipeline.read(from.textFile(input))
- .flatMap(_.toLowerCase.split("\\s+"))
- .filter(!_.isEmpty()).count
- .write(to.textFile(wordCountOut)) // Word counts
- .map((w, c) => (w.slice(0, 1), c))
- .groupByKey.combine(v => v.sum).materialize
- assert(fcc.exists(_ == ("w", 1404)))
-
- pipeline.done
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala b/scrunch/src/it/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala
deleted file mode 100644
index 519dae4..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch.interpreter
-
-import java.io.File
-import java.io.FileOutputStream
-import java.util.jar.JarFile
-import java.util.jar.JarOutputStream
-
-import scala.tools.nsc.io.VirtualDirectory
-
-import com.google.common.io.Files
-import org.junit.Assert.assertNotNull
-import org.junit.Test
-import org.apache.crunch.test.CrunchTestSupport
-import org.scalatest.junit.JUnitSuite
-
-/**
- * Tests creating jars from a {@link scala.tools.nsc.io.VirtualDirectory}.
- */
-class InterpreterJarTest extends CrunchTestSupport with JUnitSuite {
-
- /**
- * Tests transforming a virtual directory into a temporary jar file.
- */
- @Test def virtualDirToJar: Unit = {
- // Create a virtual directory and populate with some mock content.
- val root = new VirtualDirectory("testDir", None)
- // Add some subdirectories to the root.
- (1 to 10).foreach { i =>
- val subdir = root.subdirectoryNamed("subdir" + i).asInstanceOf[VirtualDirectory]
- // Add some classfiles to each sub directory.
- (1 to 10).foreach { j =>
- subdir.fileNamed("MyClass" + j + ".class")
- }
- }
-
- // Now generate a jar file from the virtual directory.
- val tempJar = new File(tempDir.getRootFile(), "replJar.jar")
- val jarStream = new JarOutputStream(new FileOutputStream(tempJar))
- InterpreterRunner.addVirtualDirectoryToJar(root, "top/pack/name/", jarStream)
- jarStream.close()
-
- // Verify the contents of the jar.
- val jarFile = new JarFile(tempJar)
- (1 to 10).foreach { i =>
- (1 to 10).foreach { j =>
- val entryName = "top/pack/name/subdir" + i + "/MyClass" + j + ".class"
- val entry = jarFile.getEntry(entryName)
- assertNotNull("Jar entry " + entryName + " not found in generated jar.", entry)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/assembly/release.xml
----------------------------------------------------------------------
diff --git a/scrunch/src/main/assembly/release.xml b/scrunch/src/main/assembly/release.xml
deleted file mode 100644
index e740f32..0000000
--- a/scrunch/src/main/assembly/release.xml
+++ /dev/null
@@ -1,93 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<!--
- Assembly configuration for the release bundle.
--->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
- <id>release</id>
- <formats>
- <format>dir</format>
- <format>tar.gz</format>
- </formats>
- <includeBaseDirectory>true</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <!-- readme -->
- <useDefaultExcludes>false</useDefaultExcludes>
- <outputDirectory>/</outputDirectory>
- <fileMode>0644</fileMode>
- <includes>
- <include>README.md</include>
- </includes>
- <filtered>true</filtered>
- </fileSet>
- <fileSet>
- <!-- scripts -->
- <useDefaultExcludes>false</useDefaultExcludes>
- <outputDirectory>bin</outputDirectory>
- <directory>src/main/scripts</directory>
- <fileMode>0755</fileMode>
- <excludes>
- <exclude>*~</exclude>
- <exclude>*.swp</exclude>
- </excludes>
- <filtered>true</filtered>
- </fileSet>
- <fileSet>
- <!-- conf dir -->
- <useDefaultExcludes>false</useDefaultExcludes>
- <outputDirectory>conf</outputDirectory>
- <directory>src/main/conf</directory>
- <fileMode>0644</fileMode>
- <excludes>
- <exclude>*~</exclude>
- <exclude>*.swp</exclude>
- </excludes>
- <filtered>true</filtered>
- </fileSet>
- <fileSet>
- <!-- examples dir -->
- <useDefaultExcludes>false</useDefaultExcludes>
- <outputDirectory>examples</outputDirectory>
- <directory>src/main/examples</directory>
- <fileMode>0644</fileMode>
- <excludes>
- <exclude>*~</exclude>
- <exclude>*.swp</exclude>
- </excludes>
- <filtered>true</filtered>
- </fileSet>
- </fileSets>
- <dependencySets>
- <dependencySet>
- <outputDirectory>lib</outputDirectory>
- <scope>runtime</scope>
- <useTransitiveFiltering>true</useTransitiveFiltering>
- <fileMode>0644</fileMode>
- <!--
- <excludes>
- <exclude>org.apache.hadoop:hadoop-core</exclude>
- <exclude>org.apache.hbase:hbase</exclude>
- </excludes>
- -->
- </dependencySet>
- </dependencySets>
-</assembly>
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/scrunch/src/main/conf/log4j.properties b/scrunch/src/main/conf/log4j.properties
deleted file mode 100644
index 448bb77..0000000
--- a/scrunch/src/main/conf/log4j.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# ***** Set root logger level to INFO and its only appender to A.
-log4j.logger.org.apache.scrunch=info, A
-
-# ***** A is set to be a ConsoleAppender.
-log4j.appender.A=org.apache.log4j.ConsoleAppender
-# ***** A uses PatternLayout.
-log4j.appender.A.layout=org.apache.log4j.PatternLayout
-log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/examples/ClassyPageRank.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/examples/ClassyPageRank.scala b/scrunch/src/main/examples/ClassyPageRank.scala
deleted file mode 100644
index 6c819a5..0000000
--- a/scrunch/src/main/examples/ClassyPageRank.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.scrunch._
-import org.apache.scrunch.Mem._
-
-case class UrlData(pageRank: Float, oldPageRank: Float, links: List[String]) {
- def this() = this(1.0f, 0.0f, Nil)
-
- def this(links: String*) = this(1.0f, 0.0f, List(links:_*))
-
- def this(links: Iterable[String]) = this(1.0f, 0.0f, links.toList)
-
- def delta = math.abs(pageRank - oldPageRank)
-
- def next(newPageRank: Float) = new UrlData(newPageRank, pageRank, links)
-
- def outboundScores = links.map(link => (link, pageRank / links.size))
-}
-
-object ClassyPageRank extends PipelineApp {
-
- def initialize(file: String) = {
- read(from.textFile(file))
- .map(line => { val urls = line.split("\\s+"); (urls(0), urls(2)) })
- .groupByKey
- .map((url, links) => (url, new UrlData(links)))
- }
-
- def update(prev: PTable[String, UrlData], d: Float) = {
- val outbound = prev.values.flatMap(_.outboundScores)
-
- cogroup(prev, outbound).mapValues(data => {
- val (prd, outboundScores) = data
- val newPageRank = (1 - d) + d * outboundScores.sum
- if (!prd.isEmpty) {
- prd.head.next(newPageRank)
- } else {
- new UrlData(newPageRank, 0, Nil)
- }
- })
- }
-
- var index = 0
- var delta = 10.0f
- fs.mkdirs("prank/")
- var curr = initialize(args(0))
- while (delta > 1.0f) {
- index = index + 1
- curr = update(curr, 0.5f)
- write(curr, to.avroFile("prank/" + index))
- delta = curr.values.map(_.delta).max.materialize.head
- println("Current delta = " + delta)
- }
- fs.rename("prank/" + index, args(1))
- fs.delete("prank/", true)
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/examples/PageRank.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/examples/PageRank.scala b/scrunch/src/main/examples/PageRank.scala
deleted file mode 100644
index 7de26e6..0000000
--- a/scrunch/src/main/examples/PageRank.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.scrunch._
-
-object PageRank extends PipelineApp {
- def initialize(file: String) = {
- read(from.textFile(file))
- .map(line => { val urls = line.split("\\s+"); (urls(0), urls(2)) })
- .groupByKey
- .map((url, links) => (url, (1f, 0f, links.toList)))
- }
-
- def update(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
- val outbound = prev.flatMap((url, data) => {
- val (pagerank, old_pagerank, links) = data
- links.map(link => (link, pagerank / links.size))
- })
-
- cogroup(prev, outbound).mapValues(data => {
- val (prev_data, outbound_data) = data
- val new_pagerank = (1 - d) + d * outbound_data.sum
- var cur_pagerank = 0f
- var links: List[String] = Nil
- if (!prev_data.isEmpty) {
- val (cur_pr, old_pr, l) = prev_data.head
- cur_pagerank = cur_pr
- links = l
- }
- (new_pagerank, cur_pagerank, links)
- })
- }
-
- var index = 0
- var delta = 10.0f
- fs.mkdirs("prank/")
- var curr = initialize(args(0))
- while (delta > 1.0f) {
- index = index + 1
- curr = update(curr, 0.5f)
- write(curr, to.avroFile("prank/" + index))
- delta = curr.values.map(v => math.abs(v._1 - v._2)).max.materialize.head
- println("Current delta = " + delta)
- }
- fs.rename("prank/" + index, args(1))
- fs.delete("prank/", true)
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/examples/WordCount.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/examples/WordCount.scala b/scrunch/src/main/examples/WordCount.scala
deleted file mode 100644
index 10780e8..0000000
--- a/scrunch/src/main/examples/WordCount.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-import org.apache.scrunch.PipelineApp
-
-object WordCount extends PipelineApp {
-
- def countWords(file: String) = {
- read(from.textFile(file))
- .flatMap(_.split("\\W+").filter(!_.isEmpty()))
- .count
- }
-
- val counts = join(countWords(args(0)), countWords(args(1)))
- write(counts, to.textFile(args(2)))
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/java/org/apache/scrunch/ScalaReflectDataFactory.java
----------------------------------------------------------------------
diff --git a/scrunch/src/main/java/org/apache/scrunch/ScalaReflectDataFactory.java b/scrunch/src/main/java/org/apache/scrunch/ScalaReflectDataFactory.java
deleted file mode 100644
index 0180585..0000000
--- a/scrunch/src/main/java/org/apache/scrunch/ScalaReflectDataFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch;
-
-import org.apache.avro.Schema;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-
-import org.apache.crunch.types.avro.ReflectDataFactory;
-
-/**
- * An implementation of the {@code ReflectDataFactory} class to work with Scala classes.
- */
-public class ScalaReflectDataFactory extends ReflectDataFactory {
-
- public ReflectData getReflectData() { return ScalaSafeReflectData.get(); }
-
- public <T> ReflectDatumReader<T> getReader(Schema schema) {
- return new ScalaSafeReflectDatumReader<T>(schema);
- }
-
- public <T> ReflectDatumWriter<T> getWriter() {
- return new ScalaSafeReflectDatumWriter<T>();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectData.java
----------------------------------------------------------------------
diff --git a/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectData.java b/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectData.java
deleted file mode 100644
index 55bacda..0000000
--- a/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectData.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.AvroTypeException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.Stringable;
-import org.apache.avro.reflect.Union;
-import org.apache.avro.specific.FixedSize;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.node.NullNode;
-
-/**
- * Scala-oriented support class for serialization via reflection.
- */
-public class ScalaSafeReflectData extends ReflectData.AllowNull {
-
- private static final ScalaSafeReflectData INSTANCE = new ScalaSafeReflectData();
-
- public static ScalaSafeReflectData get() { return INSTANCE; }
-
- static final String CLASS_PROP = "java-class";
- static final String ELEMENT_PROP = "java-element-class";
-
- static Class getClassProp(Schema schema, String prop) {
- String name = schema.getProp(prop);
- if (name == null) return null;
- try {
- return Class.forName(name);
- } catch (ClassNotFoundException e) {
- throw new AvroRuntimeException(e);
- }
- }
-
- /**
- * This method is the whole reason for this class to exist, so that I can
- * hack around a problem where calling getSimpleName on a class that is
- * defined inside of the Scala REPL can cause an internal language error,
- * which I'm not a huge fan of.
- *
- * @param clazz
- * @return
- */
- private String getSimpleName(Class clazz) {
- try {
- return clean(clazz.getSimpleName());
- } catch (InternalError ie) {
- // This can happen in Scala when we're using the Console. Crazy, right?
- String fullName = clazz.getName();
- String[] pieces = fullName.split("\\.");
- return clean(pieces[pieces.length - 1]);
- }
- }
-
- @Override
- @SuppressWarnings(value="unchecked")
- protected Schema createSchema(Type type, Map<String,Schema> names) {
- if (type instanceof GenericArrayType) { // generic array
- Type component = ((GenericArrayType)type).getGenericComponentType();
- if (component == Byte.TYPE) // byte array
- return Schema.create(Schema.Type.BYTES);
- Schema result = Schema.createArray(createSchema(component, names));
- setElement(result, component);
- return result;
- } else if (type instanceof ParameterizedType) {
- ParameterizedType ptype = (ParameterizedType)type;
- Class raw = (Class)ptype.getRawType();
- Type[] params = ptype.getActualTypeArguments();
- if (java.util.Map.class.isAssignableFrom(raw) ||
- scala.collection.Map.class.isAssignableFrom(raw)) {
- Type key = params[0];
- Type value = params[1];
- if (!(key == String.class))
- throw new AvroTypeException("Map key class not String: "+key);
- Schema schema = Schema.createMap(createSchema(value, names));
- schema.addProp(CLASS_PROP, raw.getName());
- return schema;
- } else if (Collection.class.isAssignableFrom(raw) ||
- scala.collection.Iterable.class.isAssignableFrom(raw)) { // Collection
- if (params.length != 1)
- throw new AvroTypeException("No array type specified.");
- Schema schema = Schema.createArray(createSchema(params[0], names));
- schema.addProp(CLASS_PROP, raw.getName());
- return schema;
- } else {
- throw new AvroTypeException("Could not convert type: " + type);
- }
- } else if ((type == Short.class) || (type == Short.TYPE)) {
- Schema result = Schema.create(Schema.Type.INT);
- result.addProp(CLASS_PROP, Short.class.getName());
- return result;
- } else if (type instanceof Class) { // Class
- Class<?> c = (Class<?>)type;
- if (c.isPrimitive() || Number.class.isAssignableFrom(c)
- || c == Void.class || c == Boolean.class) // primitive
- return super.createSchema(type, names);
- if (c.isArray()) { // array
- Class component = c.getComponentType();
- if (component == Byte.TYPE) // byte array
- return Schema.create(Schema.Type.BYTES);
- Schema result = Schema.createArray(createSchema(component, names));
- setElement(result, component);
- return result;
- }
- if (CharSequence.class.isAssignableFrom(c)) // String
- return Schema.create(Schema.Type.STRING);
- String fullName = c.getName();
- Schema schema = names.get(fullName);
- if (schema == null) {
- String name = getSimpleName(c);
- String space = c.getPackage() == null ? "" : c.getPackage().getName();
- if (c.getEnclosingClass() != null) // nested class
- space = c.getEnclosingClass().getName() + "$";
- Union union = c.getAnnotation(Union.class);
- if (union != null) { // union annotated
- return getAnnotatedUnion(union, names);
- } else if (c.isAnnotationPresent(Stringable.class)){ // Stringable
- Schema result = Schema.create(Schema.Type.STRING);
- result.addProp(CLASS_PROP, c.getName());
- return result;
- } else if (c.isEnum()) { // Enum
- List<String> symbols = new ArrayList<String>();
- Enum[] constants = (Enum[])c.getEnumConstants();
- for (int i = 0; i < constants.length; i++)
- symbols.add(constants[i].name());
- schema = Schema.createEnum(name, null /* doc */, space, symbols);
- } else if (GenericFixed.class.isAssignableFrom(c)) { // fixed
- int size = c.getAnnotation(FixedSize.class).value();
- schema = Schema.createFixed(name, null /* doc */, space, size);
- } else if (IndexedRecord.class.isAssignableFrom(c)) { // specific
- return super.createSchema(type, names);
- } else { // record
- List<Schema.Field> fields = new ArrayList<Schema.Field>();
- boolean error = Throwable.class.isAssignableFrom(c);
- schema = Schema.createRecord(name, null /* doc */, space, error);
- names.put(c.getName(), schema);
- for (Field field : getFields(c))
- if ((field.getModifiers()&(Modifier.TRANSIENT|Modifier.STATIC))==0){
- Schema fieldSchema = createFieldSchema(field, names);
- JsonNode defaultValue = null;
- if (fieldSchema.getType() == Schema.Type.UNION) {
- Schema defaultType = fieldSchema.getTypes().get(0);
- if (defaultType.getType() == Schema.Type.NULL) {
- defaultValue = NullNode.getInstance();
- }
- }
- fields.add(new Schema.Field(clean(field.getName()),
- fieldSchema, null /* doc */, defaultValue));
- }
- if (error) // add Throwable message
- fields.add(new Schema.Field("detailMessage", THROWABLE_MESSAGE,
- null, null));
- schema.setFields(fields);
- }
- names.put(fullName, schema);
- }
- return schema;
- }
- return super.createSchema(type, names);
- }
-
- private static final Schema THROWABLE_MESSAGE =
- makeNullable(Schema.create(Schema.Type.STRING));
-
-
- @Override
- public Object getField(Object record, String name, int position) {
- if (record instanceof IndexedRecord)
- return super.getField(record, name, position);
- try {
- return getField(record.getClass(), name).get(record);
- } catch (IllegalAccessException e) {
- throw new AvroRuntimeException(e);
- }
- }
-
- private static final Map<Class,Map<String,Field>> FIELD_CACHE =
- new ConcurrentHashMap<Class,Map<String,Field>>();
-
- private static Field getField(Class c, String name) {
- Map<String,Field> fields = FIELD_CACHE.get(c);
- if (fields == null) {
- fields = new ConcurrentHashMap<String,Field>();
- FIELD_CACHE.put(c, fields);
- }
- Field f = fields.get(name);
- if (f == null) {
- f = findField(c, name);
- fields.put(name, f);
- }
- return f;
- }
-
- private static Field findField(Class original, String name) {
- Class c = original;
- do {
- try {
- Field f = c.getDeclaredField(dirty(name));
- f.setAccessible(true);
- return f;
- } catch (NoSuchFieldException e) {}
- c = c.getSuperclass();
- } while (c != null);
- throw new AvroRuntimeException("No field named "+name+" in: "+original);
- }
-
- private static String clean(String dirty) {
- return dirty.replace('$', '_');
- }
-
- private static String dirty(String clean) {
- return clean.replace('_', '$');
- }
-
- // Return of this class and its superclasses to serialize.
- // Not cached, since this is only used to create schemas, which are cached.
- private Collection<Field> getFields(Class recordClass) {
- Map<String,Field> fields = new LinkedHashMap<String,Field>();
- Class c = recordClass;
- do {
- if (c.getPackage() != null
- && c.getPackage().getName().startsWith("java."))
- break; // skip java built-in classes
- for (Field field : c.getDeclaredFields())
- if ((field.getModifiers() & (Modifier.TRANSIENT|Modifier.STATIC)) == 0)
- if (fields.put(field.getName(), field) != null)
- throw new AvroTypeException(c+" contains two fields named: "+field);
- c = c.getSuperclass();
- } while (c != null);
- return fields.values();
- }
-
- @SuppressWarnings(value="unchecked")
- private void setElement(Schema schema, Type element) {
- if (!(element instanceof Class)) return;
- Class<?> c = (Class<?>)element;
- Union union = c.getAnnotation(Union.class);
- if (union != null) // element is annotated union
- schema.addProp(ELEMENT_PROP, c.getName());
- }
-
- // construct a schema from a union annotation
- private Schema getAnnotatedUnion(Union union, Map<String,Schema> names) {
- List<Schema> branches = new ArrayList<Schema>();
- for (Class branch : union.value())
- branches.add(createSchema(branch, names));
- return Schema.createUnion(branches);
- }
-
- @Override
- protected boolean isArray(Object datum) {
- if (datum == null) return false;
- return (datum instanceof Collection) || datum.getClass().isArray() ||
- (datum instanceof scala.collection.Iterable);
- }
-
- @Override
- protected boolean isMap(Object datum) {
- return (datum instanceof java.util.Map) || (datum instanceof scala.collection.Map);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumReader.java
----------------------------------------------------------------------
diff --git a/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumReader.java b/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumReader.java
deleted file mode 100644
index 7e9f6bc..0000000
--- a/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumReader.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch;
-
-import java.io.IOException;
-import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.io.ResolvingDecoder;
-import org.apache.avro.reflect.ReflectDatumReader;
-
-import scala.collection.JavaConversions;
-
-/**
- *
- */
-public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> {
-
- public ScalaSafeReflectDatumReader(Schema schema) {
- super(schema, schema, ScalaSafeReflectData.get());
- }
-
- @Override
- protected Object readArray(Object old, Schema expected,
- ResolvingDecoder in) throws IOException {
- Schema expectedType = expected.getElementType();
- long l = in.readArrayStart();
- long base = 0;
- if (l > 0) {
- Object array = newArray(old, (int) l, expected);
- do {
- for (long i = 0; i < l; i++) {
- addToArray(array, base + i, read(peekArray(array), expectedType, in));
- }
- base += l;
- } while ((l = in.arrayNext()) > 0);
- return scalaIterableCheck(array, expected);
- } else {
- return scalaIterableCheck(newArray(old, 0, expected), expected);
- }
- }
-
- @Override
- protected Object readMap(Object old, Schema expected,
- ResolvingDecoder in) throws IOException {
- return scalaMapCheck(super.readMap(old, expected, in), expected);
- }
-
- public static Object scalaMapCheck(Object map, Schema schema) {
- Class mapClass = ScalaSafeReflectData.getClassProp(schema,
- ScalaSafeReflectData.CLASS_PROP);
- if (mapClass != null && mapClass.isAssignableFrom(scala.collection.Map.class)) {
- return JavaConversions.mapAsScalaMap((Map) map);
- }
- return map;
- }
-
- public static Object scalaIterableCheck(Object array, Schema schema) {
- Class collectionClass = ScalaSafeReflectData.getClassProp(schema,
- ScalaSafeReflectData.CLASS_PROP);
- if (collectionClass != null) {
- if (scala.collection.Iterable.class.isAssignableFrom(collectionClass)) {
- scala.collection.Iterable it = toIter(array);
- if (scala.collection.immutable.List.class.isAssignableFrom(collectionClass)) {
- return it.toList();
- } else if (scala.collection.mutable.Buffer.class.isAssignableFrom(collectionClass)) {
- return it.toBuffer();
- } else if (scala.collection.immutable.Set.class.isAssignableFrom(collectionClass)) {
- return it.toSet();
- }
- return it;
- }
- }
- return array;
- }
-
- private static scala.collection.Iterable toIter(Object array) {
- return JavaConversions.collectionAsScalaIterable((Collection) array);
- }
-
- @Override
- @SuppressWarnings(value="unchecked")
- protected Object newArray(Object old, int size, Schema schema) {
- ScalaSafeReflectData data = ScalaSafeReflectData.get();
- Class collectionClass = ScalaSafeReflectData.getClassProp(schema,
- ScalaSafeReflectData.CLASS_PROP);
- if (collectionClass != null) {
- if (old instanceof Collection) {
- ((Collection)old).clear();
- return old;
- }
- if (scala.collection.Iterable.class.isAssignableFrom(collectionClass) ||
- collectionClass.isAssignableFrom(ArrayList.class)) {
- return new ArrayList();
- }
- return data.newInstance(collectionClass, schema);
- }
- Class elementClass = ScalaSafeReflectData.getClassProp(schema,
- ScalaSafeReflectData.ELEMENT_PROP);
- if (elementClass == null)
- elementClass = data.getClass(schema.getElementType());
- return Array.newInstance(elementClass, size);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumWriter.java
----------------------------------------------------------------------
diff --git a/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumWriter.java b/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumWriter.java
deleted file mode 100644
index 6903f9a..0000000
--- a/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumWriter.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch;
-
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.avro.reflect.ReflectDatumWriter;
-
-import scala.collection.JavaConversions;
-
-/**
- *
- */
-public class ScalaSafeReflectDatumWriter<T> extends ReflectDatumWriter<T> {
- public ScalaSafeReflectDatumWriter() {
- super(ScalaSafeReflectData.get());
- }
-
- @Override
- protected long getArraySize(Object array) {
- if (array instanceof scala.collection.Iterable) {
- return ((scala.collection.Iterable) array).size();
- }
- return super.getArraySize(array);
- }
-
- @Override
- protected Iterator<Object> getArrayElements(Object array) {
- if (array instanceof scala.collection.Iterable) {
- return JavaConversions.asJavaIterable((scala.collection.Iterable) array).iterator();
- }
- return super.getArrayElements(array);
- }
-
- @Override
- protected int getMapSize(Object map) {
- if (map instanceof scala.collection.Map) {
- return ((scala.collection.Map) map).size();
- }
- return super.getMapSize(map);
- }
-
- /** Called by the default implementation of {@link #writeMap} to enumerate
- * map elements. The default implementation is for {@link Map}.*/
- @SuppressWarnings("unchecked")
- protected Iterable<Map.Entry<Object,Object>> getMapEntries(Object map) {
- if (map instanceof scala.collection.Map) {
- return JavaConversions.mapAsJavaMap((scala.collection.Map) map).entrySet();
- }
- return super.getMapEntries(map);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/Conversions.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/Conversions.scala b/scrunch/src/main/scala/org/apache/scrunch/Conversions.scala
deleted file mode 100644
index a704b80..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/Conversions.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.{PCollection => JCollection, PGroupedTable => JGroupedTable, PTable => JTable, DoFn, Emitter}
-import org.apache.crunch.{Pair => CPair}
-import org.apache.crunch.types.PType
-import java.nio.ByteBuffer
-import scala.collection.Iterable
-
-trait CanParallelTransform[El, To] {
- def apply[A](c: PCollectionLike[A, _, JCollection[A]], fn: DoFn[A, El], ptype: PType[El]): To
-}
-
-trait LowPriorityParallelTransforms {
- implicit def single[B] = new CanParallelTransform[B, PCollection[B]] {
- def apply[A](c: PCollectionLike[A, _, JCollection[A]], fn: DoFn[A, B], ptype: PType[B]) = {
- c.parallelDo(fn, ptype)
- }
- }
-}
-
-object CanParallelTransform extends LowPriorityParallelTransforms {
- def tableType[K, V](ptype: PType[(K, V)]) = {
- val st = ptype.getSubTypes()
- ptype.getFamily().tableOf(st.get(0).asInstanceOf[PType[K]], st.get(1).asInstanceOf[PType[V]])
- }
-
- implicit def keyvalue[K, V] = new CanParallelTransform[(K, V), PTable[K,V]] {
- def apply[A](c: PCollectionLike[A, _, JCollection[A]], fn: DoFn[A, (K, V)], ptype: PType[(K, V)]) = {
- c.parallelDo(kvWrapFn(fn), tableType(ptype))
- }
- }
-
- def kvWrapFn[A, K, V](fn: DoFn[A, (K, V)]) = {
- new DoFn[A, CPair[K, V]] {
- override def process(input: A, emitFn: Emitter[CPair[K, V]]) {
- fn.process(input, new Emitter[(K, V)] {
- override def emit(kv: (K, V)) { emitFn.emit(CPair.of(kv._1, kv._2)) }
- override def flush() { emitFn.flush() }
- })
- }
- }
- }
-}
-
-trait PTypeH[T] {
- def get(ptf: PTypeFamily): PType[T]
-}
-
-object PTypeH {
-
- implicit val longs = new PTypeH[Long] { def get(ptf: PTypeFamily) = ptf.longs }
- implicit val ints = new PTypeH[Int] { def get(ptf: PTypeFamily) = ptf.ints }
- implicit val floats = new PTypeH[Float] { def get(ptf: PTypeFamily) = ptf.floats }
- implicit val doubles = new PTypeH[Double] { def get(ptf: PTypeFamily) = ptf.doubles }
- implicit val strings = new PTypeH[String] { def get(ptf: PTypeFamily) = ptf.strings }
- implicit val booleans = new PTypeH[Boolean] { def get(ptf: PTypeFamily) = ptf.booleans }
- implicit val bytes = new PTypeH[ByteBuffer] { def get(ptf: PTypeFamily) = ptf.bytes }
-
- implicit def collections[T: PTypeH] = {
- new PTypeH[Iterable[T]] {
- def get(ptf: PTypeFamily) = {
- ptf.collections(implicitly[PTypeH[T]].get(ptf))
- }
- }
- }
-
- implicit def lists[T: PTypeH] = {
- new PTypeH[List[T]] {
- def get(ptf: PTypeFamily) = {
- ptf.lists(implicitly[PTypeH[T]].get(ptf))
- }
- }
- }
-
- implicit def sets[T: PTypeH] = {
- new PTypeH[Set[T]] {
- def get(ptf: PTypeFamily) = {
- ptf.sets(implicitly[PTypeH[T]].get(ptf))
- }
- }
- }
-
- implicit def pairs[A: PTypeH, B: PTypeH] = {
- new PTypeH[(A, B)] {
- def get(ptf: PTypeFamily) = {
- ptf.tuple2(implicitly[PTypeH[A]].get(ptf), implicitly[PTypeH[B]].get(ptf))
- }
- }
- }
-
- implicit def trips[A: PTypeH, B: PTypeH, C: PTypeH] = {
- new PTypeH[(A, B, C)] {
- def get(ptf: PTypeFamily) = {
- ptf.tuple3(implicitly[PTypeH[A]].get(ptf), implicitly[PTypeH[B]].get(ptf),
- implicitly[PTypeH[C]].get(ptf))
- }
- }
- }
-
- implicit def quads[A: PTypeH, B: PTypeH, C: PTypeH, D: PTypeH] = {
- new PTypeH[(A, B, C, D)] {
- def get(ptf: PTypeFamily) = {
- ptf.tuple4(implicitly[PTypeH[A]].get(ptf), implicitly[PTypeH[B]].get(ptf),
- implicitly[PTypeH[C]].get(ptf), implicitly[PTypeH[D]].get(ptf))
- }
- }
- }
-
- implicit def records[T <: AnyRef : ClassManifest] = new PTypeH[T] {
- def get(ptf: PTypeFamily) = ptf.records(classManifest[T]).asInstanceOf[PType[T]]
- }
-}
-
-object Conversions {
- implicit def jtable2ptable[K, V](jtable: JTable[K, V]) = {
- new PTable[K, V](jtable)
- }
-
- implicit def jcollect2pcollect[S](jcollect: JCollection[S]) = {
- new PCollection[S](jcollect)
- }
-
- implicit def jgrouped2pgrouped[K, V](jgrouped: JGroupedTable[K, V]) = {
- new PGroupedTable[K, V](jgrouped)
- }
-
- implicit def pair2tuple[K, V](p: CPair[K, V]) = (p.first(), p.second())
-
- implicit def tuple2pair[K, V](t: (K, V)) = CPair.of(t._1, t._2)
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipeline.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipeline.scala b/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipeline.scala
deleted file mode 100644
index 8d69701..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipeline.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.hadoop.conf.Configuration
-
-/**
- * Adds a pipeline to the class it is being mixed in to.
- */
-trait EmbeddedPipeline {
- /** The pipeline to use. */
- protected def pipeline: Pipeline
-}
-
-/**
- * Adds a mapreduce pipeline to the class it is being mixed in to.
- */
-trait MREmbeddedPipeline extends EmbeddedPipeline with EmbeddedPipelineLike {
- protected val pipeline: Pipeline = {
- Pipeline.mapReduce(ClassManifest.fromClass(getClass()).erasure, new Configuration())
- }
-}
-
-/**
- * Adds an in memory pipeline to the class it is being mixed in to.
- */
-trait MemEmbeddedPipeline extends EmbeddedPipeline with EmbeddedPipelineLike {
- protected val pipeline: Pipeline = {
- Pipeline.inMemory
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipelineLike.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipelineLike.scala b/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipelineLike.scala
deleted file mode 100644
index 0fbd0ea..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipelineLike.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.Source
-import org.apache.crunch.TableSource
-import org.apache.crunch.Target
-
-trait EmbeddedPipelineLike { self: EmbeddedPipeline =>
- /**
- * Reads a source into a [[org.apache.scrunch.PCollection]]
- *
- * @param source The source to read from.
- * @tparam T The type of the values being read.
- * @return A PCollection containing data read from the specified source.
- */
- def read[T](source: Source[T]): PCollection[T] = {
- pipeline.read(source)
- }
-
- /**
- * Reads a source into a [[org.apache.scrunch.PTable]]
- *
- * @param source The source to read from.
- * @tparam K The type of the keys being read.
- * @tparam V The type of the values being read.
- * @return A PTable containing data read from the specified source.
- */
- def read[K, V](source: TableSource[K, V]): PTable[K, V] = {
- pipeline.read(source)
- }
-
- /**
- * Reads a source into a [[org.apache.scrunch.PCollection]]
- *
- * @param source The source to read from.
- * @tparam T The type of the values being read.
- * @return A PCollection containing data read from the specified source.
- */
- def load[T](source: Source[T]): PCollection[T] = {
- read(source)
- }
-
- /**
- * Reads a source into a [[org.apache.scrunch.PTable]]
- *
- * @param source The source to read from.
- * @tparam K The type of the keys being read.
- * @tparam V The type of the values being read.
- * @return A PTable containing data read from the specified source.
- */
- def load[K, V](source: TableSource[K, V]): PTable[K, V] = {
- read(source)
- }
-
- /**
- * Writes a parallel collection to a target.
- *
- * @param collection The collection to write.
- * @param target The destination target for this write.
- */
- def write(collection: PCollection[_], target: Target) {
- pipeline.write(collection, target)
- }
-
- /**
- * Writes a parallel table to a target.
- *
- * @param table The table to write.
- * @param target The destination target for this write.
- */
- def write(table: PTable[_, _], target: Target) {
- pipeline.write(table, target)
- }
-
- /**
- * Writes a parallel collection to a target.
- *
- * @param collection The collection to write.
- * @param target The destination target for this write.
- */
- def store(collection: PCollection[_], target: Target) {
- write(collection, target)
- }
-
- /**
- * Writes a parallel table to a target.
- *
- * @param table The table to write.
- * @param target The destination target for this write.
- */
- def store(table: PTable[_, _], target: Target) {
- write(table, target)
- }
-
- /**
- * Constructs and executes a series of MapReduce jobs in order
- * to write data to the output targets.
- */
- def run() {
- pipeline.run
- }
-
- /**
- * Run any remaining jobs required to generate outputs and then
- * clean up any intermediate data files that were created in
- * this run or previous calls to `run`.
- */
- def done() {
- pipeline.done
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/IO.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/IO.scala b/scrunch/src/main/scala/org/apache/scrunch/IO.scala
deleted file mode 100644
index 9081d3f..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/IO.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.io.{From => from, To => to, At => at}
-import org.apache.crunch.types.avro.AvroType
-import org.apache.hadoop.fs.Path;
-
-trait From {
- def avroFile[T](path: String, atype: AvroType[T]) = from.avroFile(path, atype)
- def avroFile[T](path: Path, atype: AvroType[T]) = from.avroFile(path, atype)
- def textFile(path: String) = from.textFile(path)
- def textFile(path: Path) = from.textFile(path)
-}
-
-object From extends From
-
-trait To {
- def avroFile[T](path: String) = to.avroFile(path)
- def avroFile[T](path: Path) = to.avroFile(path)
- def textFile(path: String) = to.textFile(path)
- def textFile(path: Path) = to.textFile(path)
-}
-
-object To extends To
-
-trait At {
- def avroFile[T](path: String, atype: AvroType[T]) = at.avroFile(path, atype)
- def avroFile[T](path: Path, atype: AvroType[T]) = at.avroFile(path, atype)
- def textFile(path: String) = at.textFile(path)
- def textFile(path: Path) = at.textFile(path)
-}
-
-object At extends At
-
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/Mem.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/Mem.scala b/scrunch/src/main/scala/org/apache/scrunch/Mem.scala
deleted file mode 100644
index 1c4f233..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/Mem.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import java.lang.{Iterable => JIterable}
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.crunch.{Pair => P}
-import org.apache.crunch.{Source, TableSource, Target}
-import org.apache.crunch.impl.mem.MemPipeline
-import org.apache.scrunch.Conversions._
-
-/**
- * Object for working with in-memory PCollection and PTable instances.
- */
-object Mem extends MemEmbeddedPipeline with PipelineHelper {
- private val ptf = Avros
-
- /**
- * Constructs a PCollection using in-memory data.
- *
- * @param ts The data to load.
- * @return A PCollection containing the specified data.
- */
- def collectionOf[T](ts: T*)(implicit pt: PTypeH[T]): PCollection[T] = {
- collectionOf(List(ts:_*))
- }
-
- /**
- * Constructs a PCollection using in-memory data.
- *
- * @param collect The data to load.
- * @return A PCollection containing the specified data.
- */
- def collectionOf[T](collect: Iterable[T])(implicit pt: PTypeH[T]): PCollection[T] = {
- val native = MemPipeline.typedCollectionOf(pt.get(ptf), asJavaIterable(collect))
- new PCollection[T](native)
- }
-
- /**
- * Constructs a PTable using in-memory data.
- *
- * @param pairs The data to load.
- * @return A PTable containing the specified data.
- */
- def tableOf[K, V](pairs: (K, V)*)(implicit pk: PTypeH[K], pv: PTypeH[V]): PTable[K, V] = {
- tableOf(List(pairs:_*))
- }
-
- /**
- * Constructs a PTable using in-memory data.
- *
- * @param pairs The data to load.
- * @return A PTable containing the specified data.
- */
- def tableOf[K, V](pairs: Iterable[(K, V)])(implicit pk: PTypeH[K], pv: PTypeH[V]): PTable[K, V] = {
- val cpairs = pairs.map(kv => P.of(kv._1, kv._2))
- val ptype = ptf.tableOf(pk.get(ptf), pv.get(ptf))
- new PTable[K, V](MemPipeline.typedTableOf(ptype, asJavaIterable(cpairs)))
- }
-
- /** Contains factory methods used to create `Source`s. */
- val from = From
-
- /** Contains factory methods used to create `Target`s. */
- val to = To
-
- /** Contains factory methods used to create `SourceTarget`s. */
- val at = At
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala b/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
deleted file mode 100644
index 0924587..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import scala.collection.JavaConversions
-
-import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
-import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, Target}
-import org.apache.crunch.lib.{Aggregate, Cartesian}
-import org.apache.scrunch.Conversions._
-import org.apache.scrunch.interpreter.InterpreterRunner
-
-class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCollection[S], JCollection[S]] {
- import PCollection._
-
- def filter(f: S => Boolean): PCollection[S] = {
- parallelDo(filterFn[S](f), native.getPType())
- }
-
- def map[T, To](f: S => T)(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
- b(this, mapFn(f), pt.get(getTypeFamily()))
- }
-
- def flatMap[T, To](f: S => Traversable[T])
- (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
- b(this, flatMapFn(f), pt.get(getTypeFamily()))
- }
-
- def union(others: PCollection[S]*) = {
- new PCollection[S](native.union(others.map(_.native) : _*))
- }
-
- def by[K: PTypeH](f: S => K): PTable[K, S] = {
- val ptype = getTypeFamily().tableOf(implicitly[PTypeH[K]].get(getTypeFamily()), native.getPType())
- parallelDo(mapKeyFn[S, K](f), ptype)
- }
-
- def groupBy[K: PTypeH](f: S => K): PGroupedTable[K, S] = {
- by(f).groupByKey
- }
-
- def cross[S2](other: PCollection[S2]): PCollection[(S, S2)] = {
- val inter = Cartesian.cross(this.native, other.native)
- val f = (in: CPair[S, S2]) => (in.first(), in.second())
- inter.parallelDo(mapFn(f), getTypeFamily().tuple2(pType, other.pType))
- }
-
- def materialize() = {
- InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
- JavaConversions.iterableAsScalaIterable[S](native.materialize)
- }
-
- def wrap(newNative: AnyRef) = new PCollection[S](newNative.asInstanceOf[JCollection[S]])
-
- def count() = {
- val count = new PTable[S, java.lang.Long](Aggregate.count(native))
- count.mapValues(_.longValue())
- }
-
- def max() = wrap(Aggregate.max(native))
-
- def pType = native.getPType()
-}
-
-trait SDoFn[S, T] extends DoFn[S, T] with Function1[S, Traversable[T]] {
- override def process(input: S, emitter: Emitter[T]) {
- for (v <- apply(input)) {
- emitter.emit(v)
- }
- }
-}
-
-trait SFilterFn[T] extends FilterFn[T] with Function1[T, Boolean] {
- override def accept(input: T) = apply(input)
-}
-
-trait SMapFn[S, T] extends MapFn[S, T] with Function1[S, T] {
- override def map(input: S) = apply(input)
-}
-
-trait SMapKeyFn[S, K] extends MapFn[S, CPair[K, S]] with Function1[S, K] {
- override def map(input: S): CPair[K, S] = {
- CPair.of(apply(input), input)
- }
-}
-
-object PCollection {
- def filterFn[S](fn: S => Boolean) = {
- new SFilterFn[S] { def apply(x: S) = fn(x) }
- }
-
- def mapKeyFn[S, K](fn: S => K) = {
- new SMapKeyFn[S, K] { def apply(x: S) = fn(x) }
- }
-
- def mapFn[S, T](fn: S => T) = {
- new SMapFn[S, T] { def apply(s: S) = fn(s) }
- }
-
- def flatMapFn[S, T](fn: S => Traversable[T]) = {
- new SDoFn[S, T] { def apply(s: S) = fn(s) }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PCollectionLike.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PCollectionLike.scala b/scrunch/src/main/scala/org/apache/scrunch/PCollectionLike.scala
deleted file mode 100644
index e912e60..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PCollectionLike.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.DoFn
-import org.apache.crunch.{PCollection => JCollection, Pair => JPair, Target}
-import org.apache.crunch.types.{PType, PTableType}
-
-trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
- val native: NativeType
-
- def wrap(newNative: AnyRef): FullType
-
- def write(target: Target): FullType = wrap(native.write(target))
-
- def parallelDo[T](fn: DoFn[S, T], ptype: PType[T]) = {
- new PCollection[T](native.parallelDo(fn, ptype))
- }
-
- def parallelDo[T](name: String, fn: DoFn[S,T], ptype: PType[T]) = {
- new PCollection[T](native.parallelDo(name, fn, ptype))
- }
-
- def parallelDo[K, V](fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V]) = {
- new PTable[K, V](native.parallelDo(fn, ptype))
- }
-
- def parallelDo[K, V](name: String, fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V]) = {
- new PTable[K, V](native.parallelDo(name, fn, ptype))
- }
-
- def getTypeFamily() = Avros
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PGroupedTable.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PGroupedTable.scala b/scrunch/src/main/scala/org/apache/scrunch/PGroupedTable.scala
deleted file mode 100644
index f4500a5..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PGroupedTable.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
-import org.apache.crunch.{CombineFn, PGroupedTable => JGroupedTable, PTable => JTable, Pair => CPair}
-import java.lang.{Iterable => JIterable}
-import scala.collection.{Iterable, Iterator}
-import scala.collection.JavaConversions._
-import Conversions._
-
-class PGroupedTable[K, V](val native: JGroupedTable[K, V])
- extends PCollectionLike[CPair[K, JIterable[V]], PGroupedTable[K, V], JGroupedTable[K, V]] {
- import PGroupedTable._
-
- def filter(f: (K, Iterable[V]) => Boolean) = {
- parallelDo(filterFn[K, V](f), native.getPType())
- }
-
- def map[T, To](f: (K, Iterable[V]) => T)
- (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
- b(this, mapFn(f), pt.get(getTypeFamily()))
- }
-
- def flatMap[T, To](f: (K, Iterable[V]) => Traversable[T])
- (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
- b(this, flatMapFn(f), pt.get(getTypeFamily()))
- }
-
- def combine(f: Iterable[V] => V) = combineValues(new IterableCombineFn[K, V](f))
-
- def combineValues(fn: CombineFn[K, V]) = new PTable[K, V](native.combineValues(fn))
-
- def ungroup() = new PTable[K, V](native.ungroup())
-
- def wrap(newNative: AnyRef): PGroupedTable[K, V] = {
- new PGroupedTable[K, V](newNative.asInstanceOf[JGroupedTable[K, V]])
- }
-}
-
-class IterableCombineFn[K, V](f: Iterable[V] => V) extends CombineFn[K, V] {
- override def process(input: CPair[K, JIterable[V]], emitfn: Emitter[CPair[K, V]]) = {
- emitfn.emit(CPair.of(input.first(), f(iterableAsScalaIterable[V](input.second()))))
- }
-}
-
-trait SFilterGroupedFn[K, V] extends FilterFn[CPair[K, JIterable[V]]] with Function2[K, Iterable[V], Boolean] {
- override def accept(input: CPair[K, JIterable[V]]) = apply(input.first(), iterableAsScalaIterable[V](input.second()))
-}
-
-trait SDoGroupedFn[K, V, T] extends DoFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V], Traversable[T]] {
- override def process(input: CPair[K, JIterable[V]], emitter: Emitter[T]) {
- for (v <- apply(input.first(), iterableAsScalaIterable[V](input.second()))) {
- emitter.emit(v)
- }
- }
-}
-
-trait SMapGroupedFn[K, V, T] extends MapFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V], T] {
- override def map(input: CPair[K, JIterable[V]]) = {
- apply(input.first(), iterableAsScalaIterable[V](input.second()))
- }
-}
-
-object PGroupedTable {
- def filterFn[K, V](fn: (K, Iterable[V]) => Boolean) = {
- new SFilterGroupedFn[K, V] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
- }
-
- def mapFn[K, V, T](fn: (K, Iterable[V]) => T) = {
- new SMapGroupedFn[K, V, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
- }
-
- def flatMapFn[K, V, T](fn: (K, Iterable[V]) => Traversable[T]) = {
- new SDoGroupedFn[K, V, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
- }
-}