You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text extraction.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/01/26 12:55:35 UTC
spark git commit: [SPARK-3369][CORE][STREAMING] Java mapPartitions
Iterator->Iterable is inconsistent with Scala's Iterator->Iterator
Repository: spark
Updated Branches:
refs/heads/master 5936bf9fa -> 649e9d0f5
[SPARK-3369][CORE][STREAMING] Java mapPartitions Iterator->Iterable is inconsistent with Scala's Iterator->Iterator
Fix Java function API methods for flatMap and mapPartitions to require producing only an Iterator, not Iterable. Also fix DStream.flatMap to require a function producing TraversableOnce only, not Traversable.
CC rxin pwendell for API change; tdas since it also touches streaming.
Author: Sean Owen <so...@cloudera.com>
Closes #10413 from srowen/SPARK-3369.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/649e9d0f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/649e9d0f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/649e9d0f
Branch: refs/heads/master
Commit: 649e9d0f5b2d5fc13f2dd5be675331510525927f
Parents: 5936bf9
Author: Sean Owen <so...@cloudera.com>
Authored: Tue Jan 26 11:55:28 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jan 26 11:55:28 2016 +0000
----------------------------------------------------------------------
.../api/java/function/CoGroupFunction.java | 2 +-
.../java/function/DoubleFlatMapFunction.java | 3 +-
.../api/java/function/FlatMapFunction.java | 3 +-
.../api/java/function/FlatMapFunction2.java | 3 +-
.../java/function/FlatMapGroupsFunction.java | 2 +-
.../java/function/MapPartitionsFunction.java | 2 +-
.../api/java/function/PairFlatMapFunction.java | 3 +-
.../org/apache/spark/api/java/JavaRDDLike.scala | 20 ++++++-------
.../java/org/apache/spark/JavaAPISuite.java | 24 +++++++--------
docs/streaming-programming-guide.md | 4 +--
.../org/apache/spark/examples/JavaPageRank.java | 18 +++++-------
.../apache/spark/examples/JavaWordCount.java | 5 ++--
.../examples/streaming/JavaActorWordCount.java | 5 ++--
.../examples/streaming/JavaCustomReceiver.java | 7 +++--
.../streaming/JavaDirectKafkaWordCount.java | 6 ++--
.../examples/streaming/JavaKafkaWordCount.java | 9 +++---
.../streaming/JavaNetworkWordCount.java | 11 +++----
.../JavaRecoverableNetworkWordCount.java | 6 ++--
.../streaming/JavaSqlNetworkWordCount.java | 8 ++---
.../streaming/JavaStatefulNetworkWordCount.java | 5 ++--
.../JavaTwitterHashTagJoinSentiments.java | 5 ++--
.../java/org/apache/spark/Java8APISuite.java | 2 +-
.../streaming/JavaKinesisWordCountASL.java | 9 ++++--
project/MimaExcludes.scala | 31 ++++++++++++++++++++
.../scala/org/apache/spark/sql/Dataset.scala | 4 +--
.../org/apache/spark/sql/JavaDatasetSuite.java | 25 +++++++---------
.../streaming/api/java/JavaDStreamLike.scala | 10 +++----
.../spark/streaming/dstream/DStream.scala | 2 +-
.../streaming/dstream/FlatMappedDStream.scala | 2 +-
.../apache/spark/streaming/JavaAPISuite.java | 20 ++++++-------
30 files changed, 146 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
index 279639a..07aebb7 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
@@ -25,5 +25,5 @@ import java.util.Iterator;
* Datasets.
*/
public interface CoGroupFunction<K, V1, V2, R> extends Serializable {
- Iterable<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
+ Iterator<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
index 57fd0a7..576087b 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
@@ -18,10 +18,11 @@
package org.apache.spark.api.java.function;
import java.io.Serializable;
+import java.util.Iterator;
/**
* A function that returns zero or more records of type Double from each input record.
*/
public interface DoubleFlatMapFunction<T> extends Serializable {
- public Iterable<Double> call(T t) throws Exception;
+ Iterator<Double> call(T t) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
index ef0d182..2d8ea6d 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
@@ -18,10 +18,11 @@
package org.apache.spark.api.java.function;
import java.io.Serializable;
+import java.util.Iterator;
/**
* A function that returns zero or more output records from each input record.
*/
public interface FlatMapFunction<T, R> extends Serializable {
- Iterable<R> call(T t) throws Exception;
+ Iterator<R> call(T t) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
index 14a98a3..fc97b63 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
@@ -18,10 +18,11 @@
package org.apache.spark.api.java.function;
import java.io.Serializable;
+import java.util.Iterator;
/**
* A function that takes two inputs and returns zero or more output records.
*/
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
- Iterable<R> call(T1 t1, T2 t2) throws Exception;
+ Iterator<R> call(T1 t1, T2 t2) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
index d7a80e7..bae574a 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
@@ -24,5 +24,5 @@ import java.util.Iterator;
* A function that returns zero or more output records from each grouping key and its values.
*/
public interface FlatMapGroupsFunction<K, V, R> extends Serializable {
- Iterable<R> call(K key, Iterator<V> values) throws Exception;
+ Iterator<R> call(K key, Iterator<V> values) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
index 6cb569c..cf9945a 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
@@ -24,5 +24,5 @@ import java.util.Iterator;
* Base interface for function used in Dataset's mapPartitions.
*/
public interface MapPartitionsFunction<T, U> extends Serializable {
- Iterable<U> call(Iterator<T> input) throws Exception;
+ Iterator<U> call(Iterator<T> input) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
index 691ef2e..51eed2e 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -18,6 +18,7 @@
package org.apache.spark.api.java.function;
import java.io.Serializable;
+import java.util.Iterator;
import scala.Tuple2;
@@ -26,5 +27,5 @@ import scala.Tuple2;
* key-value pairs are represented as scala.Tuple2 objects.
*/
public interface PairFlatMapFunction<T, K, V> extends Serializable {
- public Iterable<Tuple2<K, V>> call(T t) throws Exception;
+ Iterator<Tuple2<K, V>> call(T t) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 0f8d13c..7340def 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -121,7 +121,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
- def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala
JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -130,7 +130,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
- def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
}
@@ -139,7 +139,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
- def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -149,7 +149,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -160,7 +160,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
preservesPartitioning: Boolean): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
@@ -171,7 +171,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
}
@@ -182,7 +182,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -193,7 +193,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
preservesPartitioning: Boolean): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
.map(x => x.doubleValue()))
@@ -205,7 +205,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaPairRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
@@ -290,7 +290,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
other: JavaRDDLike[U, _],
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
- (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).iterator().asScala
+ (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
}
JavaRDD.fromRDD(
rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 44d5cac..8117ad9 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -880,8 +880,8 @@ public class JavaAPISuite implements Serializable {
"The quick brown fox jumps over the lazy dog."));
JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Arrays.asList(x.split(" "));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(x.split(" ")).iterator();
}
});
Assert.assertEquals("Hello", words.first());
@@ -890,12 +890,12 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(
new PairFlatMapFunction<String, String, String>() {
@Override
- public Iterable<Tuple2<String, String>> call(String s) {
+ public Iterator<Tuple2<String, String>> call(String s) {
List<Tuple2<String, String>> pairs = new LinkedList<>();
for (String word : s.split(" ")) {
pairs.add(new Tuple2<>(word, word));
}
- return pairs;
+ return pairs.iterator();
}
}
);
@@ -904,12 +904,12 @@ public class JavaAPISuite implements Serializable {
JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
@Override
- public Iterable<Double> call(String s) {
+ public Iterator<Double> call(String s) {
List<Double> lengths = new LinkedList<>();
for (String word : s.split(" ")) {
lengths.add((double) word.length());
}
- return lengths;
+ return lengths.iterator();
}
});
Assert.assertEquals(5.0, doubles.first(), 0.01);
@@ -930,8 +930,8 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
- public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
- return Collections.singletonList(item.swap());
+ public Iterator<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
+ return Collections.singletonList(item.swap()).iterator();
}
});
swapped.collect();
@@ -951,12 +951,12 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> partitionSums = rdd.mapPartitions(
new FlatMapFunction<Iterator<Integer>, Integer>() {
@Override
- public Iterable<Integer> call(Iterator<Integer> iter) {
+ public Iterator<Integer> call(Iterator<Integer> iter) {
int sum = 0;
while (iter.hasNext()) {
sum += iter.next();
}
- return Collections.singletonList(sum);
+ return Collections.singletonList(sum).iterator();
}
});
Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
@@ -1367,8 +1367,8 @@ public class JavaAPISuite implements Serializable {
FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
@Override
- public Iterable<Integer> call(Iterator<Integer> i, Iterator<String> s) {
- return Arrays.asList(Iterators.size(i), Iterators.size(s));
+ public Iterator<Integer> call(Iterator<Integer> i, Iterator<String> s) {
+ return Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
}
};
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 93c34ef..7e681b6 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -165,8 +165,8 @@ space into words.
// Split each line into words
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
- @Override public Iterable<String> call(String x) {
- return Arrays.asList(x.split(" "));
+ @Override public Iterator<String> call(String x) {
+ return Arrays.asList(x.split(" ")).iterator();
}
});
{% endhighlight %}
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index a5db8ac..635fb6a 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -17,7 +17,10 @@
package org.apache.spark.examples;
-
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+import java.util.regex.Pattern;
import scala.Tuple2;
@@ -32,11 +35,6 @@ import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Iterator;
-import java.util.regex.Pattern;
-
/**
* Computes the PageRank of URLs from an input file. Input file should
* be in format of:
@@ -108,13 +106,13 @@ public final class JavaPageRank {
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
.flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
@Override
- public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
+ public Iterator<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
int urlCount = Iterables.size(s._1);
- List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
+ List<Tuple2<String, Double>> results = new ArrayList<>();
for (String n : s._1) {
- results.add(new Tuple2<String, Double>(n, s._2() / urlCount));
+ results.add(new Tuple2<>(n, s._2() / urlCount));
}
- return results;
+ return results.iterator();
}
});
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 9a6a944..d746a3d 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -27,6 +27,7 @@ import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
@@ -46,8 +47,8 @@ public final class JavaWordCount {
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String s) {
- return Arrays.asList(SPACE.split(s));
+ public Iterator<String> call(String s) {
+ return Arrays.asList(SPACE.split(s)).iterator();
}
});
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
index 62e5633..cf77466 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
@@ -18,6 +18,7 @@
package org.apache.spark.examples.streaming;
import java.util.Arrays;
+import java.util.Iterator;
import scala.Tuple2;
@@ -120,8 +121,8 @@ public class JavaActorWordCount {
// compute wordcount
lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String s) {
- return Arrays.asList(s.split("\\s+"));
+ public Iterator<String> call(String s) {
+ return Arrays.asList(s.split("\\s+")).iterator();
}
}).mapToPair(new PairFunction<String, String, Integer>() {
@Override
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
index 4b50fbf..3d668ad 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -17,7 +17,6 @@
package org.apache.spark.examples.streaming;
-import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import org.apache.spark.SparkConf;
@@ -37,6 +36,8 @@ import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.Socket;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.regex.Pattern;
/**
@@ -74,8 +75,8 @@ public class JavaCustomReceiver extends Receiver<String> {
new JavaCustomReceiver(args[0], Integer.parseInt(args[1])));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
index f9a5e7f..5107500 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
@@ -20,11 +20,11 @@ package org.apache.spark.examples.streaming;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.regex.Pattern;
import scala.Tuple2;
-import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
@@ -87,8 +87,8 @@ public final class JavaDirectKafkaWordCount {
});
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
index 337f8ff..0df4cb4 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -17,20 +17,19 @@
package org.apache.spark.examples.streaming;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;
import java.util.regex.Pattern;
-
import scala.Tuple2;
-import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -88,8 +87,8 @@ public final class JavaKafkaWordCount {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
index 3e9f0f4..b82b319 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -17,8 +17,11 @@
package org.apache.spark.examples.streaming;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.regex.Pattern;
+
import scala.Tuple2;
-import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -31,8 +34,6 @@ import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import java.util.regex.Pattern;
-
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
*
@@ -67,8 +68,8 @@ public final class JavaNetworkWordCount {
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index bc963a0..bc8cbcd 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -21,11 +21,11 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
import scala.Tuple2;
-import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.apache.spark.Accumulator;
@@ -138,8 +138,8 @@ public final class JavaRecoverableNetworkWordCount {
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
index 084f68a..f0228f5 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
@@ -17,10 +17,10 @@
package org.apache.spark.examples.streaming;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.regex.Pattern;
-import com.google.common.collect.Lists;
-
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
@@ -72,8 +72,8 @@ public final class JavaSqlNetworkWordCount {
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index f52cc7c..6beab90 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -18,6 +18,7 @@
package org.apache.spark.examples.streaming;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
@@ -73,8 +74,8 @@ public class JavaStatefulNetworkWordCount {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Arrays.asList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
index d869768..f0ae9a9 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
@@ -34,6 +34,7 @@ import scala.Tuple2;
import twitter4j.Status;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
/**
@@ -70,8 +71,8 @@ public class JavaTwitterHashTagJoinSentiments {
JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() {
@Override
- public Iterable<String> call(Status s) {
- return Arrays.asList(s.getText().split(" "));
+ public Iterator<String> call(Status s) {
+ return Arrays.asList(s.getText().split(" ")).iterator();
}
});
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
----------------------------------------------------------------------
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
index 27d494c..c0b58e7 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -294,7 +294,7 @@ public class Java8APISuite implements Serializable {
sizeS += 1;
s.next();
}
- return Arrays.asList(sizeI, sizeS);
+ return Arrays.asList(sizeI, sizeS).iterator();
};
JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
index 06e0ff2..64e044a 100644
--- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
+++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -16,7 +16,10 @@
*/
package org.apache.spark.examples.streaming;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
@@ -38,7 +41,6 @@ import scala.Tuple2;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-import com.google.common.collect.Lists;
/**
* Consumes messages from a Amazon Kinesis streams and does wordcount.
@@ -154,8 +156,9 @@ public final class JavaKinesisWordCountASL { // needs to be public for access fr
// Convert each line of Array[Byte] to String, and split into words
JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
@Override
- public Iterable<String> call(byte[] line) {
- return Lists.newArrayList(WORD_SEPARATOR.split(new String(line)));
+ public Iterator<String> call(byte[] line) {
+ String s = new String(line, StandardCharsets.UTF_8);
+ return Arrays.asList(WORD_SEPARATOR.split(s)).iterator();
}
});
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 501456b..643bee6 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -61,6 +61,37 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory")
) ++
Seq(
+ // SPARK-3369 Fix Iterable/Iterator in Java API
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.function.FlatMapFunction.call"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.function.FlatMapFunction.call"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.function.DoubleFlatMapFunction.call"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.function.DoubleFlatMapFunction.call"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.function.FlatMapFunction2.call"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.function.FlatMapFunction2.call"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.function.PairFlatMapFunction.call"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.function.PairFlatMapFunction.call"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.function.CoGroupFunction.call"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.function.CoGroupFunction.call"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.function.MapPartitionsFunction.call"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.function.MapPartitionsFunction.call"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.function.FlatMapGroupsFunction.call"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.function.FlatMapGroupsFunction.call")
+ ) ++
+ Seq(
// SPARK-4819 replace Guava Optional
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.api.java.JavaSparkContext.getCheckpointDir"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.api.java.JavaSparkContext.getSparkHome"),
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index bd99c39..f182270 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -346,7 +346,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def mapPartitions[U](f: MapPartitionsFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
- val func: (Iterator[T]) => Iterator[U] = x => f.call(x.asJava).iterator.asScala
+ val func: (Iterator[T]) => Iterator[U] = x => f.call(x.asJava).asScala
mapPartitions(func)(encoder)
}
@@ -366,7 +366,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
- val func: (T) => Iterable[U] = x => f.call(x).asScala
+ val func: (T) => Iterator[U] = x => f.call(x).asScala
flatMap(func)(encoder)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 3c0f25a..a6fb62c 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -111,24 +111,24 @@ public class JavaDatasetSuite implements Serializable {
Dataset<String> parMapped = ds.mapPartitions(new MapPartitionsFunction<String, String>() {
@Override
- public Iterable<String> call(Iterator<String> it) throws Exception {
- List<String> ls = new LinkedList<String>();
+ public Iterator<String> call(Iterator<String> it) {
+ List<String> ls = new LinkedList<>();
while (it.hasNext()) {
- ls.add(it.next().toUpperCase());
+ ls.add(it.next().toUpperCase(Locale.ENGLISH));
}
- return ls;
+ return ls.iterator();
}
}, Encoders.STRING());
Assert.assertEquals(Arrays.asList("HELLO", "WORLD"), parMapped.collectAsList());
Dataset<String> flatMapped = ds.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String s) throws Exception {
- List<String> ls = new LinkedList<String>();
+ public Iterator<String> call(String s) {
+ List<String> ls = new LinkedList<>();
for (char c : s.toCharArray()) {
ls.add(String.valueOf(c));
}
- return ls;
+ return ls.iterator();
}
}, Encoders.STRING());
Assert.assertEquals(
@@ -192,12 +192,12 @@ public class JavaDatasetSuite implements Serializable {
Dataset<String> flatMapped = grouped.flatMapGroups(
new FlatMapGroupsFunction<Integer, String, String>() {
@Override
- public Iterable<String> call(Integer key, Iterator<String> values) throws Exception {
+ public Iterator<String> call(Integer key, Iterator<String> values) {
StringBuilder sb = new StringBuilder(key.toString());
while (values.hasNext()) {
sb.append(values.next());
}
- return Collections.singletonList(sb.toString());
+ return Collections.singletonList(sb.toString()).iterator();
}
},
Encoders.STRING());
@@ -228,10 +228,7 @@ public class JavaDatasetSuite implements Serializable {
grouped2,
new CoGroupFunction<Integer, String, Integer, String>() {
@Override
- public Iterable<String> call(
- Integer key,
- Iterator<String> left,
- Iterator<Integer> right) throws Exception {
+ public Iterator<String> call(Integer key, Iterator<String> left, Iterator<Integer> right) {
StringBuilder sb = new StringBuilder(key.toString());
while (left.hasNext()) {
sb.append(left.next());
@@ -240,7 +237,7 @@ public class JavaDatasetSuite implements Serializable {
while (right.hasNext()) {
sb.append(right.next());
}
- return Collections.singletonList(sb.toString());
+ return Collections.singletonList(sb.toString()).iterator();
}
},
Encoders.STRING());
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index a791a47..f10de48 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -166,8 +166,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* and then flattening the results
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
- import scala.collection.JavaConverters._
- def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala
new JavaDStream(dstream.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -176,8 +175,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* and then flattening the results
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
- import scala.collection.JavaConverters._
- def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = fakeClassTag
new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -189,7 +187,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -202,7 +200,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
: JavaPairDStream[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1dfb4e7..db79eea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -550,7 +550,7 @@ abstract class DStream[T: ClassTag] (
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
- def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
+ def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index 96a444a..d60a617 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -25,7 +25,7 @@ import org.apache.spark.streaming.{Duration, Time}
private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
- flatMapFunc: T => Traversable[U]
+ flatMapFunc: T => TraversableOnce[U]
) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 4dbcef2..806cea2 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -271,12 +271,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> mapped = stream.mapPartitions(
new FlatMapFunction<Iterator<String>, String>() {
@Override
- public Iterable<String> call(Iterator<String> in) {
+ public Iterator<String> call(Iterator<String> in) {
StringBuilder out = new StringBuilder();
while (in.hasNext()) {
out.append(in.next().toUpperCase(Locale.ENGLISH));
}
- return Arrays.asList(out.toString());
+ return Arrays.asList(out.toString()).iterator();
}
});
JavaTestUtils.attachTestOutputStream(mapped);
@@ -759,8 +759,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Arrays.asList(x.split("(?!^)"));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(x.split("(?!^)")).iterator();
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
@@ -846,12 +846,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
new PairFlatMapFunction<String, Integer, String>() {
@Override
- public Iterable<Tuple2<Integer, String>> call(String in) {
+ public Iterator<Tuple2<Integer, String>> call(String in) {
List<Tuple2<Integer, String>> out = new ArrayList<>();
for (String letter: in.split("(?!^)")) {
out.add(new Tuple2<>(in.length(), letter));
}
- return out;
+ return out.iterator();
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
@@ -1019,13 +1019,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
@Override
- public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) {
+ public Iterator<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) {
List<Tuple2<Integer, String>> out = new LinkedList<>();
while (in.hasNext()) {
Tuple2<String, Integer> next = in.next();
out.add(next.swap());
}
- return out;
+ return out.iterator();
}
});
@@ -1089,12 +1089,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
- public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) {
+ public Iterator<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) {
List<Tuple2<Integer, String>> out = new LinkedList<>();
for (Character s : in._1().toCharArray()) {
out.add(new Tuple2<>(in._2(), s.toString()));
}
- return out;
+ return out.iterator();
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org