You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/01/26 12:55:35 UTC

spark git commit: [SPARK-3369][CORE][STREAMING] Java mapPartitions Iterator->Iterable is inconsistent with Scala's Iterator->Iterator

Repository: spark
Updated Branches:
  refs/heads/master 5936bf9fa -> 649e9d0f5


[SPARK-3369][CORE][STREAMING] Java mapPartitions Iterator->Iterable is inconsistent with Scala's Iterator->Iterator

Fix Java function API methods for flatMap and mapPartitions to require producing only an Iterator, not Iterable. Also fix DStream.flatMap to require a function producing TraversableOnce only, not Traversable.

CC rxin pwendell for API change; tdas since it also touches streaming.

Author: Sean Owen <so...@cloudera.com>

Closes #10413 from srowen/SPARK-3369.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/649e9d0f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/649e9d0f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/649e9d0f

Branch: refs/heads/master
Commit: 649e9d0f5b2d5fc13f2dd5be675331510525927f
Parents: 5936bf9
Author: Sean Owen <so...@cloudera.com>
Authored: Tue Jan 26 11:55:28 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jan 26 11:55:28 2016 +0000

----------------------------------------------------------------------
 .../api/java/function/CoGroupFunction.java      |  2 +-
 .../java/function/DoubleFlatMapFunction.java    |  3 +-
 .../api/java/function/FlatMapFunction.java      |  3 +-
 .../api/java/function/FlatMapFunction2.java     |  3 +-
 .../java/function/FlatMapGroupsFunction.java    |  2 +-
 .../java/function/MapPartitionsFunction.java    |  2 +-
 .../api/java/function/PairFlatMapFunction.java  |  3 +-
 .../org/apache/spark/api/java/JavaRDDLike.scala | 20 ++++++-------
 .../java/org/apache/spark/JavaAPISuite.java     | 24 +++++++--------
 docs/streaming-programming-guide.md             |  4 +--
 .../org/apache/spark/examples/JavaPageRank.java | 18 +++++-------
 .../apache/spark/examples/JavaWordCount.java    |  5 ++--
 .../examples/streaming/JavaActorWordCount.java  |  5 ++--
 .../examples/streaming/JavaCustomReceiver.java  |  7 +++--
 .../streaming/JavaDirectKafkaWordCount.java     |  6 ++--
 .../examples/streaming/JavaKafkaWordCount.java  |  9 +++---
 .../streaming/JavaNetworkWordCount.java         | 11 +++----
 .../JavaRecoverableNetworkWordCount.java        |  6 ++--
 .../streaming/JavaSqlNetworkWordCount.java      |  8 ++---
 .../streaming/JavaStatefulNetworkWordCount.java |  5 ++--
 .../JavaTwitterHashTagJoinSentiments.java       |  5 ++--
 .../java/org/apache/spark/Java8APISuite.java    |  2 +-
 .../streaming/JavaKinesisWordCountASL.java      |  9 ++++--
 project/MimaExcludes.scala                      | 31 ++++++++++++++++++++
 .../scala/org/apache/spark/sql/Dataset.scala    |  4 +--
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 25 +++++++---------
 .../streaming/api/java/JavaDStreamLike.scala    | 10 +++----
 .../spark/streaming/dstream/DStream.scala       |  2 +-
 .../streaming/dstream/FlatMappedDStream.scala   |  2 +-
 .../apache/spark/streaming/JavaAPISuite.java    | 20 ++++++-------
 30 files changed, 146 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
index 279639a..07aebb7 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
@@ -25,5 +25,5 @@ import java.util.Iterator;
  * Datasets.
  */
 public interface CoGroupFunction<K, V1, V2, R> extends Serializable {
-  Iterable<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
+  Iterator<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
index 57fd0a7..576087b 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
@@ -18,10 +18,11 @@
 package org.apache.spark.api.java.function;
 
 import java.io.Serializable;
+import java.util.Iterator;
 
 /**
  * A function that returns zero or more records of type Double from each input record.
  */
 public interface DoubleFlatMapFunction<T> extends Serializable {
-  public Iterable<Double> call(T t) throws Exception;
+  Iterator<Double> call(T t) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
index ef0d182..2d8ea6d 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
@@ -18,10 +18,11 @@
 package org.apache.spark.api.java.function;
 
 import java.io.Serializable;
+import java.util.Iterator;
 
 /**
  * A function that returns zero or more output records from each input record.
  */
 public interface FlatMapFunction<T, R> extends Serializable {
-  Iterable<R> call(T t) throws Exception;
+  Iterator<R> call(T t) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
index 14a98a3..fc97b63 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
@@ -18,10 +18,11 @@
 package org.apache.spark.api.java.function;
 
 import java.io.Serializable;
+import java.util.Iterator;
 
 /**
  * A function that takes two inputs and returns zero or more output records.
  */
 public interface FlatMapFunction2<T1, T2, R> extends Serializable {
-  Iterable<R> call(T1 t1, T2 t2) throws Exception;
+  Iterator<R> call(T1 t1, T2 t2) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
index d7a80e7..bae574a 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
@@ -24,5 +24,5 @@ import java.util.Iterator;
  * A function that returns zero or more output records from each grouping key and its values.
  */
 public interface FlatMapGroupsFunction<K, V, R> extends Serializable {
-  Iterable<R> call(K key, Iterator<V> values) throws Exception;
+  Iterator<R> call(K key, Iterator<V> values) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
index 6cb569c..cf9945a 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
@@ -24,5 +24,5 @@ import java.util.Iterator;
  * Base interface for function used in Dataset's mapPartitions.
  */
 public interface MapPartitionsFunction<T, U> extends Serializable {
-  Iterable<U> call(Iterator<T> input) throws Exception;
+  Iterator<U> call(Iterator<T> input) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
index 691ef2e..51eed2e 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -18,6 +18,7 @@
 package org.apache.spark.api.java.function;
 
 import java.io.Serializable;
+import java.util.Iterator;
 
 import scala.Tuple2;
 
@@ -26,5 +27,5 @@ import scala.Tuple2;
  * key-value pairs are represented as scala.Tuple2 objects.
  */
 public interface PairFlatMapFunction<T, K, V> extends Serializable {
-  public Iterable<Tuple2<K, V>> call(T t) throws Exception;
+  Iterator<Tuple2<K, V>> call(T t) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 0f8d13c..7340def 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -121,7 +121,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    *  RDD, and then flattening the results.
    */
   def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
-    def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
+    def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala
     JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
@@ -130,7 +130,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    *  RDD, and then flattening the results.
    */
   def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
-    def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala
+    def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala
     new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
   }
 
@@ -139,7 +139,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    *  RDD, and then flattening the results.
    */
   def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-    def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
+    def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala
     def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
     JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
@@ -149,7 +149,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
     def fn: (Iterator[T]) => Iterator[U] = {
-      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+      (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
@@ -160,7 +160,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
       preservesPartitioning: Boolean): JavaRDD[U] = {
     def fn: (Iterator[T]) => Iterator[U] = {
-      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+      (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     JavaRDD.fromRDD(
       rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
@@ -171,7 +171,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
     def fn: (Iterator[T]) => Iterator[jl.Double] = {
-      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+      (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
   }
@@ -182,7 +182,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
   JavaPairRDD[K2, V2] = {
     def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
-      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+      (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
   }
@@ -193,7 +193,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
       preservesPartitioning: Boolean): JavaDoubleRDD = {
     def fn: (Iterator[T]) => Iterator[jl.Double] = {
-      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+      (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
       .map(x => x.doubleValue()))
@@ -205,7 +205,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
       preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
     def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
-      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+      (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     JavaPairRDD.fromRDD(
       rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
@@ -290,7 +290,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       other: JavaRDDLike[U, _],
       f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
     def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
-      (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).iterator().asScala
+      (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
     }
     JavaRDD.fromRDD(
       rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 44d5cac..8117ad9 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -880,8 +880,8 @@ public class JavaAPISuite implements Serializable {
       "The quick brown fox jumps over the lazy dog."));
     JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
       @Override
-      public Iterable<String> call(String x) {
-        return Arrays.asList(x.split(" "));
+      public Iterator<String> call(String x) {
+        return Arrays.asList(x.split(" ")).iterator();
       }
     });
     Assert.assertEquals("Hello", words.first());
@@ -890,12 +890,12 @@ public class JavaAPISuite implements Serializable {
     JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(
       new PairFlatMapFunction<String, String, String>() {
         @Override
-        public Iterable<Tuple2<String, String>> call(String s) {
+        public Iterator<Tuple2<String, String>> call(String s) {
           List<Tuple2<String, String>> pairs = new LinkedList<>();
           for (String word : s.split(" ")) {
             pairs.add(new Tuple2<>(word, word));
           }
-          return pairs;
+          return pairs.iterator();
         }
       }
     );
@@ -904,12 +904,12 @@ public class JavaAPISuite implements Serializable {
 
     JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
       @Override
-      public Iterable<Double> call(String s) {
+      public Iterator<Double> call(String s) {
         List<Double> lengths = new LinkedList<>();
         for (String word : s.split(" ")) {
           lengths.add((double) word.length());
         }
-        return lengths;
+        return lengths.iterator();
       }
     });
     Assert.assertEquals(5.0, doubles.first(), 0.01);
@@ -930,8 +930,8 @@ public class JavaAPISuite implements Serializable {
     JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
       new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
         @Override
-        public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
-          return Collections.singletonList(item.swap());
+        public Iterator<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
+          return Collections.singletonList(item.swap()).iterator();
         }
       });
     swapped.collect();
@@ -951,12 +951,12 @@ public class JavaAPISuite implements Serializable {
     JavaRDD<Integer> partitionSums = rdd.mapPartitions(
       new FlatMapFunction<Iterator<Integer>, Integer>() {
         @Override
-        public Iterable<Integer> call(Iterator<Integer> iter) {
+        public Iterator<Integer> call(Iterator<Integer> iter) {
           int sum = 0;
           while (iter.hasNext()) {
             sum += iter.next();
           }
-          return Collections.singletonList(sum);
+          return Collections.singletonList(sum).iterator();
         }
     });
     Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
@@ -1367,8 +1367,8 @@ public class JavaAPISuite implements Serializable {
     FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
       new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
         @Override
-        public Iterable<Integer> call(Iterator<Integer> i, Iterator<String> s) {
-          return Arrays.asList(Iterators.size(i), Iterators.size(s));
+        public Iterator<Integer> call(Iterator<Integer> i, Iterator<String> s) {
+          return Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
         }
       };
 

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 93c34ef..7e681b6 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -165,8 +165,8 @@ space into words.
 // Split each line into words
 JavaDStream<String> words = lines.flatMap(
   new FlatMapFunction<String, String>() {
-    @Override public Iterable<String> call(String x) {
-      return Arrays.asList(x.split(" "));
+    @Override public Iterator<String> call(String x) {
+      return Arrays.asList(x.split(" ")).iterator();
     }
   });
 {% endhighlight %}

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index a5db8ac..635fb6a 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -17,7 +17,10 @@
 
 package org.apache.spark.examples;
 
-
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+import java.util.regex.Pattern;
 
 import scala.Tuple2;
 
@@ -32,11 +35,6 @@ import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Iterator;
-import java.util.regex.Pattern;
-
 /**
  * Computes the PageRank of URLs from an input file. Input file should
  * be in format of:
@@ -108,13 +106,13 @@ public final class JavaPageRank {
       JavaPairRDD<String, Double> contribs = links.join(ranks).values()
         .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
           @Override
-          public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
+          public Iterator<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
             int urlCount = Iterables.size(s._1);
-            List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
+            List<Tuple2<String, Double>> results = new ArrayList<>();
             for (String n : s._1) {
-              results.add(new Tuple2<String, Double>(n, s._2() / urlCount));
+              results.add(new Tuple2<>(n, s._2() / urlCount));
             }
-            return results;
+            return results.iterator();
           }
       });
 

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 9a6a944..d746a3d 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -27,6 +27,7 @@ import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
 
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Pattern;
 
@@ -46,8 +47,8 @@ public final class JavaWordCount {
 
     JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
-      public Iterable<String> call(String s) {
-        return Arrays.asList(SPACE.split(s));
+      public Iterator<String> call(String s) {
+        return Arrays.asList(SPACE.split(s)).iterator();
       }
     });
 

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
index 62e5633..cf77466 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
@@ -18,6 +18,7 @@
 package org.apache.spark.examples.streaming;
 
 import java.util.Arrays;
+import java.util.Iterator;
 
 import scala.Tuple2;
 
@@ -120,8 +121,8 @@ public class JavaActorWordCount {
     // compute wordcount
     lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
-      public Iterable<String> call(String s) {
-        return Arrays.asList(s.split("\\s+"));
+      public Iterator<String> call(String s) {
+        return Arrays.asList(s.split("\\s+")).iterator();
       }
     }).mapToPair(new PairFunction<String, String, Integer>() {
       @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
index 4b50fbf..3d668ad 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.examples.streaming;
 
-import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
 
 import org.apache.spark.SparkConf;
@@ -37,6 +36,8 @@ import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.net.ConnectException;
 import java.net.Socket;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.regex.Pattern;
 
 /**
@@ -74,8 +75,8 @@ public class JavaCustomReceiver extends Receiver<String> {
       new JavaCustomReceiver(args[0], Integer.parseInt(args[1])));
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
+      public Iterator<String> call(String x) {
+        return Arrays.asList(SPACE.split(x)).iterator();
       }
     });
     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
index f9a5e7f..5107500 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
@@ -20,11 +20,11 @@ package org.apache.spark.examples.streaming;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.regex.Pattern;
 
 import scala.Tuple2;
 
-import com.google.common.collect.Lists;
 import kafka.serializer.StringDecoder;
 
 import org.apache.spark.SparkConf;
@@ -87,8 +87,8 @@ public final class JavaDirectKafkaWordCount {
     });
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
+      public Iterator<String> call(String x) {
+        return Arrays.asList(SPACE.split(x)).iterator();
       }
     });
     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
index 337f8ff..0df4cb4 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -17,20 +17,19 @@
 
 package org.apache.spark.examples.streaming;
 
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.regex.Pattern;
 
-
 import scala.Tuple2;
 
-import com.google.common.collect.Lists;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.examples.streaming.StreamingExamples;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -88,8 +87,8 @@ public final class JavaKafkaWordCount {
 
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
+      public Iterator<String> call(String x) {
+        return Arrays.asList(SPACE.split(x)).iterator();
       }
     });
 

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
index 3e9f0f4..b82b319 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -17,8 +17,11 @@
 
 package org.apache.spark.examples.streaming;
 
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.regex.Pattern;
+
 import scala.Tuple2;
-import com.google.common.collect.Lists;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -31,8 +34,6 @@ import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
-import java.util.regex.Pattern;
-
 /**
  * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
  *
@@ -67,8 +68,8 @@ public final class JavaNetworkWordCount {
             args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
+      public Iterator<String> call(String x) {
+        return Arrays.asList(SPACE.split(x)).iterator();
       }
     });
     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index bc963a0..bc8cbcd 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -21,11 +21,11 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Pattern;
 
 import scala.Tuple2;
-import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
 import org.apache.spark.Accumulator;
@@ -138,8 +138,8 @@ public final class JavaRecoverableNetworkWordCount {
     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
+      public Iterator<String> call(String x) {
+        return Arrays.asList(SPACE.split(x)).iterator();
       }
     });
     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
index 084f68a..f0228f5 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
@@ -17,10 +17,10 @@
 
 package org.apache.spark.examples.streaming;
 
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.regex.Pattern;
 
-import com.google.common.collect.Lists;
-
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaRDD;
@@ -72,8 +72,8 @@ public final class JavaSqlNetworkWordCount {
         args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
+      public Iterator<String> call(String x) {
+        return Arrays.asList(SPACE.split(x)).iterator();
       }
     });
 

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index f52cc7c..6beab90 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -18,6 +18,7 @@
 package org.apache.spark.examples.streaming;
 
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Pattern;
 
@@ -73,8 +74,8 @@ public class JavaStatefulNetworkWordCount {
 
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
-      public Iterable<String> call(String x) {
-        return Arrays.asList(SPACE.split(x));
+      public Iterator<String> call(String x) {
+        return Arrays.asList(SPACE.split(x)).iterator();
       }
     });
 

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
index d869768..f0ae9a9 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
@@ -34,6 +34,7 @@ import scala.Tuple2;
 import twitter4j.Status;
 
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 
 /**
@@ -70,8 +71,8 @@ public class JavaTwitterHashTagJoinSentiments {
 
     JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() {
       @Override
-      public Iterable<String> call(Status s) {
-        return Arrays.asList(s.getText().split(" "));
+      public Iterator<String> call(Status s) {
+        return Arrays.asList(s.getText().split(" ")).iterator();
       }
     });
 

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
----------------------------------------------------------------------
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
index 27d494c..c0b58e7 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -294,7 +294,7 @@ public class Java8APISuite implements Serializable {
           sizeS += 1;
           s.next();
         }
-        return Arrays.asList(sizeI, sizeS);
+        return Arrays.asList(sizeI, sizeS).iterator();
       };
     JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
     Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
index 06e0ff2..64e044a 100644
--- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
+++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -16,7 +16,10 @@
  */
 package org.apache.spark.examples.streaming;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Pattern;
 
@@ -38,7 +41,6 @@ import scala.Tuple2;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-import com.google.common.collect.Lists;
 
 /**
  * Consumes messages from a Amazon Kinesis streams and does wordcount.
@@ -154,8 +156,9 @@ public final class JavaKinesisWordCountASL { // needs to be public for access fr
     // Convert each line of Array[Byte] to String, and split into words
     JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
       @Override
-      public Iterable<String> call(byte[] line) {
-        return Lists.newArrayList(WORD_SEPARATOR.split(new String(line)));
+      public Iterator<String> call(byte[] line) {
+        String s = new String(line, StandardCharsets.UTF_8);
+        return Arrays.asList(WORD_SEPARATOR.split(s)).iterator();
       }
     });
 

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 501456b..643bee6 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -61,6 +61,37 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory")
       ) ++
       Seq(
+        // SPARK-3369 Fix Iterable/Iterator in Java API
+        ProblemFilters.exclude[IncompatibleResultTypeProblem](
+          "org.apache.spark.api.java.function.FlatMapFunction.call"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.api.java.function.FlatMapFunction.call"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem](
+          "org.apache.spark.api.java.function.DoubleFlatMapFunction.call"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.api.java.function.DoubleFlatMapFunction.call"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem](
+          "org.apache.spark.api.java.function.FlatMapFunction2.call"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.api.java.function.FlatMapFunction2.call"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem](
+          "org.apache.spark.api.java.function.PairFlatMapFunction.call"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.api.java.function.PairFlatMapFunction.call"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem](
+          "org.apache.spark.api.java.function.CoGroupFunction.call"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.api.java.function.CoGroupFunction.call"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem](
+          "org.apache.spark.api.java.function.MapPartitionsFunction.call"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.api.java.function.MapPartitionsFunction.call"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem](
+          "org.apache.spark.api.java.function.FlatMapGroupsFunction.call"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.api.java.function.FlatMapGroupsFunction.call")
+      ) ++
+      Seq(
         // SPARK-4819 replace Guava Optional
         ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.api.java.JavaSparkContext.getCheckpointDir"),
         ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.api.java.JavaSparkContext.getSparkHome"),

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index bd99c39..f182270 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -346,7 +346,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def mapPartitions[U](f: MapPartitionsFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
-    val func: (Iterator[T]) => Iterator[U] = x => f.call(x.asJava).iterator.asScala
+    val func: (Iterator[T]) => Iterator[U] = x => f.call(x.asJava).asScala
     mapPartitions(func)(encoder)
   }
 
@@ -366,7 +366,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
-    val func: (T) => Iterable[U] = x => f.call(x).asScala
+    val func: (T) => Iterator[U] = x => f.call(x).asScala
     flatMap(func)(encoder)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 3c0f25a..a6fb62c 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -111,24 +111,24 @@ public class JavaDatasetSuite implements Serializable {
 
     Dataset<String> parMapped = ds.mapPartitions(new MapPartitionsFunction<String, String>() {
       @Override
-      public Iterable<String> call(Iterator<String> it) throws Exception {
-        List<String> ls = new LinkedList<String>();
+      public Iterator<String> call(Iterator<String> it) {
+        List<String> ls = new LinkedList<>();
         while (it.hasNext()) {
-          ls.add(it.next().toUpperCase());
+          ls.add(it.next().toUpperCase(Locale.ENGLISH));
         }
-        return ls;
+        return ls.iterator();
       }
     }, Encoders.STRING());
     Assert.assertEquals(Arrays.asList("HELLO", "WORLD"), parMapped.collectAsList());
 
     Dataset<String> flatMapped = ds.flatMap(new FlatMapFunction<String, String>() {
       @Override
-      public Iterable<String> call(String s) throws Exception {
-        List<String> ls = new LinkedList<String>();
+      public Iterator<String> call(String s) {
+        List<String> ls = new LinkedList<>();
         for (char c : s.toCharArray()) {
           ls.add(String.valueOf(c));
         }
-        return ls;
+        return ls.iterator();
       }
     }, Encoders.STRING());
     Assert.assertEquals(
@@ -192,12 +192,12 @@ public class JavaDatasetSuite implements Serializable {
     Dataset<String> flatMapped = grouped.flatMapGroups(
       new FlatMapGroupsFunction<Integer, String, String>() {
         @Override
-        public Iterable<String> call(Integer key, Iterator<String> values) throws Exception {
+        public Iterator<String> call(Integer key, Iterator<String> values) {
           StringBuilder sb = new StringBuilder(key.toString());
           while (values.hasNext()) {
             sb.append(values.next());
           }
-          return Collections.singletonList(sb.toString());
+          return Collections.singletonList(sb.toString()).iterator();
         }
       },
       Encoders.STRING());
@@ -228,10 +228,7 @@ public class JavaDatasetSuite implements Serializable {
       grouped2,
       new CoGroupFunction<Integer, String, Integer, String>() {
         @Override
-        public Iterable<String> call(
-          Integer key,
-          Iterator<String> left,
-          Iterator<Integer> right) throws Exception {
+        public Iterator<String> call(Integer key, Iterator<String> left, Iterator<Integer> right) {
           StringBuilder sb = new StringBuilder(key.toString());
           while (left.hasNext()) {
             sb.append(left.next());
@@ -240,7 +237,7 @@ public class JavaDatasetSuite implements Serializable {
           while (right.hasNext()) {
             sb.append(right.next());
           }
-          return Collections.singletonList(sb.toString());
+          return Collections.singletonList(sb.toString()).iterator();
         }
       },
       Encoders.STRING());

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index a791a47..f10de48 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -166,8 +166,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * and then flattening the results
    */
   def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
-    import scala.collection.JavaConverters._
-    def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
+    def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala
     new JavaDStream(dstream.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
@@ -176,8 +175,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * and then flattening the results
    */
   def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
-    import scala.collection.JavaConverters._
-    def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
+    def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala
     def cm: ClassTag[(K2, V2)] = fakeClassTag
     new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
@@ -189,7 +187,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
     def fn: (Iterator[T]) => Iterator[U] = {
-      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+      (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
@@ -202,7 +200,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
   : JavaPairDStream[K2, V2] = {
     def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
-      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+      (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1dfb4e7..db79eea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -550,7 +550,7 @@ abstract class DStream[T: ClassTag] (
    * Return a new DStream by applying a function to all elements of this DStream,
    * and then flattening the results
    */
-  def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
+  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
     new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index 96a444a..d60a617 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -25,7 +25,7 @@ import org.apache.spark.streaming.{Duration, Time}
 private[streaming]
 class FlatMappedDStream[T: ClassTag, U: ClassTag](
     parent: DStream[T],
-    flatMapFunc: T => Traversable[U]
+    flatMapFunc: T => TraversableOnce[U]
   ) extends DStream[U](parent.ssc) {
 
   override def dependencies: List[DStream[_]] = List(parent)

http://git-wip-us.apache.org/repos/asf/spark/blob/649e9d0f/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 4dbcef2..806cea2 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -271,12 +271,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<String> mapped = stream.mapPartitions(
         new FlatMapFunction<Iterator<String>, String>() {
           @Override
-          public Iterable<String> call(Iterator<String> in) {
+          public Iterator<String> call(Iterator<String> in) {
             StringBuilder out = new StringBuilder();
             while (in.hasNext()) {
               out.append(in.next().toUpperCase(Locale.ENGLISH));
             }
-            return Arrays.asList(out.toString());
+            return Arrays.asList(out.toString()).iterator();
           }
         });
     JavaTestUtils.attachTestOutputStream(mapped);
@@ -759,8 +759,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
       @Override
-      public Iterable<String> call(String x) {
-        return Arrays.asList(x.split("(?!^)"));
+      public Iterator<String> call(String x) {
+        return Arrays.asList(x.split("(?!^)")).iterator();
       }
     });
     JavaTestUtils.attachTestOutputStream(flatMapped);
@@ -846,12 +846,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
       new PairFlatMapFunction<String, Integer, String>() {
         @Override
-        public Iterable<Tuple2<Integer, String>> call(String in) {
+        public Iterator<Tuple2<Integer, String>> call(String in) {
           List<Tuple2<Integer, String>> out = new ArrayList<>();
           for (String letter: in.split("(?!^)")) {
             out.add(new Tuple2<>(in.length(), letter));
           }
-          return out;
+          return out.iterator();
         }
       });
     JavaTestUtils.attachTestOutputStream(flatMapped);
@@ -1019,13 +1019,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
         new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
           @Override
-          public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) {
+          public Iterator<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) {
             List<Tuple2<Integer, String>> out = new LinkedList<>();
             while (in.hasNext()) {
               Tuple2<String, Integer> next = in.next();
               out.add(next.swap());
             }
-            return out;
+            return out.iterator();
           }
         });
 
@@ -1089,12 +1089,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
         new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
           @Override
-          public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) {
+          public Iterator<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) {
             List<Tuple2<Integer, String>> out = new LinkedList<>();
             for (Character s : in._1().toCharArray()) {
               out.add(new Tuple2<>(in._2(), s.toString()));
             }
-            return out;
+            return out.iterator();
           }
         });
     JavaTestUtils.attachTestOutputStream(flatMapped);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org