Posted to commits@spark.apache.org by sr...@apache.org on 2017/02/16 12:32:56 UTC

[5/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/docs/streaming-kafka-0-8-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-0-8-integration.md b/docs/streaming-kafka-0-8-integration.md
index 58b17aa..24a3e4c 100644
--- a/docs/streaming-kafka-0-8-integration.md
+++ b/docs/streaming-kafka-0-8-integration.md
@@ -155,33 +155,22 @@ Next, we discuss how to use this approach in your streaming application.
 	</div>
 	<div data-lang="java" markdown="1">
 		// Hold a reference to the current offset ranges, so it can be used downstream
-		final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
-
-		directKafkaStream.transformToPair(
-		  new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
-		    @Override
-		    public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
-		      OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
-		      offsetRanges.set(offsets);
-		      return rdd;
-		    }
-		  }
-		).map(
+		AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
+
+		directKafkaStream.transformToPair(rdd -> {
+		  OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+		  offsetRanges.set(offsets);
+		  return rdd;
+		}).map(
 		  ...
-		).foreachRDD(
-		  new Function<JavaPairRDD<String, String>, Void>() {
-		    @Override
-		    public Void call(JavaPairRDD<String, String> rdd) throws IOException {
-		      for (OffsetRange o : offsetRanges.get()) {
-		        System.out.println(
-		          o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
-		        );
-		      }
-		      ...
-		      return null;
-		    }
-		  }
-		);
+		).foreachRDD(rdd -> {
+		  for (OffsetRange o : offsetRanges.get()) {
+		    System.out.println(
+		      o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
+		    );
+		  }
+		  ...
+		});
 	</div>
 	<div data-lang="python" markdown="1">
 		offsetRanges = []

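The hunk above illustrates the two Java 8 idioms this commit leans on: anonymous Function classes become lambdas, and the explicit `final` modifier on captured locals such as `offsetRanges` is dropped, since Java 8 only requires captured variables to be effectively final. A minimal standalone sketch of that capture rule (illustration only, not part of the patch):

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.stream.Stream;

    public class EffectivelyFinalDemo {
      public static void main(String[] args) {
        // No 'final' modifier: 'last' is never reassigned, so it is
        // effectively final and may be captured by the lambda below.
        AtomicReference<String> last = new AtomicReference<>();
        Stream.of("a", "b", "c").forEach(s -> last.set(s));
        System.out.println(last.get()); // prints "c"
      }
    }
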
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index a878971..abd4ac9 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -163,12 +163,7 @@ space into words.
 
 {% highlight java %}
 // Split each line into words
-JavaDStream<String> words = lines.flatMap(
-  new FlatMapFunction<String, String>() {
-    @Override public Iterator<String> call(String x) {
-      return Arrays.asList(x.split(" ")).iterator();
-    }
-  });
+JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
 {% endhighlight %}
 
 `flatMap` is a DStream operation that creates a new DStream by
@@ -183,18 +178,8 @@ Next, we want to count these words.
 
 {% highlight java %}
 // Count each word in each batch
-JavaPairDStream<String, Integer> pairs = words.mapToPair(
-  new PairFunction<String, String, Integer>() {
-    @Override public Tuple2<String, Integer> call(String s) {
-      return new Tuple2<>(s, 1);
-    }
-  });
-JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
-  new Function2<Integer, Integer, Integer>() {
-    @Override public Integer call(Integer i1, Integer i2) {
-      return i1 + i2;
-    }
-  });
+JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
+JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
 
 // Print the first ten elements of each RDD generated in this DStream to the console
 wordCounts.print();
@@ -836,11 +821,9 @@ the `(word, 1)` pairs) and the `runningCount` having the previous count.
 
 {% highlight java %}
 Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
-  new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
-    @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
-      Integer newSum = ...  // add the new values with the previous running count to get the new count
-      return Optional.of(newSum);
-    }
+  (values, state) -> {
+    Integer newSum = ...  // add the new values with the previous running count to get the new count
+    return Optional.of(newSum);
   };
 {% endhighlight %}
 
@@ -915,15 +898,12 @@ val cleanedDStream = wordCounts.transform { rdd =>
 {% highlight java %}
 import org.apache.spark.streaming.api.java.*;
 // RDD containing spam information
-final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
+JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
 
-JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
-  new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
-    @Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
-      rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
-      ...
-    }
-  });
+JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
+  rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
+  ...
+});
 {% endhighlight %}
 
 </div>
@@ -986,15 +966,8 @@ val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Se
 <div data-lang="java" markdown="1">
 
 {% highlight java %}
-// Reduce function adding two integers, defined separately for clarity
-Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
-  @Override public Integer call(Integer i1, Integer i2) {
-    return i1 + i2;
-  }
-};
-
 // Reduce last 30 seconds of data, every 10 seconds
-JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(10));
+JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));
 {% endhighlight %}
 
 </div>
@@ -1141,14 +1114,7 @@ val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
 {% highlight java %}
 JavaPairRDD<String, String> dataset = ...
 JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
-JavaPairDStream<String, String> joinedStream = windowedStream.transform(
-  new Function<JavaRDD<Tuple2<String, String>>, JavaRDD<Tuple2<String, String>>>() {
-    @Override
-    public JavaRDD<Tuple2<String, String>> call(JavaRDD<Tuple2<String, String>> rdd) {
-      return rdd.join(dataset);
-    }
-  }
-);
+JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));
 {% endhighlight %}
 </div>
 <div data-lang="python" markdown="1">
@@ -1248,17 +1214,11 @@ dstream.foreachRDD { rdd =>
 </div>
 <div data-lang="java" markdown="1">
 {% highlight java %}
-dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
-  @Override
-  public void call(JavaRDD<String> rdd) {
-    final Connection connection = createNewConnection(); // executed at the driver
-    rdd.foreach(new VoidFunction<String>() {
-      @Override
-      public void call(String record) {
-        connection.send(record); // executed at the worker
-      }
-    });
-  }
+dstream.foreachRDD(rdd -> {
+  Connection connection = createNewConnection(); // executed at the driver
+  rdd.foreach(record -> {
+    connection.send(record); // executed at the worker
+  });
 });
 {% endhighlight %}
 </div>
@@ -1297,18 +1257,12 @@ dstream.foreachRDD { rdd =>
 </div>
 <div data-lang="java" markdown="1">
 {% highlight java %}
-dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
-  @Override
-  public void call(JavaRDD<String> rdd) {
-    rdd.foreach(new VoidFunction<String>() {
-      @Override
-      public void call(String record) {
-        Connection connection = createNewConnection();
-        connection.send(record);
-        connection.close();
-      }
-    });
-  }
+dstream.foreachRDD(rdd -> {
+  rdd.foreach(record -> {
+    Connection connection = createNewConnection();
+    connection.send(record);
+    connection.close();
+  });
 });
 {% endhighlight %}
 </div>
@@ -1344,20 +1298,14 @@ dstream.foreachRDD { rdd =>
 </div>
 <div data-lang="java" markdown="1">
 {% highlight java %}
-dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
-  @Override
-  public void call(JavaRDD<String> rdd) {
-    rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
-      @Override
-      public void call(Iterator<String> partitionOfRecords) {
-        Connection connection = createNewConnection();
-        while (partitionOfRecords.hasNext()) {
-          connection.send(partitionOfRecords.next());
-        }
-        connection.close();
-      }
-    });
-  }
+dstream.foreachRDD(rdd -> {
+  rdd.foreachPartition(partitionOfRecords -> {
+    Connection connection = createNewConnection();
+    while (partitionOfRecords.hasNext()) {
+      connection.send(partitionOfRecords.next());
+    }
+    connection.close();
+  });
 });
 {% endhighlight %}
 </div>
@@ -1396,21 +1344,15 @@ dstream.foreachRDD { rdd =>
 
 <div data-lang="java" markdown="1">
 {% highlight java %}
-dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
-  @Override
-  public void call(JavaRDD<String> rdd) {
-    rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
-      @Override
-      public void call(Iterator<String> partitionOfRecords) {
-        // ConnectionPool is a static, lazily initialized pool of connections
-        Connection connection = ConnectionPool.getConnection();
-        while (partitionOfRecords.hasNext()) {
-          connection.send(partitionOfRecords.next());
-        }
-        ConnectionPool.returnConnection(connection); // return to the pool for future reuse
-      }
-    });
-  }
+dstream.foreachRDD(rdd -> {
+  rdd.foreachPartition(partitionOfRecords -> {
+    // ConnectionPool is a static, lazily initialized pool of connections
+    Connection connection = ConnectionPool.getConnection();
+    while (partitionOfRecords.hasNext()) {
+      connection.send(partitionOfRecords.next());
+    }
+    ConnectionPool.returnConnection(connection); // return to the pool for future reuse
+  });
 });
 {% endhighlight %}
 </div>
@@ -1495,35 +1437,26 @@ public class JavaRow implements java.io.Serializable {
 
 JavaDStream<String> words = ... 
 
-words.foreachRDD(
-  new Function2<JavaRDD<String>, Time, Void>() {
-    @Override
-    public Void call(JavaRDD<String> rdd, Time time) {
-
-      // Get the singleton instance of SparkSession
-      SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();
+words.foreachRDD((rdd, time) -> {
+  // Get the singleton instance of SparkSession
+  SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();
 
-      // Convert RDD[String] to RDD[case class] to DataFrame
-      JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
-        public JavaRow call(String word) {
-          JavaRow record = new JavaRow();
-          record.setWord(word);
-          return record;
-        }
-      });
-      DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);
+  // Convert RDD[String] to RDD[case class] to DataFrame
+  JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
+    JavaRow record = new JavaRow();
+    record.setWord(word);
+    return record;
+  });
+  DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);
 
-      // Creates a temporary view using the DataFrame
-      wordsDataFrame.createOrReplaceTempView("words");
+  // Creates a temporary view using the DataFrame
+  wordsDataFrame.createOrReplaceTempView("words");
 
-      // Do word count on table using SQL and print it
-      DataFrame wordCountsDataFrame =
-        spark.sql("select word, count(*) as total from words group by word");
-      wordCountsDataFrame.show();
-      return null;
-    }
-  }
-);
+  // Do word count on table using SQL and print it
+  DataFrame wordCountsDataFrame =
+    spark.sql("select word, count(*) as total from words group by word");
+  wordCountsDataFrame.show();
+});
 {% endhighlight %}
 
 See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java).
@@ -1883,27 +1816,21 @@ class JavaDroppedWordsCounter {
   }
 }
 
-wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
-  @Override
-  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
-    // Get or register the blacklist Broadcast
-    final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
-    // Get or register the droppedWordsCounter Accumulator
-    final LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
-    // Use blacklist to drop words and use droppedWordsCounter to count them
-    String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
-      @Override
-      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
-        if (blacklist.value().contains(wordCount._1())) {
-          droppedWordsCounter.add(wordCount._2());
-          return false;
-        } else {
-          return true;
-        }
-      }
-    }).collect().toString();
-    String output = "Counts at time " + time + " " + counts;
-  }
+wordCounts.foreachRDD((rdd, time) -> {
+  // Get or register the blacklist Broadcast
+  Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+  // Get or register the droppedWordsCounter Accumulator
+  LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+  // Use blacklist to drop words and use droppedWordsCounter to count them
+  String counts = rdd.filter(wordCount -> {
+    if (blacklist.value().contains(wordCount._1())) {
+      droppedWordsCounter.add(wordCount._2());
+      return false;
+    } else {
+      return true;
+    }
+  }).collect().toString();
+  String output = "Counts at time " + time + " " + counts;
 }
 
 {% endhighlight %}

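Every rewrite in this guide follows the same recipe: Spark's Java API interfaces (Function, Function2, VoidFunction, and so on) each declare a single abstract method, so any anonymous class implementing one can be replaced by a lambda with identical semantics. A minimal sketch of the equivalence, assuming only spark-core on the classpath (an illustration, not code from the patch):

    import org.apache.spark.api.java.function.Function2;

    public class LambdaEquivalenceDemo {
      public static void main(String[] args) throws Exception {
        // Java 7 style: anonymous inner class implementing the interface
        Function2<Integer, Integer, Integer> java7Add =
          new Function2<Integer, Integer, Integer>() {
            @Override public Integer call(Integer i1, Integer i2) {
              return i1 + i2;
            }
          };
        // Java 8 style: a lambda targeting the same interface
        Function2<Integer, Integer, Integer> java8Add = (i1, i2) -> i1 + i2;
        System.out.println(java7Add.call(1, 2) + " == " + java8Add.call(1, 2)); // 3 == 3
      }
    }

One caveat carries over unchanged from the anonymous-class versions: lambdas passed to foreachRDD and friends are still serialized and shipped to executors, so the driver/worker comments in the connection examples above apply to the lambda forms as well.
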
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index b816072..ad3b2fb 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -103,13 +103,7 @@ Dataset<Row> lines = spark
 // Split the lines into words
 Dataset<String> words = lines
   .as(Encoders.STRING())
-  .flatMap(
-    new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(x.split(" ")).iterator();
-      }
-    }, Encoders.STRING());
+  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());
 
 // Generate running word count
 Dataset<Row> wordCounts = words.groupBy("value").count();
@@ -517,7 +511,7 @@ val csvDF = spark
 SparkSession spark = ...
 
 // Read text from socket 
-Dataset[Row] socketDF = spark
+Dataset<Row> socketDF = spark
   .readStream()
   .format("socket")
   .option("host", "localhost")
@@ -530,7 +524,7 @@ socketDF.printSchema();
 
 // Read all the csv files written atomically in a directory
 StructType userSchema = new StructType().add("name", "string").add("age", "integer");
-Dataset[Row] csvDF = spark
+Dataset<Row> csvDF = spark
   .readStream()
   .option("sep", ";")
   .schema(userSchema)      // Specify schema of the csv files
@@ -625,33 +619,15 @@ Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); //
 
 // Select the devices which have signal more than 10
 df.select("device").where("signal > 10"); // using untyped APIs
-ds.filter(new FilterFunction<DeviceData>() { // using typed APIs
-  @Override
-  public boolean call(DeviceData value) throws Exception {
-    return value.getSignal() > 10;
-  }
-}).map(new MapFunction<DeviceData, String>() {
-  @Override
-  public String call(DeviceData value) throws Exception {
-    return value.getDevice();
-  }
-}, Encoders.STRING());
+ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
+  .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());
 
 // Running count of the number of updates for each device type
 df.groupBy("deviceType").count(); // using untyped API
 
 // Running average signal for each device type
-ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
-  @Override
-  public String call(DeviceData value) throws Exception {
-    return value.getDeviceType();
-  }
-}, Encoders.STRING()).agg(typed.avg(new MapFunction<DeviceData, Double>() {
-  @Override
-  public Double call(DeviceData value) throws Exception {
-    return value.getSignal();
-  }
-}));
+ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
+  .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
 {% endhighlight %}
 
 

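Note the casts in the new code, such as (FilterFunction<DeviceData>) and (MapFunction<DeviceData, String>): Dataset methods like map and filter are overloaded for both Scala and Java function types, so a bare lambda is ambiguous to javac and the cast supplies the intended target type. A small self-contained sketch of the same idiom (assumes a local Spark session; not taken from the patch):

    import java.util.Arrays;

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;

    public class TypedLambdaDemo {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
          .master("local[*]").appName("TypedLambdaDemo").getOrCreate();
        Dataset<String> words =
          spark.createDataset(Arrays.asList("spark", "java"), Encoders.STRING());
        // Without the cast, javac cannot decide between the Scala and Java
        // overloads of map; the cast picks the Java MapFunction overload.
        Dataset<Integer> lengths =
          words.map((MapFunction<String, Integer>) String::length, Encoders.INT());
        lengths.show();
        spark.stop();
      }
    }
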
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
index f42fd33..004e9b1 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
@@ -69,9 +69,9 @@ public class JavaTokenizerExample {
         .setOutputCol("words")
         .setPattern("\\W");  // alternatively .setPattern("\\w+").setGaps(false);
 
-    spark.udf().register("countTokens", new UDF1<WrappedArray, Integer>() {
+    spark.udf().register("countTokens", new UDF1<WrappedArray<String>, Integer>() {
       @Override
-      public Integer call(WrappedArray words) {
+      public Integer call(WrappedArray<String> words) {
         return words.size();
       }
     }, DataTypes.IntegerType);

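Parameterizing the raw WrappedArray restores the element type and clears the raw-type compiler warning. Since UDF1 declares a single abstract method, the same registration could equally be written as a lambda under Java 8; a hypothetical variant of the lines above, not part of this patch:

    // Hypothetical lambda form of the registration in the diff above; the cast
    // gives the lambda its UDF1 target type, and 'spark' is the session from
    // the surrounding JavaTokenizerExample.
    spark.udf().register("countTokens",
      (UDF1<WrappedArray<String>, Integer>) words -> words.size(),
      DataTypes.IntegerType);
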
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index 1860594..b687fae 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -224,7 +224,7 @@ public class JavaSQLDataSourceExample {
             "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
     JavaRDD<String> anotherPeopleRDD =
             new JavaSparkContext(spark.sparkContext()).parallelize(jsonData);
-    Dataset anotherPeople = spark.read().json(anotherPeopleRDD);
+    Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD);
     anotherPeople.show();
     // +---------------+----+
     // |        address|name|

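The same raw-type cleanup as above: spark.read().json(...) returns Dataset<Row>, so declaring the variable with its full generic type keeps the element type instead of falling back to a raw Dataset. A two-line comparison, for illustration only (both lines compile; only the second preserves the Row type):

    Dataset rawPeople = spark.read().json(anotherPeopleRDD);        // raw type, element type lost
    Dataset<Row> typedPeople = spark.read().json(anotherPeopleRDD); // parameterized, type-safe
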
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/external/java8-tests/README.md
----------------------------------------------------------------------
diff --git a/external/java8-tests/README.md b/external/java8-tests/README.md
deleted file mode 100644
index aa87901..0000000
--- a/external/java8-tests/README.md
+++ /dev/null
@@ -1,22 +0,0 @@
-# Java 8 Test Suites
-
-These tests require having Java 8 installed and are isolated from the main Spark build.
-If Java 8 is not your system's default Java version, you will need to point Spark's build
-to your Java location. The set-up depends a bit on the build system:
-
-* Sbt users can either set JAVA_HOME to the location of a Java 8 JDK or explicitly pass
-  `-java-home` to the sbt launch script. If a Java 8 JDK is detected sbt will automatically
-  include the Java 8 test project.
-
-  `$ JAVA_HOME=/opt/jdk1.8.0/ build/sbt clean java8-tests/test
-
-* For Maven users,
-
-  Maven users can also refer to their Java 8 directory using JAVA_HOME.
-
-  `$ JAVA_HOME=/opt/jdk1.8.0/ mvn clean install -DskipTests`
-  `$ JAVA_HOME=/opt/jdk1.8.0/ mvn -pl :java8-tests_2.11 test`
-
-  Note that the above command can only be run from project root directory since this module
-  depends on core and the test-jars of core and streaming. This means an install step is
-  required to make the test dependencies visible to the Java 8 sub-project.

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/external/java8-tests/pom.xml
----------------------------------------------------------------------
diff --git a/external/java8-tests/pom.xml b/external/java8-tests/pom.xml
deleted file mode 100644
index 8fc46d7..0000000
--- a/external/java8-tests/pom.xml
+++ /dev/null
@@ -1,132 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-~ Licensed to the Apache Software Foundation (ASF) under one or more
-~ contributor license agreements.  See the NOTICE file distributed with
-~ this work for additional information regarding copyright ownership.
-~ The ASF licenses this file to You under the Apache License, Version 2.0
-~ (the "License"); you may not use this file except in compliance with
-~ the License.  You may obtain a copy of the License at
-~
-~    http://www.apache.org/licenses/LICENSE-2.0
-~
-~ Unless required by applicable law or agreed to in writing, software
-~ distributed under the License is distributed on an "AS IS" BASIS,
-~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-~ See the License for the specific language governing permissions and
-~ limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>java8-tests_2.11</artifactId>
-  <packaging>pom</packaging>
-  <name>Spark Project Java 8 Tests</name>
-
-  <properties>
-    <sbt.project.name>java8-tests</sbt.project.name>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-tags_${scala.binary.version}</artifactId>
-    </dependency>
-
-    <!--
-      This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
-      them will yield errors.
-    -->
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-tags_${scala.binary.version}</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-deploy-plugin</artifactId>
-        <configuration>
-          <skip>true</skip>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-install-plugin</artifactId>
-        <configuration>
-          <skip>true</skip>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <configuration>
-          <forceJavacCompilerUse>true</forceJavacCompilerUse>
-          <source>1.8</source>
-          <target>1.8</target>
-          <compilerVersion>1.8</compilerVersion>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>net.alchim31.maven</groupId>
-        <artifactId>scala-maven-plugin</artifactId>
-        <configuration>
-          <useZincServer>${useZincForJdk8}</useZincServer>
-          <javacArgs>
-            <javacArg>-source</javacArg>
-            <javacArg>1.8</javacArg>
-            <javacArg>-target</javacArg>
-            <javacArg>1.8</javacArg>
-            <javacArg>-Xlint:all,-serial,-path</javacArg>
-          </javacArgs>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java
----------------------------------------------------------------------
diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java b/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java
deleted file mode 100644
index fa3a66e..0000000
--- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package test.org.apache.spark.java8;
-
-import java.io.File;
-import java.io.Serializable;
-import java.util.*;
-
-import scala.Tuple2;
-
-import com.google.common.collect.Iterables;
-import com.google.common.io.Files;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.api.java.JavaDoubleRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.Optional;
-import org.apache.spark.api.java.function.*;
-import org.apache.spark.util.Utils;
-
-/**
- * Most of these tests replicate org.apache.spark.JavaAPISuite using java 8
- * lambda syntax.
- */
-public class Java8RDDAPISuite implements Serializable {
-  private static int foreachCalls = 0;
-  private transient JavaSparkContext sc;
-
-  @Before
-  public void setUp() {
-    sc = new JavaSparkContext("local", "JavaAPISuite");
-  }
-
-  @After
-  public void tearDown() {
-    sc.stop();
-    sc = null;
-  }
-
-  @Test
-  public void foreachWithAnonymousClass() {
-    foreachCalls = 0;
-    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
-    rdd.foreach(new VoidFunction<String>() {
-      @Override
-      public void call(String s) {
-        foreachCalls++;
-      }
-    });
-    Assert.assertEquals(2, foreachCalls);
-  }
-
-  @Test
-  public void foreach() {
-    foreachCalls = 0;
-    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
-    rdd.foreach(x -> foreachCalls++);
-    Assert.assertEquals(2, foreachCalls);
-  }
-
-  @Test
-  public void groupBy() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-    Function<Integer, Boolean> isOdd = x -> x % 2 == 0;
-    JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
-    Assert.assertEquals(2, oddsAndEvens.count());
-    Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
-    Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
-
-    oddsAndEvens = rdd.groupBy(isOdd, 1);
-    Assert.assertEquals(2, oddsAndEvens.count());
-    Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
-    Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
-  }
-
-  @Test
-  public void leftOuterJoin() {
-    JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
-      new Tuple2<>(1, 1),
-      new Tuple2<>(1, 2),
-      new Tuple2<>(2, 1),
-      new Tuple2<>(3, 1)
-    ));
-    JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
-      new Tuple2<>(1, 'x'),
-      new Tuple2<>(2, 'y'),
-      new Tuple2<>(2, 'z'),
-      new Tuple2<>(4, 'w')
-    ));
-    List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
-      rdd1.leftOuterJoin(rdd2).collect();
-    Assert.assertEquals(5, joined.size());
-    Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
-      rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
-    Assert.assertEquals(3, firstUnmatched._1().intValue());
-  }
-
-  @Test
-  public void foldReduce() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
-
-    int sum = rdd.fold(0, add);
-    Assert.assertEquals(33, sum);
-
-    sum = rdd.reduce(add);
-    Assert.assertEquals(33, sum);
-  }
-
-  @Test
-  public void foldByKey() {
-    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
-      new Tuple2<>(2, 1),
-      new Tuple2<>(2, 1),
-      new Tuple2<>(1, 1),
-      new Tuple2<>(3, 2),
-      new Tuple2<>(3, 1)
-    );
-    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
-    JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
-    Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
-    Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
-    Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
-  }
-
-  @Test
-  public void reduceByKey() {
-    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
-      new Tuple2<>(2, 1),
-      new Tuple2<>(2, 1),
-      new Tuple2<>(1, 1),
-      new Tuple2<>(3, 2),
-      new Tuple2<>(3, 1)
-    );
-    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
-    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
-    Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
-    Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
-    Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
-
-    Map<Integer, Integer> localCounts = counts.collectAsMap();
-    Assert.assertEquals(1, localCounts.get(1).intValue());
-    Assert.assertEquals(2, localCounts.get(2).intValue());
-    Assert.assertEquals(3, localCounts.get(3).intValue());
-
-    localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
-    Assert.assertEquals(1, localCounts.get(1).intValue());
-    Assert.assertEquals(2, localCounts.get(2).intValue());
-    Assert.assertEquals(3, localCounts.get(3).intValue());
-  }
-
-  @Test
-  public void map() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache();
-    doubles.collect();
-    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x))
-      .cache();
-    pairs.collect();
-    JavaRDD<String> strings = rdd.map(Object::toString).cache();
-    strings.collect();
-  }
-
-  @Test
-  public void flatMap() {
-    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
-      "The quick brown fox jumps over the lazy dog."));
-    JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
-
-    Assert.assertEquals("Hello", words.first());
-    Assert.assertEquals(11, words.count());
-
-    JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
-      List<Tuple2<String, String>> pairs2 = new LinkedList<>();
-      for (String word : s.split(" ")) {
-        pairs2.add(new Tuple2<>(word, word));
-      }
-      return pairs2.iterator();
-    });
-
-    Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first());
-    Assert.assertEquals(11, pairs.count());
-
-    JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
-      List<Double> lengths = new LinkedList<>();
-      for (String word : s.split(" ")) {
-        lengths.add((double) word.length());
-      }
-      return lengths.iterator();
-    });
-
-    Assert.assertEquals(5.0, doubles.first(), 0.01);
-    Assert.assertEquals(11, pairs.count());
-  }
-
-  @Test
-  public void mapsFromPairsToPairs() {
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<>(1, "a"),
-      new Tuple2<>(2, "aa"),
-      new Tuple2<>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
-
-    // Regression test for SPARK-668:
-    JavaPairRDD<String, Integer> swapped =
-      pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()).iterator());
-    swapped.collect();
-
-    // There was never a bug here, but it's worth testing:
-    pairRDD.map(Tuple2::swap).collect();
-  }
-
-  @Test
-  public void mapPartitions() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
-    JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
-      int sum = 0;
-      while (iter.hasNext()) {
-        sum += iter.next();
-      }
-      return Collections.singletonList(sum).iterator();
-    });
-
-    Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
-  }
-
-  @Test
-  public void sequenceFile() {
-    File tempDir = Files.createTempDir();
-    tempDir.deleteOnExit();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<>(1, "a"),
-      new Tuple2<>(2, "aa"),
-      new Tuple2<>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
-      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
-
-    // Try reading the output back as an object file
-    JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
-      .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString()));
-    Assert.assertEquals(pairs, readRDD.collect());
-    Utils.deleteRecursively(tempDir);
-  }
-
-  @Test
-  public void zip() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x);
-    JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
-    zipped.count();
-  }
-
-  @Test
-  public void zipPartitions() {
-    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
-    JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
-    FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
-      (Iterator<Integer> i, Iterator<String> s) -> {
-        int sizeI = 0;
-        while (i.hasNext()) {
-          sizeI += 1;
-          i.next();
-        }
-        int sizeS = 0;
-        while (s.hasNext()) {
-          sizeS += 1;
-          s.next();
-        }
-        return Arrays.asList(sizeI, sizeS).iterator();
-      };
-    JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
-    Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
-  }
-
-  @Test
-  public void keyBy() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
-    List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect();
-    Assert.assertEquals(new Tuple2<>("1", 1), s.get(0));
-    Assert.assertEquals(new Tuple2<>("2", 2), s.get(1));
-  }
-
-  @Test
-  public void mapOnPairRDD() {
-    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
-    JavaPairRDD<Integer, Integer> rdd2 =
-      rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
-    JavaPairRDD<Integer, Integer> rdd3 =
-      rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
-    Assert.assertEquals(Arrays.asList(
-      new Tuple2<>(1, 1),
-      new Tuple2<>(0, 2),
-      new Tuple2<>(1, 3),
-      new Tuple2<>(0, 4)), rdd3.collect());
-  }
-
-  @Test
-  public void collectPartitions() {
-    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
-
-    JavaPairRDD<Integer, Integer> rdd2 =
-      rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
-    List<Integer>[] parts = rdd1.collectPartitions(new int[]{0});
-    Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
-
-    parts = rdd1.collectPartitions(new int[]{1, 2});
-    Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
-    Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
-
-    Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
-      rdd2.collectPartitions(new int[]{0})[0]);
-
-    List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[]{1, 2});
-    Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]);
-    Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)),
-      parts2[1]);
-  }
-
-  @Test
-  public void collectAsMapWithIntArrayValues() {
-    // Regression test for SPARK-1040
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
-    JavaPairRDD<Integer, int[]> pairRDD =
-      rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
-    pairRDD.collect();  // Works fine
-    pairRDD.collectAsMap();  // Used to crash with ClassCastException
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java
----------------------------------------------------------------------
diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java b/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java
deleted file mode 100644
index 338ca54..0000000
--- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java
+++ /dev/null
@@ -1,882 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package test.org.apache.spark.java8.dstream;
-
-import java.io.Serializable;
-import java.util.*;
-
-import scala.Tuple2;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.spark.HashPartitioner;
-import org.apache.spark.api.java.Optional;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.*;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
-
-/**
- * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8
- * lambda syntax.
- */
-@SuppressWarnings("unchecked")
-public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
-
-  @Test
-  public void testMap() {
-    List<List<String>> inputData = Arrays.asList(
-      Arrays.asList("hello", "world"),
-      Arrays.asList("goodnight", "moon"));
-
-    List<List<Integer>> expected = Arrays.asList(
-      Arrays.asList(5, 5),
-      Arrays.asList(9, 4));
-
-    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<Integer> letterCount = stream.map(String::length);
-    JavaTestUtils.attachTestOutputStream(letterCount);
-    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
-    assertOrderInvariantEquals(expected, result);
-  }
-
-  @Test
-  public void testFilter() {
-    List<List<String>> inputData = Arrays.asList(
-      Arrays.asList("giants", "dodgers"),
-      Arrays.asList("yankees", "red sox"));
-
-    List<List<String>> expected = Arrays.asList(
-      Arrays.asList("giants"),
-      Arrays.asList("yankees"));
-
-    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<String> filtered = stream.filter(s -> s.contains("a"));
-    JavaTestUtils.attachTestOutputStream(filtered);
-    List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
-    assertOrderInvariantEquals(expected, result);
-  }
-
-  @Test
-  public void testMapPartitions() {
-    List<List<String>> inputData = Arrays.asList(
-      Arrays.asList("giants", "dodgers"),
-      Arrays.asList("yankees", "red sox"));
-
-    List<List<String>> expected = Arrays.asList(
-      Arrays.asList("GIANTSDODGERS"),
-      Arrays.asList("YANKEESRED SOX"));
-
-    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<String> mapped = stream.mapPartitions(in -> {
-      String out = "";
-      while (in.hasNext()) {
-        out = out + in.next().toUpperCase();
-      }
-      return Lists.newArrayList(out).iterator();
-    });
-    JavaTestUtils.attachTestOutputStream(mapped);
-    List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testReduce() {
-    List<List<Integer>> inputData = Arrays.asList(
-      Arrays.asList(1, 2, 3),
-      Arrays.asList(4, 5, 6),
-      Arrays.asList(7, 8, 9));
-
-    List<List<Integer>> expected = Arrays.asList(
-      Arrays.asList(6),
-      Arrays.asList(15),
-      Arrays.asList(24));
-
-    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<Integer> reduced = stream.reduce((x, y) -> x + y);
-    JavaTestUtils.attachTestOutputStream(reduced);
-    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testReduceByWindow() {
-    List<List<Integer>> inputData = Arrays.asList(
-      Arrays.asList(1, 2, 3),
-      Arrays.asList(4, 5, 6),
-      Arrays.asList(7, 8, 9));
-
-    List<List<Integer>> expected = Arrays.asList(
-      Arrays.asList(6),
-      Arrays.asList(21),
-      Arrays.asList(39),
-      Arrays.asList(24));
-
-    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y,
-      (x, y) -> x - y, new Duration(2000), new Duration(1000));
-    JavaTestUtils.attachTestOutputStream(reducedWindowed);
-    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testTransform() {
-    List<List<Integer>> inputData = Arrays.asList(
-      Arrays.asList(1, 2, 3),
-      Arrays.asList(4, 5, 6),
-      Arrays.asList(7, 8, 9));
-
-    List<List<Integer>> expected = Arrays.asList(
-      Arrays.asList(3, 4, 5),
-      Arrays.asList(6, 7, 8),
-      Arrays.asList(9, 10, 11));
-
-    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2));
-
-    JavaTestUtils.attachTestOutputStream(transformed);
-    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
-    assertOrderInvariantEquals(expected, result);
-  }
-
-  @Test
-  public void testVariousTransform() {
-    // tests whether all variations of transform can be called from Java
-
-    List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
-    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-
-    List<List<Tuple2<String, Integer>>> pairInputData =
-      Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
-    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
-      JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
-
-    JavaDStream<Integer> transformed1 = stream.transform(in -> null);
-    JavaDStream<Integer> transformed2 = stream.transform((x, time) -> null);
-    JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(x -> null);
-    JavaPairDStream<String, Integer> transformed4 = stream.transformToPair((x, time) -> null);
-    JavaDStream<Integer> pairTransformed1 = pairStream.transform(x -> null);
-    JavaDStream<Integer> pairTransformed2 = pairStream.transform((x, time) -> null);
-    JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(x -> null);
-    JavaPairDStream<String, String> pairTransformed4 =
-      pairStream.transformToPair((x, time) -> null);
-
-  }
-
-  @Test
-  public void testTransformWith() {
-    List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
-      Arrays.asList(
-        new Tuple2<>("california", "dodgers"),
-        new Tuple2<>("new york", "yankees")),
-      Arrays.asList(
-        new Tuple2<>("california", "sharks"),
-        new Tuple2<>("new york", "rangers")));
-
-    List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
-      Arrays.asList(
-        new Tuple2<>("california", "giants"),
-        new Tuple2<>("new york", "mets")),
-      Arrays.asList(
-        new Tuple2<>("california", "ducks"),
-        new Tuple2<>("new york", "islanders")));
-
-
-    List<Set<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
-      Sets.newHashSet(
-        new Tuple2<>("california",
-          new Tuple2<>("dodgers", "giants")),
-        new Tuple2<>("new york",
-          new Tuple2<>("yankees", "mets"))),
-      Sets.newHashSet(
-        new Tuple2<>("california",
-          new Tuple2<>("sharks", "ducks")),
-        new Tuple2<>("new york",
-          new Tuple2<>("rangers", "islanders"))));
-
-    JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
-      ssc, stringStringKVStream1, 1);
-    JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
-
-    JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
-      ssc, stringStringKVStream2, 1);
-    JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
-
-    JavaPairDStream<String, Tuple2<String, String>> joined =
-      pairStream1.transformWithToPair(pairStream2,(x, y, z) -> x.join(y));
-
-    JavaTestUtils.attachTestOutputStream(joined);
-    List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-    List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
-    for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
-      unorderedResult.add(Sets.newHashSet(res));
-    }
-
-    Assert.assertEquals(expected, unorderedResult);
-  }
-
-
-  @Test
-  public void testVariousTransformWith() {
-    // tests whether all variations of transformWith can be called from Java
-
-    List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
-    List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
-    JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
-    JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
-
-    List<List<Tuple2<String, Integer>>> pairInputData1 =
-      Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
-    List<List<Tuple2<Double, Character>>> pairInputData2 =
-      Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
-    JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
-      JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
-    JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
-      JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
-
-    JavaDStream<Double> transformed1 = stream1.transformWith(stream2, (x, y, z) -> null);
-    JavaDStream<Double> transformed2 = stream1.transformWith(pairStream1,(x, y, z) -> null);
-
-    JavaPairDStream<Double, Double> transformed3 =
-      stream1.transformWithToPair(stream2,(x, y, z) -> null);
-
-    JavaPairDStream<Double, Double> transformed4 =
-      stream1.transformWithToPair(pairStream1,(x, y, z) -> null);
-
-    JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(stream2,(x, y, z) -> null);
-
-    JavaDStream<Double> pairTransformed2_ =
-      pairStream1.transformWith(pairStream1,(x, y, z) -> null);
-
-    JavaPairDStream<Double, Double> pairTransformed3 =
-      pairStream1.transformWithToPair(stream2,(x, y, z) -> null);
-
-    JavaPairDStream<Double, Double> pairTransformed4 =
-      pairStream1.transformWithToPair(pairStream2,(x, y, z) -> null);
-  }
-
-  @Test
-  public void testStreamingContextTransform() {
-    List<List<Integer>> stream1input = Arrays.asList(
-      Arrays.asList(1),
-      Arrays.asList(2)
-    );
-
-    List<List<Integer>> stream2input = Arrays.asList(
-      Arrays.asList(3),
-      Arrays.asList(4)
-    );
-
-    List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
-      Arrays.asList(new Tuple2<>(1, "x")),
-      Arrays.asList(new Tuple2<>(2, "y"))
-    );
-
-    List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
-      Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
-    );
-
-    JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
-    JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
-    JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
-      JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
-
-    List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
-
-    // This is just to test whether this transform to JavaStream compiles
-    JavaDStream<Long> transformed1 = ssc.transform(
-      listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
-      Assert.assertEquals(2, listOfRDDs.size());
-      return null;
-    });
-
-    List<JavaDStream<?>> listOfDStreams2 =
-      Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
-
-    JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
-      listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
-      Assert.assertEquals(3, listOfRDDs.size());
-      JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0);
-      JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1);
-      JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2);
-      JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
-      PairFunction<Integer, Integer, Integer> mapToTuple =
-        (Integer i) -> new Tuple2<>(i, i);
-      return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
-    });
-    JavaTestUtils.attachTestOutputStream(transformed2);
-    List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
-      JavaTestUtils.runStreams(ssc, 2, 2);
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testFlatMap() {
-    List<List<String>> inputData = Arrays.asList(
-      Arrays.asList("go", "giants"),
-      Arrays.asList("boo", "dodgers"),
-      Arrays.asList("athletics"));
-
-    List<List<String>> expected = Arrays.asList(
-      Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"),
-      Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"),
-      Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s"));
-
-    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<String> flatMapped = stream.flatMap(
-        s -> Lists.newArrayList(s.split("(?!^)")).iterator());
-    JavaTestUtils.attachTestOutputStream(flatMapped);
-    List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
-    assertOrderInvariantEquals(expected, result);
-  }
-
-  @Test
-  public void testPairFlatMap() {
-    List<List<String>> inputData = Arrays.asList(
-      Arrays.asList("giants"),
-      Arrays.asList("dodgers"),
-      Arrays.asList("athletics"));
-
-    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
-      Arrays.asList(
-        new Tuple2<>(6, "g"),
-        new Tuple2<>(6, "i"),
-        new Tuple2<>(6, "a"),
-        new Tuple2<>(6, "n"),
-        new Tuple2<>(6, "t"),
-        new Tuple2<>(6, "s")),
-      Arrays.asList(
-        new Tuple2<>(7, "d"),
-        new Tuple2<>(7, "o"),
-        new Tuple2<>(7, "d"),
-        new Tuple2<>(7, "g"),
-        new Tuple2<>(7, "e"),
-        new Tuple2<>(7, "r"),
-        new Tuple2<>(7, "s")),
-      Arrays.asList(
-        new Tuple2<>(9, "a"),
-        new Tuple2<>(9, "t"),
-        new Tuple2<>(9, "h"),
-        new Tuple2<>(9, "l"),
-        new Tuple2<>(9, "e"),
-        new Tuple2<>(9, "t"),
-        new Tuple2<>(9, "i"),
-        new Tuple2<>(9, "c"),
-        new Tuple2<>(9, "s")));
-
-    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
-      List<Tuple2<Integer, String>> out = Lists.newArrayList();
-      for (String letter : s.split("(?!^)")) {
-        out.add(new Tuple2<>(s.length(), letter));
-      }
-      return out.iterator();
-    });
-
-    JavaTestUtils.attachTestOutputStream(flatMapped);
-    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  /*
-   * Performs an order-invariant comparison of lists representing two RDD streams. This allows
-   * us to account for ordering variation within individual RDD's which occurs during windowing.
-   */
-  public static <T extends Comparable<T>> void assertOrderInvariantEquals(
-    List<List<T>> expected, List<List<T>> actual) {
-    expected.forEach(Collections::sort);
-    List<List<T>> sortedActual = new ArrayList<>();
-    actual.forEach(list -> {
-        List<T> sortedList = new ArrayList<>(list);
-        Collections.sort(sortedList);
-        sortedActual.add(sortedList);
-    });
-    Assert.assertEquals(expected, sortedActual);
-  }
-
-  @Test
-  public void testPairFilter() {
-    List<List<String>> inputData = Arrays.asList(
-      Arrays.asList("giants", "dodgers"),
-      Arrays.asList("yankees", "red sox"));
-
-    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<>("giants", 6)),
-      Arrays.asList(new Tuple2<>("yankees", 7)));
-
-    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<String, Integer> pairStream =
-      stream.mapToPair(x -> new Tuple2<>(x, x.length()));
-    JavaPairDStream<String, Integer> filtered = pairStream.filter(x -> x._1().contains("a"));
-    JavaTestUtils.attachTestOutputStream(filtered);
-    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
-    Arrays.asList(new Tuple2<>("california", "dodgers"),
-      new Tuple2<>("california", "giants"),
-      new Tuple2<>("new york", "yankees"),
-      new Tuple2<>("new york", "mets")),
-    Arrays.asList(new Tuple2<>("california", "sharks"),
-      new Tuple2<>("california", "ducks"),
-      new Tuple2<>("new york", "rangers"),
-      new Tuple2<>("new york", "islanders")));
-
-  List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
-    Arrays.asList(
-      new Tuple2<>("california", 1),
-      new Tuple2<>("california", 3),
-      new Tuple2<>("new york", 4),
-      new Tuple2<>("new york", 1)),
-    Arrays.asList(
-      new Tuple2<>("california", 5),
-      new Tuple2<>("california", 5),
-      new Tuple2<>("new york", 3),
-      new Tuple2<>("new york", 1)));
-
-  @Test
-  public void testPairMap() { // Maps pair -> pair of different type
-    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
-    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
-      Arrays.asList(
-        new Tuple2<>(1, "california"),
-        new Tuple2<>(3, "california"),
-        new Tuple2<>(4, "new york"),
-        new Tuple2<>(1, "new york")),
-      Arrays.asList(
-        new Tuple2<>(5, "california"),
-        new Tuple2<>(5, "california"),
-        new Tuple2<>(3, "new york"),
-        new Tuple2<>(1, "new york")));
-
-    JavaDStream<Tuple2<String, Integer>> stream =
-      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap());
-    JavaTestUtils.attachTestOutputStream(reversed);
-    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testPairMapPartitions() { // Maps pair -> pair of different type
-    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
-    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
-      Arrays.asList(
-        new Tuple2<>(1, "california"),
-        new Tuple2<>(3, "california"),
-        new Tuple2<>(4, "new york"),
-        new Tuple2<>(1, "new york")),
-      Arrays.asList(
-        new Tuple2<>(5, "california"),
-        new Tuple2<>(5, "california"),
-        new Tuple2<>(3, "new york"),
-        new Tuple2<>(1, "new york")));
-
-    JavaDStream<Tuple2<String, Integer>> stream =
-      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
-      LinkedList<Tuple2<Integer, String>> out = new LinkedList<>();
-      while (in.hasNext()) {
-        Tuple2<String, Integer> next = in.next();
-        out.add(next.swap());
-      }
-      return out.iterator();
-    });
-
-    JavaTestUtils.attachTestOutputStream(reversed);
-    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testPairMap2() { // Maps pair -> single
-    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
-    List<List<Integer>> expected = Arrays.asList(
-      Arrays.asList(1, 3, 4, 1),
-      Arrays.asList(5, 5, 3, 1));
-
-    JavaDStream<Tuple2<String, Integer>> stream =
-      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaDStream<Integer> values = pairStream.map(in -> in._2());
-    JavaTestUtils.attachTestOutputStream(values);
-    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
-    List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
-      Arrays.asList(
-        new Tuple2<>("hi", 1),
-        new Tuple2<>("ho", 2)),
-      Arrays.asList(
-        new Tuple2<>("hi", 1),
-        new Tuple2<>("ho", 2)));
-
-    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
-      Arrays.asList(
-        new Tuple2<>(1, "h"),
-        new Tuple2<>(1, "i"),
-        new Tuple2<>(2, "h"),
-        new Tuple2<>(2, "o")),
-      Arrays.asList(
-        new Tuple2<>(1, "h"),
-        new Tuple2<>(1, "i"),
-        new Tuple2<>(2, "h"),
-        new Tuple2<>(2, "o")));
-
-    JavaDStream<Tuple2<String, Integer>> stream =
-      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
-      List<Tuple2<Integer, String>> out = new LinkedList<>();
-      for (Character c : in._1().toCharArray()) {
-        out.add(new Tuple2<>(in._2(), c.toString()));
-      }
-      return out.iterator();
-    });
-
-    JavaTestUtils.attachTestOutputStream(flatMapped);
-    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testPairReduceByKey() {
-    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
-    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-      Arrays.asList(
-        new Tuple2<>("california", 4),
-        new Tuple2<>("new york", 5)),
-      Arrays.asList(
-        new Tuple2<>("california", 10),
-        new Tuple2<>("new york", 4)));
-
-    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
-      ssc, inputData, 1);
-    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
-    JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey((x, y) -> x + y);
-
-    JavaTestUtils.attachTestOutputStream(reduced);
-    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testCombineByKey() {
-    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
-    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-      Arrays.asList(
-        new Tuple2<>("california", 4),
-        new Tuple2<>("new york", 5)),
-      Arrays.asList(
-        new Tuple2<>("california", 10),
-        new Tuple2<>("new york", 4)));
-
-    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
-      ssc, inputData, 1);
-    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
-    JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(i -> i,
-      (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2));
-
-    JavaTestUtils.attachTestOutputStream(combined);
-    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testReduceByKeyAndWindow() {
-    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
-    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<>("california", 4),
-        new Tuple2<>("new york", 5)),
-      Arrays.asList(new Tuple2<>("california", 14),
-        new Tuple2<>("new york", 9)),
-      Arrays.asList(new Tuple2<>("california", 10),
-        new Tuple2<>("new york", 4)));
-
-    JavaDStream<Tuple2<String, Integer>> stream =
-      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
-    JavaPairDStream<String, Integer> reduceWindowed =
-      pairStream.reduceByKeyAndWindow((x, y) -> x + y, new Duration(2000), new Duration(1000));
-    JavaTestUtils.attachTestOutputStream(reduceWindowed);
-    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testUpdateStateByKey() {
-    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
-    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<>("california", 4),
-        new Tuple2<>("new york", 5)),
-      Arrays.asList(new Tuple2<>("california", 14),
-        new Tuple2<>("new york", 9)),
-      Arrays.asList(new Tuple2<>("california", 14),
-        new Tuple2<>("new york", 9)));
-
-    JavaDStream<Tuple2<String, Integer>> stream =
-      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
-    JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
-      int out = 0;
-      if (state.isPresent()) {
-        out = out + state.get();
-      }
-      for (Integer v : values) {
-        out = out + v;
-      }
-      return Optional.of(out);
-    });
-
-    JavaTestUtils.attachTestOutputStream(updated);
-    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testReduceByKeyAndWindowWithInverse() {
-    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
-    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<>("california", 4),
-        new Tuple2<>("new york", 5)),
-      Arrays.asList(new Tuple2<>("california", 14),
-        new Tuple2<>("new york", 9)),
-      Arrays.asList(new Tuple2<>("california", 10),
-        new Tuple2<>("new york", 4)));
-
-    JavaDStream<Tuple2<String, Integer>> stream =
-      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
-    JavaPairDStream<String, Integer> reduceWindowed =
-      pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000),
-        new Duration(1000));
-    JavaTestUtils.attachTestOutputStream(reduceWindowed);
-    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testPairTransform() {
-    List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
-      Arrays.asList(
-        new Tuple2<>(3, 5),
-        new Tuple2<>(1, 5),
-        new Tuple2<>(4, 5),
-        new Tuple2<>(2, 5)),
-      Arrays.asList(
-        new Tuple2<>(2, 5),
-        new Tuple2<>(3, 5),
-        new Tuple2<>(4, 5),
-        new Tuple2<>(1, 5)));
-
-    List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
-      Arrays.asList(
-        new Tuple2<>(1, 5),
-        new Tuple2<>(2, 5),
-        new Tuple2<>(3, 5),
-        new Tuple2<>(4, 5)),
-      Arrays.asList(
-        new Tuple2<>(1, 5),
-        new Tuple2<>(2, 5),
-        new Tuple2<>(3, 5),
-        new Tuple2<>(4, 5)));
-
-    JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
-      ssc, inputData, 1);
-    JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
-    JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey());
-
-    JavaTestUtils.attachTestOutputStream(sorted);
-    List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testPairToNormalRDDTransform() {
-    List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
-      Arrays.asList(
-        new Tuple2<>(3, 5),
-        new Tuple2<>(1, 5),
-        new Tuple2<>(4, 5),
-        new Tuple2<>(2, 5)),
-      Arrays.asList(
-        new Tuple2<>(2, 5),
-        new Tuple2<>(3, 5),
-        new Tuple2<>(4, 5),
-        new Tuple2<>(1, 5)));
-
-    List<List<Integer>> expected = Arrays.asList(
-      Arrays.asList(3, 1, 4, 2),
-      Arrays.asList(2, 3, 4, 1));
-
-    JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
-      ssc, inputData, 1);
-    JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(x -> x._1()));
-    JavaTestUtils.attachTestOutputStream(firstParts);
-    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testMapValues() {
-    List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
-
-    List<List<Tuple2<String, String>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<>("california", "DODGERS"),
-        new Tuple2<>("california", "GIANTS"),
-        new Tuple2<>("new york", "YANKEES"),
-        new Tuple2<>("new york", "METS")),
-      Arrays.asList(new Tuple2<>("california", "SHARKS"),
-        new Tuple2<>("california", "DUCKS"),
-        new Tuple2<>("new york", "RANGERS"),
-        new Tuple2<>("new york", "ISLANDERS")));
-
-    JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
-      ssc, inputData, 1);
-    JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
-    JavaPairDStream<String, String> mapped = pairStream.mapValues(String::toUpperCase);
-    JavaTestUtils.attachTestOutputStream(mapped);
-    List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
-    Assert.assertEquals(expected, result);
-  }
-
-  @Test
-  public void testFlatMapValues() {
-    List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
-
-    List<List<Tuple2<String, String>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<>("california", "dodgers1"),
-        new Tuple2<>("california", "dodgers2"),
-        new Tuple2<>("california", "giants1"),
-        new Tuple2<>("california", "giants2"),
-        new Tuple2<>("new york", "yankees1"),
-        new Tuple2<>("new york", "yankees2"),
-        new Tuple2<>("new york", "mets1"),
-        new Tuple2<>("new york", "mets2")),
-      Arrays.asList(new Tuple2<>("california", "sharks1"),
-        new Tuple2<>("california", "sharks2"),
-        new Tuple2<>("california", "ducks1"),
-        new Tuple2<>("california", "ducks2"),
-        new Tuple2<>("new york", "rangers1"),
-        new Tuple2<>("new york", "rangers2"),
-        new Tuple2<>("new york", "islanders1"),
-        new Tuple2<>("new york", "islanders2")));
-
-    JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
-      ssc, inputData, 1);
-    JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
-    JavaPairDStream<String, String> flatMapped =
-      pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2"));
-    JavaTestUtils.attachTestOutputStream(flatMapped);
-    List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-    Assert.assertEquals(expected, result);
-  }
-
-  /**
-   * This test exists only to exercise the APIs; it is not meant to be run.
-   */
-  public void testMapWithStateAPI() {
-    JavaPairRDD<String, Boolean> initialRDD = null;
-    JavaPairDStream<String, Integer> wordsDstream = null;
-
-    JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
-        wordsDstream.mapWithState(
-            StateSpec.<String, Integer, Boolean, Double>function((time, key, value, state) -> {
-              // Use all State's methods here
-              state.exists();
-              state.get();
-              state.isTimingOut();
-              state.remove();
-              state.update(true);
-              return Optional.of(2.0);
-            }).initialState(initialRDD)
-                .numPartitions(10)
-                .partitioner(new HashPartitioner(10))
-                .timeout(Durations.seconds(10)));
-
-    JavaPairDStream<String, Boolean> snapshots = stateDstream.stateSnapshots();
-
-    JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
-        wordsDstream.mapWithState(
-            StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> {
-              state.exists();
-              state.get();
-              state.isTimingOut();
-              state.remove();
-              state.update(true);
-              return 2.0;
-            }).initialState(initialRDD)
-                .numPartitions(10)
-                .partitioner(new HashPartitioner(10))
-                .timeout(Durations.seconds(10)));
-
-    JavaPairDStream<String, Boolean> snapshots2 = stateDstream2.stateSnapshots();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/external/java8-tests/src/test/java/test/org/apache/spark/java8/sql/Java8DatasetAggregatorSuite.java
----------------------------------------------------------------------
diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/sql/Java8DatasetAggregatorSuite.java b/external/java8-tests/src/test/java/test/org/apache/spark/java8/sql/Java8DatasetAggregatorSuite.java
deleted file mode 100644
index 10d25fa..0000000
--- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/sql/Java8DatasetAggregatorSuite.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package test.org.apache.spark.java8.sql;
-
-import java.util.Arrays;
-
-import org.junit.Assert;
-import org.junit.Test;
-import scala.Tuple2;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.KeyValueGroupedDataset;
-import org.apache.spark.sql.expressions.javalang.typed;
-import test.org.apache.spark.sql.JavaDatasetAggregatorSuiteBase;
-
-/**
- * Suite that replicates tests in JavaDatasetAggregatorSuite using lambda syntax.
- */
-public class Java8DatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase {
-  @Test
-  public void testTypedAggregationAverage() {
-    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(v -> (double)(v._2() * 2)));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 6.0)), agged.collectAsList());
-  }
-
-  @Test
-  public void testTypedAggregationCount() {
-    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.count(v -> v));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 2L), tuple2("b", 1L)), agged.collectAsList());
-  }
-
-  @Test
-  public void testTypedAggregationSumDouble() {
-    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.sum(v -> (double)v._2()));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 3.0)), agged.collectAsList());
-  }
-
-  @Test
-  public void testTypedAggregationSumLong() {
-    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.sumLong(v -> (long)v._2()));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3L), tuple2("b", 3L)), agged.collectAsList());
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/external/java8-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/java8-tests/src/test/resources/log4j.properties b/external/java8-tests/src/test/resources/log4j.properties
deleted file mode 100644
index 3706a6e..0000000
--- a/external/java8-tests/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark_project.jetty=WARN

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/external/java8-tests/src/test/scala/test/org/apache/spark/java8/JDK8ScalaSuite.scala
----------------------------------------------------------------------
diff --git a/external/java8-tests/src/test/scala/test/org/apache/spark/java8/JDK8ScalaSuite.scala b/external/java8-tests/src/test/scala/test/org/apache/spark/java8/JDK8ScalaSuite.scala
deleted file mode 100644
index c4042e4..0000000
--- a/external/java8-tests/src/test/scala/test/org/apache/spark/java8/JDK8ScalaSuite.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package test.org.apache.spark.java8
-
-import org.apache.spark.SharedSparkContext
-import org.apache.spark.SparkFunSuite
-
-/**
- * Test cases where JDK8-compiled Scala user code is used with Spark.
- */
-class JDK8ScalaSuite extends SparkFunSuite with SharedSparkContext {
-  test("basic RDD closure test (SPARK-6152)") {
-    sc.parallelize(1 to 1000).map(x => x * x).count()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 02b2311..9c5dcec 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -259,7 +259,7 @@ private[kafka010] class KafkaSource(
       val preferredLoc = if (numExecutors > 0) {
         // This allows cached KafkaConsumers in the executors to be re-used to read the same
         // partition in every batch.
-        Some(sortedExecutors(floorMod(tp.hashCode, numExecutors)))
+        Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
       } else None
       KafkaSourceRDDOffsetRange(tp, fromOffset, untilOffset, preferredLoc)
     }.filter { range =>
@@ -347,5 +347,4 @@ private[kafka010] object KafkaSource {
     if (a.host == b.host) { a.executorId > b.executorId } else { a.host > b.host }
   }
 
-  def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
 }

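For context on the hunk above: Math.floorMod, added to java.lang.Math in Java 8, already returns the non-negative modulus that the removed floorMod helper computed by hand, so the helper is redundant once Java 8 is the baseline. A minimal sketch with hypothetical values (not part of this patch):

    int hash = -7;  // e.g. a negative TopicPartition.hashCode()
    int n = 3;      // e.g. the number of executors
    int raw = hash % n;                  // -1: Java's % keeps the dividend's sign
    int manual = ((hash % n) + n) % n;   // 2: the removed helper's formula
    int viaJdk = Math.floorMod(hash, n); // 2: same result from the Java 8 API
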
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index bf8adbe..4c6e2ce 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -145,11 +145,6 @@ private[spark] class KafkaRDD[K, V](
       a.host > b.host
     }
 
-  /**
-   * Non-negative modulus, from java 8 math
-   */
-  private def floorMod(a: Int, b: Int): Int = ((a % b) + b) % b
-
   override def getPreferredLocations(thePart: Partition): Seq[String] = {
     // The intention is best-effort consistent executor for a given topicpartition,
     // so that caching consumers can be effective.
@@ -164,7 +159,7 @@ private[spark] class KafkaRDD[K, V](
       Seq()
     } else {
       // execs is sorted, tp.hashCode depends only on topic and partition, so consistent index
-      val index = this.floorMod(tp.hashCode, execs.length)
+      val index = Math.floorMod(tp.hashCode, execs.length)
       val chosen = execs(index)
       Seq(chosen.toString)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index 0622fef..bc8d603 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -104,15 +104,12 @@ abstract class AbstractCommandBuilder {
     // Load extra JAVA_OPTS from conf/java-opts, if it exists.
     File javaOpts = new File(join(File.separator, getConfDir(), "java-opts"));
     if (javaOpts.isFile()) {
-      BufferedReader br = new BufferedReader(new InputStreamReader(
-          new FileInputStream(javaOpts), StandardCharsets.UTF_8));
-      try {
+      try (BufferedReader br = new BufferedReader(new InputStreamReader(
+          new FileInputStream(javaOpts), StandardCharsets.UTF_8))) {
         String line;
         while ((line = br.readLine()) != null) {
           addOptionString(cmd, line);
         }
-      } finally {
-        br.close();
       }
     }
 

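The hunk above replaces a manual try/finally close with try-with-resources, which guarantees the reader is closed even when readLine() throws. A stand-alone sketch of the idiom, using a hypothetical printLines helper rather than the patch's addOptionString:

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    class JavaOptsReader {
      // Reads a UTF-8 text file line by line; the reader declared in the
      // try(...) header is closed automatically on both normal and
      // exceptional exit.
      static void printLines(String path) throws IOException {
        try (BufferedReader br = new BufferedReader(new InputStreamReader(
            new FileInputStream(path), StandardCharsets.UTF_8))) {
          String line;
          while ((line = br.readLine()) != null) {
            System.out.println(line);
          }
        }
      }
    }
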
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index c0779e1..12bf29d 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -18,7 +18,6 @@
 package org.apache.spark.launcher;
 
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.logging.Level;
@@ -103,14 +102,7 @@ class ChildProcAppHandle implements SparkAppHandle {
       try {
         childProc.exitValue();
       } catch (IllegalThreadStateException e) {
-        // Child is still alive. Try to use Java 8's "destroyForcibly()" if available,
-        // fall back to the old API if it's not there.
-        try {
-          Method destroy = childProc.getClass().getMethod("destroyForcibly");
-          destroy.invoke(childProc);
-        } catch (Exception inner) {
-          childProc.destroy();
-        }
+        childProc.destroyForcibly();
       } finally {
         childProc = null;
       }

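With Java 8 as the minimum, Process.destroyForcibly() can be invoked directly, so the reflective lookup removed above is no longer needed. A minimal sketch with a hypothetical child process (not part of this patch):

    import java.io.IOException;

    class ForcedShutdown {
      static void startAndKill() throws IOException, InterruptedException {
        Process child = new ProcessBuilder("sleep", "60").start();
        // destroyForcibly() (Java 8+) returns the Process, so waiting can be chained.
        child.destroyForcibly().waitFor();
      }
    }
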
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
index 250b2a8..e14c8aa 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
@@ -313,27 +313,6 @@ class CommandBuilderUtils {
   }
 
   /**
-   * Adds the default perm gen size option for Spark if the VM requires it and the user hasn't
-   * set it.
-   */
-  static void addPermGenSizeOpt(List<String> cmd) {
-    // Don't set MaxPermSize for IBM Java, or Oracle Java 8 and later.
-    if (getJavaVendor() == JavaVendor.IBM) {
-      return;
-    }
-    if (javaMajorVersion(System.getProperty("java.version")) > 7) {
-      return;
-    }
-    for (String arg : cmd) {
-      if (arg.contains("-XX:MaxPermSize=")) {
-        return;
-      }
-    }
-
-    cmd.add("-XX:MaxPermSize=256m");
-  }
-
-  /**
   * Get the major version of the Java version string supplied. This method
   * accepts any JEP-223-compliant string (9-ea, 9+100), as well as legacy
    * version strings such as 1.7.0_79

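The doc comment above covers two version-string schemes; a hypothetical sketch of such parsing (an assumption for illustration, not the actual body of javaMajorVersion):

    class JavaVersions {
      static int majorVersion(String version) {
        // Split on '.', '+', and '-': "1.7.0_79" -> [1, 7, 0_79]; "9-ea" -> [9, ea]
        String[] parts = version.split("[.+\\-]");
        if ("1".equals(parts[0])) {
          return Integer.parseInt(parts[1]); // legacy "1.x" scheme -> x
        }
        return Integer.parseInt(parts[0]);   // JEP-223 scheme: "9-ea", "9+100" -> 9
      }
    }
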
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index ae43f56..865d492 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -137,12 +137,7 @@ class LauncherServer implements Closeable {
       this.server = server;
       this.running = true;
 
-      this.serverThread = factory.newThread(new Runnable() {
-        @Override
-        public void run() {
-          acceptConnections();
-        }
-      });
+      this.serverThread = factory.newThread(this::acceptConnections);
       serverThread.start();
     } catch (IOException ioe) {
       close();

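The final hunk swaps an anonymous Runnable for a method reference: any no-arg void instance method matches Runnable's single abstract method. A stand-alone sketch with a hypothetical class (not part of this patch):

    import java.util.concurrent.ThreadFactory;

    class AcceptLoop {
      private void acceptConnections() { /* accept loop elided */ }

      Thread newServerThread(ThreadFactory factory) {
        // this::acceptConnections is shorthand for an anonymous Runnable
        // whose run() delegates to acceptConnections().
        return factory.newThread(this::acceptConnections);
      }
    }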
