You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by to...@apache.org on 2017/04/13 15:32:11 UTC
crunch git commit: CRUNCH-618: Run on Spark 2. Contributed by Gergő Pásztor.
Repository: crunch
Updated Branches:
refs/heads/master ce9aaa3a5 -> 047d8fd36
CRUNCH-618: Run on Spark 2. Contributed by Gergő Pásztor.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/047d8fd3
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/047d8fd3
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/047d8fd3
Branch: refs/heads/master
Commit: 047d8fd36773608a3d2cf6445881173e7d26377c
Parents: ce9aaa3
Author: Tom White <to...@apache.org>
Authored: Thu Apr 13 16:10:23 2017 +0100
Committer: Tom White <to...@apache.org>
Committed: Thu Apr 13 16:10:23 2017 +0100
----------------------------------------------------------------------
crunch-kafka/pom.xml | 2 +-
crunch-scrunch/pom.xml | 2 +-
.../interpreter/InterpreterJarTest.scala | 23 +++++++++-------
.../org/apache/crunch/scrunch/PTypeFamily.scala | 4 +--
.../scrunch/interpreter/InterpreterRunner.scala | 27 ++++++++++---------
.../org/apache/crunch/fn/IterableIterator.java | 28 ++++++++++++++++++++
.../crunch/fn/SDoubleFlatMapFunction.java | 2 +-
.../org/apache/crunch/fn/SFlatMapFunction.java | 2 +-
.../org/apache/crunch/fn/SFlatMapFunction2.java | 2 +-
.../java/org/apache/crunch/fn/SFunctions.java | 8 +++---
.../apache/crunch/fn/SPairFlatMapFunction.java | 2 +-
.../impl/spark/fn/CombineMapsideFunction.java | 4 +--
.../crunch/impl/spark/fn/CrunchPairTuple2.java | 9 ++-----
.../crunch/impl/spark/fn/FlatMapPairDoFn.java | 4 +--
.../crunch/impl/spark/fn/PairFlatMapDoFn.java | 4 +--
.../impl/spark/fn/ReduceGroupingFunction.java | 9 ++-----
pom.xml | 10 +++----
17 files changed, 83 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-kafka/pom.xml b/crunch-kafka/pom.xml
index 961b106..32429f5 100644
--- a/crunch-kafka/pom.xml
+++ b/crunch-kafka/pom.xml
@@ -40,7 +40,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
+ <artifactId>kafka_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-scrunch/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-scrunch/pom.xml b/crunch-scrunch/pom.xml
index 51925fb..9376059 100644
--- a/crunch-scrunch/pom.xml
+++ b/crunch-scrunch/pom.xml
@@ -38,7 +38,7 @@ under the License.
<artifactId>scala-compiler</artifactId>
</dependency>
<dependency>
- <groupId>org.scala-lang</groupId>
+ <groupId>jline</groupId>
<artifactId>jline</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala
index 5ebc303..bc9bd0f 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala
@@ -22,8 +22,6 @@ import java.io.FileOutputStream
import java.util.jar.JarFile
import java.util.jar.JarOutputStream
-import scala.tools.nsc.io.VirtualDirectory
-
import com.google.common.io.Files
import org.junit.Assert.assertNotNull
import org.junit.Test
@@ -31,30 +29,35 @@ import org.apache.crunch.test.CrunchTestSupport
import org.scalatest.junit.JUnitSuite
import org.apache.crunch.scrunch.CrunchSuite
+import scala.tools.nsc.interpreter.{ReplDir, ReplOutput}
+import scala.tools.nsc.settings.MutableSettings
+
/**
- * Tests creating jars from a {@link scala.tools.nsc.io.VirtualDirectory}.
+ * Tests creating jars from a {@link scala.tools.nsc.interpreter.ReplDir}.
*/
class InterpreterJarTest extends CrunchSuite {
/**
- * Tests transforming a virtual directory into a temporary jar file.
+ * Tests transforming an output directory into a temporary jar file.
*/
- @Test def virtualDirToJar: Unit = {
- // Create a virtual directory and populate with some mock content.
- val root = new VirtualDirectory("testDir", None)
+ @Test def outputDirToJar: Unit = {
+ // Create an output directory and populate with some mock content.
+ val settings = new MutableSettings(e => println("ERROR: "+e))
+ val dirSetting = settings.StringSetting("-Yrepl-outdir", "path", "Test path", "")
+ val root: ReplDir = new ReplOutput(dirSetting).dir
// Add some subdirectories to the root.
(1 to 10).foreach { i =>
- val subdir = root.subdirectoryNamed("subdir" + i).asInstanceOf[VirtualDirectory]
+ val subdir = root.subdirectoryNamed("subdir" + i)
// Add some classfiles to each sub directory.
(1 to 10).foreach { j =>
subdir.fileNamed("MyClass" + j + ".class")
}
}
- // Now generate a jar file from the virtual directory.
+ // Now generate a jar file from the output directory.
val tempJar = new File(tempDir.getRootFile(), "replJar.jar")
val jarStream = new JarOutputStream(new FileOutputStream(tempJar))
- InterpreterRunner.addVirtualDirectoryToJar(root, "top/pack/name/", jarStream)
+ InterpreterRunner.addOutputDirectoryToJar(root, "top/pack/name/", jarStream)
jarStream.close()
// Verify the contents of the jar.
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
index 47cf637..a140acd 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
@@ -264,8 +264,8 @@ trait PTypeFamily extends GeneratedTuplePTypeFamily {
}
private def products[T <: Product](tpe: Type, mirror: Mirror): PType[T] = {
- val ctor = tpe.member(nme.CONSTRUCTOR).asMethod
- val args = ctor.paramss.head.map(x => (x.name.toString,
+ val ctor = tpe.member(termNames.CONSTRUCTOR).asMethod
+ val args = ctor.paramLists.head.map(x => (x.name.toString,
typeToPType(x.typeSignature, mirror)))
val out = (x: Product) => TupleN.of(x.productIterator.toArray.asInstanceOf[Array[Object]] : _*)
val rtc = mirror.runtimeClass(tpe)
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala
index 0d84381..9416f1f 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala
@@ -29,11 +29,9 @@ import scala.tools.nsc.ObjectRunner
import scala.tools.nsc.Properties
import scala.tools.nsc.ScriptRunner
import scala.tools.nsc.interpreter.ILoop
-import scala.tools.nsc.io.Jar
-import scala.tools.nsc.io.VirtualDirectory
+import scala.tools.nsc.io.{AbstractFile, Jar}
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
-
import org.apache.crunch.util.DistCache
import org.apache.commons.io.IOUtils
@@ -126,7 +124,7 @@ object InterpreterRunner extends MainGenericRunner {
ScriptRunner.runCommand(settings, combinedCode, thingToRun +: command.arguments)
}
else runTarget() match {
- case Left(ex) => errorFn(ex.toString())
+ case Left(ex) => errorFn(ex.getMessage(), Some(ex))
case Right(b) => b
}
}
@@ -145,11 +143,11 @@ object InterpreterRunner extends MainGenericRunner {
def createReplCodeJar(): File = {
var jarStream: JarOutputStream = null
try {
- val virtualDirectory = repl.intp.virtualDirectory.asInstanceOf[VirtualDirectory]
+ val outputDirectory = repl.replOutput.dir
val tempDir = Files.createTempDir()
val tempJar = new File(tempDir, "replJar.jar")
jarStream = new JarOutputStream(new FileOutputStream(tempJar))
- addVirtualDirectoryToJar(virtualDirectory, "", jarStream)
+ addOutputDirectoryToJar(outputDirectory, "", jarStream)
return tempJar
} finally {
IOUtils.closeQuietly(jarStream)
@@ -157,14 +155,14 @@ object InterpreterRunner extends MainGenericRunner {
}
/**
- * Add the contents of the specified virtual directory to a jar. This method will recursively
+ * Add the contents of the specified output directory to a jar. This method will recursively
* descend into subdirectories to add their contents.
*
- * @param dir The virtual directory whose contents should be added.
- * @param entryPath The entry path for classes found in the virtual directory.
+ * @param dir The output directory whose contents should be added.
+ * @param entryPath The entry path for classes found in the output directory.
* @param jarStream An output stream for writing the jar file.
*/
- def addVirtualDirectoryToJar(dir: VirtualDirectory, entryPath: String, jarStream:
+ def addOutputDirectoryToJar(dir: AbstractFile, entryPath: String, jarStream:
JarOutputStream): Unit = {
dir.foreach { file =>
if (file.isDirectory) {
@@ -173,8 +171,7 @@ object InterpreterRunner extends MainGenericRunner {
val entry: JarEntry = new JarEntry(dirPath)
jarStream.putNextEntry(entry)
jarStream.closeEntry()
- addVirtualDirectoryToJar(file.asInstanceOf[VirtualDirectory],
- dirPath, jarStream)
+ addOutputDirectoryToJar(file, dirPath, jarStream)
} else if (file.hasExtension("class")) {
// Add class files as an entry in the jar file and write the class to the jar.
val entry: JarEntry = new JarEntry(entryPath + file.name)
@@ -197,7 +194,11 @@ object InterpreterRunner extends MainGenericRunner {
// Generate a jar of REPL code and add to the distributed cache.
val replJarFile = createReplCodeJar()
DistCache.addJarToDistributedCache(configuration, replJarFile)
- // Get the paths to jars added with the :cp command.
+ /**
+ * Get the paths to jars added with the :cp command.
+ * The next line will cause a Deprecation Warning, because of the 'repl.addedClasspath', but
+ * we can safely ignore it as we are not using it to modify the classpath.
+ */
val addedJarPaths = repl.addedClasspath.split(':')
addedJarPaths.foreach {
path => if (path.endsWith(".jar")) DistCache.addJarToDistributedCache(configuration, path)
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java b/crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java
new file mode 100644
index 0000000..3c06c13
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import java.util.Iterator;
+
+class IterableIterator<T> implements Iterable<T> { // Spark 2 flat-map call() returns an Iterator, so wrap it to drive for-each loops.
+ private final Iterator<T> itr; // The wrapped iterator; handed out as-is, never copied.
+ IterableIterator(Iterator<T> itr) { // Package-private: used by the flat-map wrappers in this package (SFlatMapFunction, SFlatMapFunction2, ...).
+ this.itr = itr;
+ }
+ public Iterator<T> iterator() { return itr;} // NOTE: single-use Iterable -- every call returns the same iterator, so only one traversal is possible.
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java
index f3f67cc..01b0b4c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java
@@ -30,7 +30,7 @@ public abstract class SDoubleFlatMapFunction<T> extends SparkDoFn<T, Double>
@Override
public void process(T input, Emitter<Double> emitter) {
try {
- for (Double d : call(input)) {
+ for (Double d : new IterableIterator<Double>(call(input))) {
emitter.emit(d);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java
index 1fecb76..9ee4d9f 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java
@@ -30,7 +30,7 @@ public abstract class SFlatMapFunction<T, R> extends SparkDoFn<T, R>
@Override
public void process(T input, Emitter<R> emitter) {
try {
- for (R r : call(input)) {
+ for (R r : new IterableIterator<R>(call(input))) {
emitter.emit(r);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java
index 0798f63..d7c6514 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java
@@ -32,7 +32,7 @@ public abstract class SFlatMapFunction2<K, V, R> extends DoFn<Pair<K, V>, R>
@Override
public void process(Pair<K, V> input, Emitter<R> emitter) {
try {
- for (R r : call(input.first(), input.second())) {
+ for (R r : new IterableIterator<R>(call(input.first(), input.second()))) {
emitter.emit(r);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java
index cc59746..0ba7a37 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java
@@ -17,6 +17,8 @@
*/
package org.apache.crunch.fn;
+import java.util.Iterator;
+
import org.apache.spark.api.java.function.DoubleFlatMapFunction;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -62,7 +64,7 @@ public final class SFunctions {
public static <T, R> SFlatMapFunction<T, R> wrap(final FlatMapFunction<T, R> f) {
return new SFlatMapFunction<T, R>() {
@Override
- public Iterable<R> call(T t) throws Exception {
+ public Iterator<R> call(T t) throws Exception {
return f.call(t);
}
};
@@ -71,7 +73,7 @@ public final class SFunctions {
public static <K, V, R> SFlatMapFunction2<K, V, R> wrap(final FlatMapFunction2<K, V, R> f) {
return new SFlatMapFunction2<K, V, R>() {
@Override
- public Iterable<R> call(K k, V v) throws Exception {
+ public Iterator<R> call(K k, V v) throws Exception {
return f.call(k, v);
}
};
@@ -89,7 +91,7 @@ public final class SFunctions {
public static <T> SDoubleFlatMapFunction<T> wrap(final DoubleFlatMapFunction<T> f) {
return new SDoubleFlatMapFunction<T>() {
@Override
- public Iterable<Double> call(T t) throws Exception {
+ public Iterator<Double> call(T t) throws Exception {
return f.call(t);
}
};
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java
index 3b8e75a..2becd48 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java
@@ -32,7 +32,7 @@ public abstract class SPairFlatMapFunction<T, K, V> extends SparkDoFn<T, Pair<K,
@Override
public void process(T input, Emitter<Pair<K, V>> emitter) {
try {
- for (Tuple2<K, V> kv : call(input)) {
+ for (Tuple2<K, V> kv : new IterableIterator<Tuple2<K, V>>(call(input))) {
emitter.emit(Pair.of(kv._1(), kv._2()));
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java
index 1bea08d..231de77 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java
@@ -45,7 +45,7 @@ public class CombineMapsideFunction<K, V> implements PairFlatMapFunction<Iterato
}
@Override
- public Iterable<Tuple2<K, V>> call(Iterator<Tuple2<K, V>> iter) throws Exception {
+ public Iterator<Tuple2<K, V>> call(Iterator<Tuple2<K, V>> iter) throws Exception {
ctxt.initialize(combineFn, null);
Map<K, List<V>> cache = Maps.newHashMap();
int cnt = 0;
@@ -63,7 +63,7 @@ public class CombineMapsideFunction<K, V> implements PairFlatMapFunction<Iterato
}
}
- return new Flattener<K, V>(cache);
+ return new Flattener<K, V>(cache).iterator();
}
private Map<K, List<V>> reduce(Map<K, List<V>> cache) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java
index d6c544c..ca3011f 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java
@@ -27,12 +27,7 @@ import java.util.Iterator;
public class CrunchPairTuple2<K, V> implements PairFlatMapFunction<Iterator<Pair<K, V>>, K, V> {
@Override
- public Iterable<Tuple2<K, V>> call(final Iterator<Pair<K, V>> iterator) throws Exception {
- return new Iterable<Tuple2<K, V>>() {
- @Override
- public Iterator<Tuple2<K, V>> iterator() {
- return Iterators.transform(iterator, GuavaUtils.<K, V>pair2tupleFunc());
- }
- };
+ public Iterator<Tuple2<K, V>> call(final Iterator<Pair<K, V>> iterator) throws Exception {
+ return Iterators.transform(iterator, GuavaUtils.<K, V>pair2tupleFunc());
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java
index 8ec2834..aca59f3 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java
@@ -37,9 +37,9 @@ public class FlatMapPairDoFn<K, V, T> implements FlatMapFunction<Iterator<Tuple2
}
@Override
- public Iterable<T> call(Iterator<Tuple2<K, V>> input) throws Exception {
+ public Iterator<T> call(Iterator<Tuple2<K, V>> input) throws Exception {
ctxt.initialize(fn, null);
return new CrunchIterable<Pair<K, V>, T>(fn,
- Iterators.transform(input, GuavaUtils.<K, V>tuple2PairFunc()));
+ Iterators.transform(input, GuavaUtils.<K, V>tuple2PairFunc())).iterator();
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java
index 7f289cc..c012e96 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java
@@ -37,10 +37,10 @@ public class PairFlatMapDoFn<T, K, V> implements PairFlatMapFunction<Iterator<T>
}
@Override
- public Iterable<Tuple2<K, V>> call(Iterator<T> input) throws Exception {
+ public Iterator<Tuple2<K, V>> call(Iterator<T> input) throws Exception {
ctxt.initialize(fn, null);
return Iterables.transform(
new CrunchIterable<T, Pair<K, V>>(fn, input),
- GuavaUtils.<K, V>pair2tupleFunc());
+ GuavaUtils.<K, V>pair2tupleFunc()).iterator();
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
index d3dd69e..eb14dfe 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
@@ -50,14 +50,9 @@ public class ReduceGroupingFunction implements PairFlatMapFunction<Iterator<Tupl
}
@Override
- public Iterable<Tuple2<ByteArray, List<byte[]>>> call(
+ public Iterator<Tuple2<ByteArray, List<byte[]>>> call(
final Iterator<Tuple2<ByteArray, List<byte[]>>> iter) throws Exception {
- return new Iterable<Tuple2<ByteArray, List<byte[]>>>() {
- @Override
- public Iterator<Tuple2<ByteArray, List<byte[]>>> iterator() {
- return new GroupingIterator(iter, rawComparator());
- }
- };
+ return new GroupingIterator(iter, rawComparator());
}
private RawComparator<?> rawComparator() {
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7a570c0..5c45568 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,11 +105,11 @@ under the License.
<avro.classifier>hadoop2</avro.classifier>
<kafka.version>0.10.0.1</kafka.version>
- <scala.base.version>2.10</scala.base.version>
- <scala.version>2.10.4</scala.version>
+ <scala.base.version>2.11</scala.base.version>
+ <scala.version>2.11.8</scala.version>
<scalatest.version>2.2.4</scalatest.version>
- <spark.version>1.3.1</spark.version>
- <jline.version>2.10.4</jline.version>
+ <spark.version>2.0.0</spark.version>
+ <jline.version>2.12.1</jline.version>
<jsr305.version>1.3.9</jsr305.version>
</properties>
@@ -456,7 +456,7 @@ under the License.
</dependency>
<dependency>
- <groupId>org.scala-lang</groupId>
+ <groupId>jline</groupId>
<artifactId>jline</artifactId>
<version>${jline.version}</version>
</dependency>