You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/02/19 17:38:01 UTC

[1/2] spark git commit: [SPARK-19533][EXAMPLES] Convert Java tests to use lambdas, Java 8 features

Repository: spark
Updated Branches:
  refs/heads/master ba8912e5f -> de14d35f7


http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java
index f69aa4b..1ee68da 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java
@@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib;
 import scala.Tuple2;
 
 import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.recommendation.ALS;
 import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
 import org.apache.spark.mllib.recommendation.Rating;
@@ -37,15 +36,12 @@ public class JavaRecommendationExample {
     // Load and parse the data
     String path = "data/mllib/als/test.data";
     JavaRDD<String> data = jsc.textFile(path);
-    JavaRDD<Rating> ratings = data.map(
-      new Function<String, Rating>() {
-        public Rating call(String s) {
-          String[] sarray = s.split(",");
-          return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
-            Double.parseDouble(sarray[2]));
-        }
-      }
-    );
+    JavaRDD<Rating> ratings = data.map(s -> {
+      String[] sarray = s.split(",");
+      return new Rating(Integer.parseInt(sarray[0]),
+        Integer.parseInt(sarray[1]),
+        Double.parseDouble(sarray[2]));
+    });
 
     // Build the recommendation model using ALS
     int rank = 10;
@@ -53,37 +49,19 @@ public class JavaRecommendationExample {
     MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);
 
     // Evaluate the model on rating data
-    JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
-      new Function<Rating, Tuple2<Object, Object>>() {
-        public Tuple2<Object, Object> call(Rating r) {
-          return new Tuple2<Object, Object>(r.user(), r.product());
-        }
-      }
-    );
+    JavaRDD<Tuple2<Object, Object>> userProducts =
+      ratings.map(r -> new Tuple2<>(r.user(), r.product()));
     JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
-      model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
-        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
-          public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
-            return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating());
-          }
-        }
-      ));
-    JavaRDD<Tuple2<Double, Double>> ratesAndPreds =
-      JavaPairRDD.fromJavaRDD(ratings.map(
-        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
-          public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
-            return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating());
-          }
-        }
-      )).join(predictions).values();
-    double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map(
-      new Function<Tuple2<Double, Double>, Object>() {
-        public Object call(Tuple2<Double, Double> pair) {
-          Double err = pair._1() - pair._2();
-          return err * err;
-        }
-      }
-    ).rdd()).mean();
+      model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD()
+          .map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()))
+    );
+    JavaRDD<Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(
+        ratings.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())))
+      .join(predictions).values();
+    double MSE = ratesAndPreds.mapToDouble(pair -> {
+      double err = pair._1() - pair._2();
+      return err * err;
+    }).mean();
     System.out.println("Mean Squared Error = " + MSE);
 
     // Save and load model

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java
index b3e5c04..7bb9993 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java
@@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib;
 import scala.Tuple2;
 
 import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.regression.LinearRegressionModel;
@@ -38,34 +37,24 @@ public class JavaRegressionMetricsExample {
     // Load and parse the data
     String path = "data/mllib/sample_linear_regression_data.txt";
     JavaRDD<String> data = sc.textFile(path);
-    JavaRDD<LabeledPoint> parsedData = data.map(
-      new Function<String, LabeledPoint>() {
-        public LabeledPoint call(String line) {
-          String[] parts = line.split(" ");
-          double[] v = new double[parts.length - 1];
-          for (int i = 1; i < parts.length - 1; i++) {
-            v[i - 1] = Double.parseDouble(parts[i].split(":")[1]);
-          }
-          return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
-        }
+    JavaRDD<LabeledPoint> parsedData = data.map(line -> {
+      String[] parts = line.split(" ");
+      double[] v = new double[parts.length - 1];
+      for (int i = 1; i < parts.length - 1; i++) {
+        v[i - 1] = Double.parseDouble(parts[i].split(":")[1]);
       }
-    );
+      return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
+    });
     parsedData.cache();
 
     // Building the model
     int numIterations = 100;
-    final LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData),
+    LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData),
       numIterations);
 
     // Evaluate model on training examples and compute training error
-    JavaRDD<Tuple2<Object, Object>> valuesAndPreds = parsedData.map(
-      new Function<LabeledPoint, Tuple2<Object, Object>>() {
-        public Tuple2<Object, Object> call(LabeledPoint point) {
-          double prediction = model.predict(point.features());
-          return new Tuple2<Object, Object>(prediction, point.label());
-        }
-      }
-    );
+    JavaPairRDD<Object, Object> valuesAndPreds = parsedData.mapToPair(point ->
+      new Tuple2<>(model.predict(point.features()), point.label()));
 
     // Instantiate metrics object
     RegressionMetrics metrics = new RegressionMetrics(valuesAndPreds.rdd());

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java
index 720b167..866a221 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java
@@ -24,7 +24,6 @@ import org.apache.spark.SparkContext;
 import scala.Tuple2;
 
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.classification.SVMModel;
 import org.apache.spark.mllib.classification.SVMWithSGD;
 import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
@@ -50,20 +49,14 @@ public class JavaSVMWithSGDExample {
 
     // Run training algorithm to build the model.
     int numIterations = 100;
-    final SVMModel model = SVMWithSGD.train(training.rdd(), numIterations);
+    SVMModel model = SVMWithSGD.train(training.rdd(), numIterations);
 
     // Clear the default threshold.
     model.clearThreshold();
 
     // Compute raw scores on the test set.
-    JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(
-      new Function<LabeledPoint, Tuple2<Object, Object>>() {
-        public Tuple2<Object, Object> call(LabeledPoint p) {
-          Double score = model.predict(p.features());
-          return new Tuple2<Object, Object>(score, p.label());
-        }
-      }
-    );
+    JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(p ->
+      new Tuple2<>(model.predict(p.features()), p.label()));
 
     // Get evaluation metrics.
     BinaryClassificationMetrics metrics =

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java
index 7f4fe60..f9198e7 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java
@@ -23,9 +23,6 @@ import java.util.List;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-// $example off$
-import org.apache.spark.api.java.function.Function;
-// $example on$
 import org.apache.spark.mllib.fpm.AssociationRules;
 import org.apache.spark.mllib.fpm.FPGrowth;
 import org.apache.spark.mllib.fpm.FPGrowthModel;
@@ -42,14 +39,7 @@ public class JavaSimpleFPGrowth {
     // $example on$
     JavaRDD<String> data = sc.textFile("data/mllib/sample_fpgrowth.txt");
 
-    JavaRDD<List<String>> transactions = data.map(
-      new Function<String, List<String>>() {
-        public List<String> call(String line) {
-          String[] parts = line.split(" ");
-          return Arrays.asList(parts);
-        }
-      }
-    );
+    JavaRDD<List<String>> transactions = data.map(line -> Arrays.asList(line.split(" ")));
 
     FPGrowth fpg = new FPGrowth()
       .setMinSupport(0.2)

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
index cfaa577..4be702c 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
@@ -17,10 +17,6 @@
 
 package org.apache.spark.examples.mllib;
 
-
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 // $example on$
 import org.apache.spark.mllib.stat.test.BinarySample;
 import org.apache.spark.mllib.stat.test.StreamingTest;
@@ -75,16 +71,12 @@ public class JavaStreamingTestExample {
     ssc.checkpoint(Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark").toString());
 
     // $example on$
-    JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(
-      new Function<String, BinarySample>() {
-        @Override
-        public BinarySample call(String line) {
-          String[] ts = line.split(",");
-          boolean label = Boolean.parseBoolean(ts[0]);
-          double value = Double.parseDouble(ts[1]);
-          return new BinarySample(label, value);
-        }
-      });
+    JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(line -> {
+      String[] ts = line.split(",");
+      boolean label = Boolean.parseBoolean(ts[0]);
+      double value = Double.parseDouble(ts[1]);
+      return new BinarySample(label, value);
+    });
 
     StreamingTest streamingTest = new StreamingTest()
       .setPeacePeriod(0)
@@ -98,21 +90,11 @@ public class JavaStreamingTestExample {
     // Stop processing if test becomes significant or we time out
     timeoutCounter = numBatchesTimeout;
 
-    out.foreachRDD(new VoidFunction<JavaRDD<StreamingTestResult>>() {
-      @Override
-      public void call(JavaRDD<StreamingTestResult> rdd) {
-        timeoutCounter -= 1;
-
-        boolean anySignificant = !rdd.filter(new Function<StreamingTestResult, Boolean>() {
-          @Override
-          public Boolean call(StreamingTestResult v) {
-            return v.pValue() < 0.05;
-          }
-        }).isEmpty();
-
-        if (timeoutCounter <= 0 || anySignificant) {
-          rdd.context().stop();
-        }
+    out.foreachRDD(rdd -> {
+      timeoutCounter -= 1;
+      boolean anySignificant = !rdd.filter(v -> v.pValue() < 0.05).isEmpty();
+      if (timeoutCounter <= 0 || anySignificant) {
+        rdd.context().stop();
       }
     });
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index b687fae..adb96dd 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -139,11 +139,9 @@ public class JavaSQLDataSourceExample {
     // Parquet files can also be used to create a temporary view and then used in SQL statements
     parquetFileDF.createOrReplaceTempView("parquetFile");
     Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
-    Dataset<String> namesDS = namesDF.map(new MapFunction<Row, String>() {
-      public String call(Row row) {
-        return "Name: " + row.getString(0);
-      }
-    }, Encoders.STRING());
+    Dataset<String> namesDS = namesDF.map(
+        (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
+        Encoders.STRING());
     namesDS.show();
     // +------------+
     // |       value|

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
index c5770d1..8605852 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
@@ -227,12 +227,9 @@ public class JavaSparkSQLExample {
     // Encoders for most common types are provided in class Encoders
     Encoder<Integer> integerEncoder = Encoders.INT();
     Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
-    Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() {
-      @Override
-      public Integer call(Integer value) throws Exception {
-        return value + 1;
-      }
-    }, integerEncoder);
+    Dataset<Integer> transformedDS = primitiveDS.map(
+        (MapFunction<Integer, Integer>) value -> value + 1,
+        integerEncoder);
     transformedDS.collect(); // Returns [2, 3, 4]
 
     // DataFrames can be converted to a Dataset by providing a class. Mapping based on name
@@ -255,15 +252,12 @@ public class JavaSparkSQLExample {
     JavaRDD<Person> peopleRDD = spark.read()
       .textFile("examples/src/main/resources/people.txt")
       .javaRDD()
-      .map(new Function<String, Person>() {
-        @Override
-        public Person call(String line) throws Exception {
-          String[] parts = line.split(",");
-          Person person = new Person();
-          person.setName(parts[0]);
-          person.setAge(Integer.parseInt(parts[1].trim()));
-          return person;
-        }
+      .map(line -> {
+        String[] parts = line.split(",");
+        Person person = new Person();
+        person.setName(parts[0]);
+        person.setAge(Integer.parseInt(parts[1].trim()));
+        return person;
       });
 
     // Apply a schema to an RDD of JavaBeans to get a DataFrame
@@ -276,12 +270,9 @@ public class JavaSparkSQLExample {
 
     // The columns of a row in the result can be accessed by field index
     Encoder<String> stringEncoder = Encoders.STRING();
-    Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(new MapFunction<Row, String>() {
-      @Override
-      public String call(Row row) throws Exception {
-        return "Name: " + row.getString(0);
-      }
-    }, stringEncoder);
+    Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
+        (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
+        stringEncoder);
     teenagerNamesByIndexDF.show();
     // +------------+
     // |       value|
@@ -290,12 +281,9 @@ public class JavaSparkSQLExample {
     // +------------+
 
     // or by field name
-    Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(new MapFunction<Row, String>() {
-      @Override
-      public String call(Row row) throws Exception {
-        return "Name: " + row.<String>getAs("name");
-      }
-    }, stringEncoder);
+    Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
+        (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
+        stringEncoder);
     teenagerNamesByFieldDF.show();
     // +------------+
     // |       value|
@@ -324,12 +312,9 @@ public class JavaSparkSQLExample {
     StructType schema = DataTypes.createStructType(fields);
 
     // Convert records of the RDD (people) to Rows
-    JavaRDD<Row> rowRDD = peopleRDD.map(new Function<String, Row>() {
-      @Override
-      public Row call(String record) throws Exception {
-        String[] attributes = record.split(",");
-        return RowFactory.create(attributes[0], attributes[1].trim());
-      }
+    JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
+      String[] attributes = record.split(",");
+      return RowFactory.create(attributes[0], attributes[1].trim());
     });
 
     // Apply the schema to the RDD
@@ -343,12 +328,9 @@ public class JavaSparkSQLExample {
 
     // The results of SQL queries are DataFrames and support all the normal RDD operations
     // The columns of a row in the result can be accessed by field index or by field name
-    Dataset<String> namesDS = results.map(new MapFunction<Row, String>() {
-      @Override
-      public String call(Row row) throws Exception {
-        return "Name: " + row.getString(0);
-      }
-    }, Encoders.STRING());
+    Dataset<String> namesDS = results.map(
+        (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
+        Encoders.STRING());
     namesDS.show();
     // +-------------+
     // |        value|

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
index 2fe1307..4763856 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
@@ -90,12 +90,9 @@ public class JavaSparkHiveExample {
     Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
 
     // The items in DaraFrames are of type Row, which lets you to access each column by ordinal.
-    Dataset<String> stringsDS = sqlDF.map(new MapFunction<Row, String>() {
-      @Override
-      public String call(Row row) throws Exception {
-        return "Key: " + row.get(0) + ", Value: " + row.get(1);
-      }
-    }, Encoders.STRING());
+    Dataset<String> stringsDS = sqlDF.map(
+        (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
+        Encoders.STRING());
     stringsDS.show();
     // +--------------------+
     // |               value|

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java
index 0f45cfe..4e02719 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java
@@ -25,7 +25,6 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.streaming.StreamingQuery;
 
 import java.util.Arrays;
-import java.util.Iterator;
 
 /**
  * Consumes messages from one or more topics in Kafka and does wordcount.
@@ -78,12 +77,9 @@ public final class JavaStructuredKafkaWordCount {
       .as(Encoders.STRING());
 
     // Generate running word count
-    Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(x.split(" ")).iterator();
-      }
-    }, Encoders.STRING()).groupBy("value").count();
+    Dataset<Row> wordCounts = lines.flatMap(
+        (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
+        Encoders.STRING()).groupBy("value").count();
 
     // Start running the query that prints the running counts to the console
     StreamingQuery query = wordCounts.writeStream()

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
index 5f342e1..3af7869 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -21,7 +21,6 @@ import org.apache.spark.sql.*;
 import org.apache.spark.sql.streaming.StreamingQuery;
 
 import java.util.Arrays;
-import java.util.Iterator;
 
 /**
  * Counts words in UTF8 encoded, '\n' delimited text received from the network.
@@ -61,13 +60,9 @@ public final class JavaStructuredNetworkWordCount {
       .load();
 
     // Split the lines into words
-    Dataset<String> words = lines.as(Encoders.STRING())
-      .flatMap(new FlatMapFunction<String, String>() {
-        @Override
-        public Iterator<String> call(String x) {
-          return Arrays.asList(x.split(" ")).iterator();
-        }
-    }, Encoders.STRING());
+    Dataset<String> words = lines.as(Encoders.STRING()).flatMap(
+        (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
+        Encoders.STRING());
 
     // Generate running word count
     Dataset<Row> wordCounts = words.groupBy("value").count();

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
index 172d053..93ec5e2 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
@@ -18,13 +18,11 @@ package org.apache.spark.examples.sql.streaming;
 
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.sql.*;
-import org.apache.spark.sql.functions;
 import org.apache.spark.sql.streaming.StreamingQuery;
 import scala.Tuple2;
 
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 /**
@@ -86,16 +84,12 @@ public final class JavaStructuredNetworkWordCountWindowed {
     // Split the lines into words, retaining timestamps
     Dataset<Row> words = lines
       .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
-      .flatMap(
-        new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
-          @Override
-          public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
-            List<Tuple2<String, Timestamp>> result = new ArrayList<>();
-            for (String word : t._1.split(" ")) {
-              result.add(new Tuple2<>(word, t._2));
-            }
-            return result.iterator();
+      .flatMap((FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>) t -> {
+          List<Tuple2<String, Timestamp>> result = new ArrayList<>();
+          for (String word : t._1.split(" ")) {
+            result.add(new Tuple2<>(word, t._2));
           }
+          return result.iterator();
         },
         Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
       ).toDF("word", "timestamp");

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
index e20b94d..47692ec 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -20,9 +20,6 @@ package org.apache.spark.examples.streaming;
 import com.google.common.io.Closeables;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
@@ -38,7 +35,6 @@ import java.net.ConnectException;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.regex.Pattern;
 
 /**
@@ -74,23 +70,9 @@ public class JavaCustomReceiver extends Receiver<String> {
     // words in input stream of \n delimited text (eg. generated by 'nc')
     JavaReceiverInputDStream<String> lines = ssc.receiverStream(
       new JavaCustomReceiver(args[0], Integer.parseInt(args[1])));
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(SPACE.split(x)).iterator();
-      }
-    });
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+        .reduceByKey((i1, i2) -> i1 + i2);
 
     wordCounts.print();
     ssc.start();
@@ -108,15 +90,13 @@ public class JavaCustomReceiver extends Receiver<String> {
     port = port_;
   }
 
+  @Override
   public void onStart() {
     // Start the thread that receives data over a connection
-    new Thread()  {
-      @Override public void run() {
-        receive();
-      }
-    }.start();
+    new Thread(this::receive).start();
   }
 
+  @Override
   public void onStop() {
     // There is nothing much to do as the thread calling receive()
     // is designed to stop by itself isStopped() returns false
@@ -127,13 +107,13 @@ public class JavaCustomReceiver extends Receiver<String> {
     try {
       Socket socket = null;
       BufferedReader reader = null;
-      String userInput = null;
       try {
         // connect to the server
         socket = new Socket(host, port);
         reader = new BufferedReader(
             new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
         // Until stopped or connection broken continue reading
+        String userInput;
         while (!isStopped() && (userInput = reader.readLine()) != null) {
           System.out.println("Received data '" + userInput + "'");
           store(userInput);

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
index ed118f8..5e5ae62 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
@@ -20,7 +20,6 @@ package org.apache.spark.examples.streaming;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -30,7 +29,6 @@ import scala.Tuple2;
 import kafka.serializer.StringDecoder;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.*;
 import org.apache.spark.streaming.api.java.*;
 import org.apache.spark.streaming.kafka.KafkaUtils;
 import org.apache.spark.streaming.Durations;
@@ -82,31 +80,10 @@ public final class JavaDirectKafkaWordCount {
     );
 
     // Get the lines, split them into words, count the words and print
-    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
-      @Override
-      public String call(Tuple2<String, String> tuple2) {
-        return tuple2._2();
-      }
-    });
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(SPACE.split(x)).iterator();
-      }
-    });
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<>(s, 1);
-        }
-      }).reduceByKey(
-        new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
+    JavaDStream<String> lines = messages.map(Tuple2::_2);
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+        .reduceByKey((i1, i2) -> i1 + i2);
     wordCounts.print();
 
     // Start the computation

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
index 33c0a2d..0c65104 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
@@ -18,7 +18,6 @@
 package org.apache.spark.examples.streaming;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.streaming.*;
 import org.apache.spark.streaming.api.java.*;
 import org.apache.spark.streaming.flume.FlumeUtils;
@@ -62,12 +61,7 @@ public final class JavaFlumeEventCount {
 
     flumeStream.count();
 
-    flumeStream.count().map(new Function<Long, String>() {
-      @Override
-      public String call(Long in) {
-        return "Received " + in + " flume events.";
-      }
-    }).print();
+    flumeStream.count().map(in -> "Received " + in + " flume events.").print();
 
     ssc.start();
     ssc.awaitTermination();

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
index 8a5fd53..ce5acdc 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -18,7 +18,6 @@
 package org.apache.spark.examples.streaming;
 
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.regex.Pattern;
@@ -26,10 +25,6 @@ import java.util.regex.Pattern;
 import scala.Tuple2;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -78,32 +73,12 @@ public final class JavaKafkaWordCount {
     JavaPairReceiverInputDStream<String, String> messages =
             KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
 
-    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
-      @Override
-      public String call(Tuple2<String, String> tuple2) {
-        return tuple2._2();
-      }
-    });
+    JavaDStream<String> lines = messages.map(Tuple2::_2);
 
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(SPACE.split(x)).iterator();
-      }
-    });
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
 
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+        .reduceByKey((i1, i2) -> i1 + i2);
 
     wordCounts.print();
     jssc.start();

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
index 7a8fe99..b217672 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -18,15 +18,11 @@
 package org.apache.spark.examples.streaming;
 
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.regex.Pattern;
 
 import scala.Tuple2;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.api.java.StorageLevels;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
@@ -66,24 +62,9 @@ public final class JavaNetworkWordCount {
     // Replication necessary in distributed scenario for fault tolerance.
     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
             args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(SPACE.split(x)).iterator();
-      }
-    });
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+        .reduceByKey((i1, i2) -> i1 + i2);
 
     wordCounts.print();
     ssc.start();

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
index 62413b4..e86f8ab 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
@@ -17,19 +17,15 @@
 
 package org.apache.spark.examples.streaming;
 
-
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
 import scala.Tuple2;
 
-import com.google.common.collect.Lists;
-
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -49,14 +45,14 @@ public final class JavaQueueStream {
 
     // Create the queue through which RDDs can be pushed to
     // a QueueInputDStream
-    Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>();
 
     // Create and push some RDDs into the queue
-    List<Integer> list = Lists.newArrayList();
+    List<Integer> list = new ArrayList<>();
     for (int i = 0; i < 1000; i++) {
       list.add(i);
     }
 
+    Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>();
     for (int i = 0; i < 30; i++) {
       rddQueue.add(ssc.sparkContext().parallelize(list));
     }
@@ -64,19 +60,9 @@ public final class JavaQueueStream {
     // Create the QueueInputDStream and use it do some processing
     JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
     JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair(
-        new PairFunction<Integer, Integer, Integer>() {
-          @Override
-          public Tuple2<Integer, Integer> call(Integer i) {
-            return new Tuple2<>(i % 10, 1);
-          }
-        });
+        i -> new Tuple2<>(i % 10, 1));
     JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-    });
+        (i1, i2) -> i1 + i2);
 
     reducedStream.print();
     ssc.start();

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index acbc345..45a876d 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -18,10 +18,8 @@
 package org.apache.spark.examples.streaming;
 
 import java.io.File;
-import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Pattern;
 
@@ -30,12 +28,10 @@ import scala.Tuple2;
 import com.google.common.io.Files;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.Time;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
@@ -120,7 +116,7 @@ public final class JavaRecoverableNetworkWordCount {
     // If you do not see this printed, that means the StreamingContext has been loaded
     // from the new checkpoint
     System.out.println("Creating new context");
-    final File outputFile = new File(outputPath);
+    File outputFile = new File(outputPath);
     if (outputFile.exists()) {
       outputFile.delete();
     }
@@ -132,52 +128,31 @@ public final class JavaRecoverableNetworkWordCount {
     // Create a socket stream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(SPACE.split(x)).iterator();
-      }
-    });
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+        .reduceByKey((i1, i2) -> i1 + i2);
+
+    wordCounts.foreachRDD((rdd, time) -> {
+      // Get or register the blacklist Broadcast
+      Broadcast<List<String>> blacklist =
+          JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+      // Get or register the droppedWordsCounter Accumulator
+      LongAccumulator droppedWordsCounter =
+          JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+      // Use blacklist to drop words and use droppedWordsCounter to count them
+      String counts = rdd.filter(wordCount -> {
+        if (blacklist.value().contains(wordCount._1())) {
+          droppedWordsCounter.add(wordCount._2());
+          return false;
+        } else {
+          return true;
         }
-      });
-
-    wordCounts.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
-      @Override
-      public void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
-        // Get or register the blacklist Broadcast
-        final Broadcast<List<String>> blacklist =
-            JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
-        // Get or register the droppedWordsCounter Accumulator
-        final LongAccumulator droppedWordsCounter =
-            JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
-        // Use blacklist to drop words and use droppedWordsCounter to count them
-        String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
-          @Override
-          public Boolean call(Tuple2<String, Integer> wordCount) {
-            if (blacklist.value().contains(wordCount._1())) {
-              droppedWordsCounter.add(wordCount._2());
-              return false;
-            } else {
-              return true;
-            }
-          }
-        }).collect().toString();
-        String output = "Counts at time " + time + " " + counts;
-        System.out.println(output);
-        System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
-        System.out.println("Appending to " + outputFile.getAbsolutePath());
-        Files.append(output + "\n", outputFile, Charset.defaultCharset());
-      }
+      }).collect().toString();
+      String output = "Counts at time " + time + " " + counts;
+      System.out.println(output);
+      System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
+      System.out.println("Appending to " + outputFile.getAbsolutePath());
+      Files.append(output + "\n", outputFile, Charset.defaultCharset());
     });
 
     return ssc;
@@ -198,19 +173,15 @@ public final class JavaRecoverableNetworkWordCount {
       System.exit(1);
     }
 
-    final String ip = args[0];
-    final int port = Integer.parseInt(args[1]);
-    final String checkpointDirectory = args[2];
-    final String outputPath = args[3];
+    String ip = args[0];
+    int port = Integer.parseInt(args[1]);
+    String checkpointDirectory = args[2];
+    String outputPath = args[3];
 
     // Function to create JavaStreamingContext without any output operations
     // (used to detect the new context)
-    Function0<JavaStreamingContext> createContextFunc = new Function0<JavaStreamingContext>() {
-      @Override
-      public JavaStreamingContext call() {
-        return createContext(ip, port, checkpointDirectory, outputPath);
-      }
-    };
+    Function0<JavaStreamingContext> createContextFunc =
+        () -> createContext(ip, port, checkpointDirectory, outputPath);
 
     JavaStreamingContext ssc =
       JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
index b8e9e12..948d1a2 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
@@ -18,20 +18,15 @@
 package org.apache.spark.examples.streaming;
 
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.regex.Pattern;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction2;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.api.java.StorageLevels;
 import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.Time;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -48,7 +43,6 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
  * and then run the example
  *    `$ bin/run-example org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost 9999`
  */
-
 public final class JavaSqlNetworkWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");
 
@@ -70,39 +64,28 @@ public final class JavaSqlNetworkWordCount {
     // Replication necessary in distributed scenario for fault tolerance.
     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
         args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(SPACE.split(x)).iterator();
-      }
-    });
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
 
     // Convert RDDs of the words DStream to DataFrame and run SQL query
-    words.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
-      @Override
-      public void call(JavaRDD<String> rdd, Time time) {
-        SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
+    words.foreachRDD((rdd, time) -> {
+      SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
 
-        // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
-        JavaRDD<JavaRecord> rowRDD = rdd.map(new Function<String, JavaRecord>() {
-          @Override
-          public JavaRecord call(String word) {
-            JavaRecord record = new JavaRecord();
-            record.setWord(word);
-            return record;
-          }
-        });
-        Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);
+      // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
+      JavaRDD<JavaRecord> rowRDD = rdd.map(word -> {
+        JavaRecord record = new JavaRecord();
+        record.setWord(word);
+        return record;
+      });
+      Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);
 
-        // Creates a temporary view using the DataFrame
-        wordsDataFrame.createOrReplaceTempView("words");
+      // Creates a temporary view using the DataFrame
+      wordsDataFrame.createOrReplaceTempView("words");
 
-        // Do word count on table using SQL and print it
-        Dataset<Row> wordCountsDataFrame =
-            spark.sql("select word, count(*) as total from words group by word");
-        System.out.println("========= " + time + "=========");
-        wordCountsDataFrame.show();
-      }
+      // Do word count on table using SQL and print it
+      Dataset<Row> wordCountsDataFrame =
+          spark.sql("select word, count(*) as total from words group by word");
+      System.out.println("========= " + time + "=========");
+      wordCountsDataFrame.show();
     });
 
     ssc.start();

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index ed36df8..9d8bd7f 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -18,7 +18,6 @@
 package org.apache.spark.examples.streaming;
 
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Pattern;
 
@@ -72,32 +71,17 @@ public class JavaStatefulNetworkWordCount {
     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
             args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2);
 
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(SPACE.split(x)).iterator();
-      }
-    });
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
 
-    JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(
-        new PairFunction<String, String, Integer>() {
-          @Override
-          public Tuple2<String, Integer> call(String s) {
-            return new Tuple2<>(s, 1);
-          }
-        });
+    JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(s -> new Tuple2<>(s, 1));
 
     // Update the cumulative count function
     Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
-        new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
-          @Override
-          public Tuple2<String, Integer> call(String word, Optional<Integer> one,
-              State<Integer> state) {
-            int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
-            Tuple2<String, Integer> output = new Tuple2<>(word, sum);
-            state.update(sum);
-            return output;
-          }
+        (word, one, state) -> {
+          int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
+          Tuple2<String, Integer> output = new Tuple2<>(word, sum);
+          state.update(sum);
+          return output;
         };
 
     // DStream made of get cumulative counts that get updated in every batch


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-19533][EXAMPLES] Convert Java tests to use lambdas, Java 8 features

Posted by sr...@apache.org.
[SPARK-19533][EXAMPLES] Convert Java tests to use lambdas, Java 8 features

## What changes were proposed in this pull request?

Convert Java tests to use lambdas, Java 8 features.

## How was this patch tested?

Jenkins tests.

Author: Sean Owen <so...@cloudera.com>

Closes #16961 from srowen/SPARK-19533.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de14d35f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de14d35f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de14d35f

Branch: refs/heads/master
Commit: de14d35f77071932963a994fac5aec0e5df838a1
Parents: ba8912e
Author: Sean Owen <so...@cloudera.com>
Authored: Sun Feb 19 09:37:56 2017 -0800
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Feb 19 09:37:56 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/examples/JavaLogQuery.java |  21 +--
 .../org/apache/spark/examples/JavaPageRank.java |  49 ++-----
 .../org/apache/spark/examples/JavaSparkPi.java  |  20 +--
 .../spark/examples/JavaStatusTrackerDemo.java   |   5 +-
 .../java/org/apache/spark/examples/JavaTC.java  |   8 +-
 .../apache/spark/examples/JavaWordCount.java    |  27 +---
 .../spark/examples/ml/JavaALSExample.java       |   7 +-
 ...SelectionViaTrainValidationSplitExample.java |   3 -
 .../spark/examples/ml/JavaTokenizerExample.java |  13 +-
 .../examples/ml/JavaVectorSlicerExample.java    |   7 +-
 .../mllib/JavaAssociationRulesExample.java      |   6 +-
 .../JavaBinaryClassificationMetricsExample.java |  33 ++---
 .../mllib/JavaBisectingKMeansExample.java       |   7 +-
 .../mllib/JavaChiSqSelectorExample.java         |  38 ++----
 .../JavaDecisionTreeClassificationExample.java  |  26 +---
 .../JavaDecisionTreeRegressionExample.java      |  33 ++---
 .../mllib/JavaElementwiseProductExample.java    |  27 +---
 .../mllib/JavaGaussianMixtureExample.java       |  19 +--
 ...vaGradientBoostingClassificationExample.java |  21 +--
 .../JavaGradientBoostingRegressionExample.java  |  30 +----
 .../mllib/JavaIsotonicRegressionExample.java    |  39 ++----
 .../spark/examples/mllib/JavaKMeansExample.java |  19 +--
 .../spark/examples/mllib/JavaLBFGSExample.java  |  23 +---
 .../JavaLatentDirichletAllocationExample.java   |  28 ++--
 .../JavaLinearRegressionWithSGDExample.java     |  47 +++----
 .../JavaLogisticRegressionWithLBFGSExample.java |  14 +-
 ...aMulticlassClassificationMetricsExample.java |  13 +-
 .../examples/mllib/JavaNaiveBayesExample.java   |  19 +--
 .../JavaPowerIterationClusteringExample.java    |   6 +-
 .../JavaRandomForestClassificationExample.java  |  23 +---
 .../JavaRandomForestRegressionExample.java      |  37 ++---
 .../mllib/JavaRankingMetricsExample.java        | 135 ++++++-------------
 .../mllib/JavaRecommendationExample.java        |  58 +++-----
 .../mllib/JavaRegressionMetricsExample.java     |  31 ++---
 .../examples/mllib/JavaSVMWithSGDExample.java   |  13 +-
 .../examples/mllib/JavaSimpleFPGrowth.java      |  12 +-
 .../mllib/JavaStreamingTestExample.java         |  40 ++----
 .../examples/sql/JavaSQLDataSourceExample.java  |   8 +-
 .../spark/examples/sql/JavaSparkSQLExample.java |  60 +++------
 .../examples/sql/hive/JavaSparkHiveExample.java |   9 +-
 .../streaming/JavaStructuredKafkaWordCount.java |  10 +-
 .../JavaStructuredNetworkWordCount.java         |  11 +-
 .../JavaStructuredNetworkWordCountWindowed.java |  16 +--
 .../examples/streaming/JavaCustomReceiver.java  |  34 +----
 .../streaming/JavaDirectKafkaWordCount.java     |  31 +----
 .../examples/streaming/JavaFlumeEventCount.java |   8 +-
 .../examples/streaming/JavaKafkaWordCount.java  |  33 +----
 .../streaming/JavaNetworkWordCount.java         |  25 +---
 .../examples/streaming/JavaQueueStream.java     |  24 +---
 .../JavaRecoverableNetworkWordCount.java        |  91 +++++--------
 .../streaming/JavaSqlNetworkWordCount.java      |  51 +++----
 .../streaming/JavaStatefulNetworkWordCount.java |  30 +----
 52 files changed, 380 insertions(+), 1018 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
index 7775443..cf12de3 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -17,18 +17,16 @@
 
 package org.apache.spark.examples;
 
-import com.google.common.collect.Lists;
 import scala.Tuple2;
 import scala.Tuple3;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.SparkSession;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -40,7 +38,7 @@ import java.util.regex.Pattern;
  */
 public final class JavaLogQuery {
 
-  public static final List<String> exampleApacheLogs = Lists.newArrayList(
+  public static final List<String> exampleApacheLogs = Arrays.asList(
     "10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " +
       "HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; " +
       "Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; " +
@@ -109,19 +107,10 @@ public final class JavaLogQuery {
 
     JavaRDD<String> dataSet = (args.length == 1) ? jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs);
 
-    JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
-      @Override
-      public Tuple2<Tuple3<String, String, String>, Stats> call(String s) {
-        return new Tuple2<>(extractKey(s), extractStats(s));
-      }
-    });
+    JavaPairRDD<Tuple3<String, String, String>, Stats> extracted =
+        dataSet.mapToPair(s -> new Tuple2<>(extractKey(s), extractStats(s)));
 
-    JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
-      @Override
-      public Stats call(Stats stats, Stats stats2) {
-        return stats.merge(stats2);
-      }
-    });
+    JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(Stats::merge);
 
     List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect();
     for (Tuple2<?,?> t : output) {

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index bcc493b..b5b4703 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -19,7 +19,6 @@ package org.apache.spark.examples;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Iterator;
 import java.util.regex.Pattern;
 
 import scala.Tuple2;
@@ -28,10 +27,7 @@ import com.google.common.collect.Iterables;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.SparkSession;
 
 /**
@@ -90,52 +86,35 @@ public final class JavaPageRank {
     JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
 
     // Loads all URLs from input file and initialize their neighbors.
-    JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(
-      new PairFunction<String, String, String>() {
-        @Override
-        public Tuple2<String, String> call(String s) {
-          String[] parts = SPACES.split(s);
-          return new Tuple2<>(parts[0], parts[1]);
-        }
-      }).distinct().groupByKey().cache();
+    JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(s -> {
+      String[] parts = SPACES.split(s);
+      return new Tuple2<>(parts[0], parts[1]);
+    }).distinct().groupByKey().cache();
 
     // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
-    JavaPairRDD<String, Double> ranks = links.mapValues(new Function<Iterable<String>, Double>() {
-      @Override
-      public Double call(Iterable<String> rs) {
-        return 1.0;
-      }
-    });
+    JavaPairRDD<String, Double> ranks = links.mapValues(rs -> 1.0);
 
     // Calculates and updates URL ranks continuously using PageRank algorithm.
     for (int current = 0; current < Integer.parseInt(args[1]); current++) {
       // Calculates URL contributions to the rank of other URLs.
       JavaPairRDD<String, Double> contribs = links.join(ranks).values()
-        .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
-          @Override
-          public Iterator<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
-            int urlCount = Iterables.size(s._1);
-            List<Tuple2<String, Double>> results = new ArrayList<>();
-            for (String n : s._1) {
-              results.add(new Tuple2<>(n, s._2() / urlCount));
-            }
-            return results.iterator();
+        .flatMapToPair(s -> {
+          int urlCount = Iterables.size(s._1());
+          List<Tuple2<String, Double>> results = new ArrayList<>();
+          for (String n : s._1) {
+            results.add(new Tuple2<>(n, s._2() / urlCount));
           }
-      });
+          return results.iterator();
+        });
 
       // Re-calculates URL ranks based on neighbor contributions.
-      ranks = contribs.reduceByKey(new Sum()).mapValues(new Function<Double, Double>() {
-        @Override
-        public Double call(Double sum) {
-          return 0.15 + sum * 0.85;
-        }
-      });
+      ranks = contribs.reduceByKey(new Sum()).mapValues(sum -> 0.15 + sum * 0.85);
     }
 
     // Collects all URL ranks and dump them to console.
     List<Tuple2<String, Double>> output = ranks.collect();
     for (Tuple2<?,?> tuple : output) {
-        System.out.println(tuple._1() + " has rank: " + tuple._2() + ".");
+      System.out.println(tuple._1() + " has rank: " + tuple._2() + ".");
     }
 
     spark.stop();

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
index 89855e8..cb4b265 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
@@ -19,8 +19,6 @@ package org.apache.spark.examples;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.sql.SparkSession;
 
 import java.util.ArrayList;
@@ -49,19 +47,11 @@ public final class JavaSparkPi {
 
     JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
 
-    int count = dataSet.map(new Function<Integer, Integer>() {
-      @Override
-      public Integer call(Integer integer) {
-        double x = Math.random() * 2 - 1;
-        double y = Math.random() * 2 - 1;
-        return (x * x + y * y <= 1) ? 1 : 0;
-      }
-    }).reduce(new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer integer, Integer integer2) {
-        return integer + integer2;
-      }
-    });
+    int count = dataSet.map(integer -> {
+      double x = Math.random() * 2 - 1;
+      double y = Math.random() * 2 - 1;
+      return (x * x + y * y <= 1) ? 1 : 0;
+    }).reduce((integer, integer2) -> integer + integer2);
 
     System.out.println("Pi is roughly " + 4.0 * count / n);
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
index 6f899c7..b0ebedf 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
@@ -25,7 +25,6 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.sql.SparkSession;
 
-
 import java.util.Arrays;
 import java.util.List;
 
@@ -50,11 +49,11 @@ public final class JavaStatusTrackerDemo {
       .appName(APP_NAME)
       .getOrCreate();
 
-    final JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
 
     // Example of implementing a progress reporter for a simple job.
     JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map(
-        new IdentityWithDelay<Integer>());
+        new IdentityWithDelay<>());
     JavaFutureAction<List<Integer>> jobFuture = rdd.collectAsync();
     while (!jobFuture.isDone()) {
       Thread.sleep(1000);  // 1 second

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/JavaTC.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
index f12ca77..bde30b8 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -80,13 +80,7 @@ public final class JavaTC {
     // the graph to obtain the path (x, z).
 
     // Because join() joins on keys, the edges are stored in reversed order.
-    JavaPairRDD<Integer, Integer> edges = tc.mapToPair(
-      new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
-        @Override
-        public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
-          return new Tuple2<>(e._2(), e._1());
-        }
-    });
+    JavaPairRDD<Integer, Integer> edges = tc.mapToPair(e -> new Tuple2<>(e._2(), e._1()));
 
     long oldCount;
     long nextCount = tc.count();

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 8f18604..f1ce1e9 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -21,13 +21,9 @@ import scala.Tuple2;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.SparkSession;
 
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Pattern;
 
@@ -48,28 +44,11 @@ public final class JavaWordCount {
 
     JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
 
-    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String s) {
-        return Arrays.asList(SPACE.split(s)).iterator();
-      }
-    });
+    JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
 
-    JavaPairRDD<String, Integer> ones = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<>(s, 1);
-        }
-      });
+    JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
 
-    JavaPairRDD<String, Integer> counts = ones.reduceByKey(
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
+    JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
 
     List<Tuple2<String, Integer>> output = counts.collect();
     for (Tuple2<?,?> tuple : output) {

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
index 739558e..33ba668 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
@@ -25,7 +25,6 @@ import org.apache.spark.sql.SparkSession;
 import java.io.Serializable;
 
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.ml.evaluation.RegressionEvaluator;
 import org.apache.spark.ml.recommendation.ALS;
 import org.apache.spark.ml.recommendation.ALSModel;
@@ -88,11 +87,7 @@ public class JavaALSExample {
     // $example on$
     JavaRDD<Rating> ratingsRDD = spark
       .read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
-      .map(new Function<String, Rating>() {
-        public Rating call(String str) {
-          return Rating.parseRating(str);
-        }
-      });
+      .map(Rating::parseRating);
     Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating.class);
     Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2});
     Dataset<Row> training = splits[0];

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java
index 0f96293..9a4722b 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java
@@ -32,9 +32,6 @@ import org.apache.spark.sql.SparkSession;
 /**
  * Java example demonstrating model selection using TrainValidationSplit.
  *
- * The example is based on {@link org.apache.spark.examples.ml.JavaSimpleParamsExample}
- * using linear regression.
- *
  * Run with
  * {{{
  * bin/run-example ml.JavaModelSelectionViaTrainValidationSplitExample

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
index 004e9b1..3f809eb 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
@@ -69,20 +69,17 @@ public class JavaTokenizerExample {
         .setOutputCol("words")
         .setPattern("\\W");  // alternatively .setPattern("\\w+").setGaps(false);
 
-    spark.udf().register("countTokens", new UDF1<WrappedArray<String>, Integer>() {
-      @Override
-      public Integer call(WrappedArray<String> words) {
-        return words.size();
-      }
-    }, DataTypes.IntegerType);
+    spark.udf().register("countTokens", (WrappedArray<?> words) -> words.size(), DataTypes.IntegerType);
 
     Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
     tokenized.select("sentence", "words")
-        .withColumn("tokens", callUDF("countTokens", col("words"))).show(false);
+        .withColumn("tokens", callUDF("countTokens", col("words")))
+        .show(false);
 
     Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
     regexTokenized.select("sentence", "words")
-        .withColumn("tokens", callUDF("countTokens", col("words"))).show(false);
+        .withColumn("tokens", callUDF("countTokens", col("words")))
+        .show(false);
     // $example off$
 
     spark.stop();

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java
index 1922514..1ae48be 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java
@@ -20,10 +20,9 @@ package org.apache.spark.examples.ml;
 import org.apache.spark.sql.SparkSession;
 
 // $example on$
+import java.util.Arrays;
 import java.util.List;
 
-import com.google.common.collect.Lists;
-
 import org.apache.spark.ml.attribute.Attribute;
 import org.apache.spark.ml.attribute.AttributeGroup;
 import org.apache.spark.ml.attribute.NumericAttribute;
@@ -43,14 +42,14 @@ public class JavaVectorSlicerExample {
       .getOrCreate();
 
     // $example on$
-    Attribute[] attrs = new Attribute[]{
+    Attribute[] attrs = {
       NumericAttribute.defaultAttr().withName("f1"),
       NumericAttribute.defaultAttr().withName("f2"),
       NumericAttribute.defaultAttr().withName("f3")
     };
     AttributeGroup group = new AttributeGroup("userFeatures", attrs);
 
-    List<Row> data = Lists.newArrayList(
+    List<Row> data = Arrays.asList(
       RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
       RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
     );

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java
index 189560e..5f43603 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java
@@ -38,9 +38,9 @@ public class JavaAssociationRulesExample {
 
     // $example on$
     JavaRDD<FPGrowth.FreqItemset<String>> freqItemsets = sc.parallelize(Arrays.asList(
-      new FreqItemset<String>(new String[] {"a"}, 15L),
-      new FreqItemset<String>(new String[] {"b"}, 35L),
-      new FreqItemset<String>(new String[] {"a", "b"}, 12L)
+      new FreqItemset<>(new String[] {"a"}, 15L),
+      new FreqItemset<>(new String[] {"b"}, 35L),
+      new FreqItemset<>(new String[] {"a", "b"}, 12L)
     ));
 
     AssociationRules arules = new AssociationRules()

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
index 12aa14f..b9d0313 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
@@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib;
 import scala.Tuple2;
 
 import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.classification.LogisticRegressionModel;
 import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
 import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
@@ -46,7 +45,7 @@ public class JavaBinaryClassificationMetricsExample {
     JavaRDD<LabeledPoint> test = splits[1];
 
     // Run training algorithm to build the model.
-    final LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
+    LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
       .setNumClasses(2)
       .run(training.rdd());
 
@@ -54,15 +53,8 @@ public class JavaBinaryClassificationMetricsExample {
     model.clearThreshold();
 
     // Compute raw scores on the test set.
-    JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
-      new Function<LabeledPoint, Tuple2<Object, Object>>() {
-        @Override
-        public Tuple2<Object, Object> call(LabeledPoint p) {
-          Double prediction = model.predict(p.features());
-          return new Tuple2<Object, Object>(prediction, p.label());
-        }
-      }
-    );
+    JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
+      new Tuple2<>(model.predict(p.features()), p.label()));
 
     // Get evaluation metrics.
     BinaryClassificationMetrics metrics =
@@ -73,32 +65,25 @@ public class JavaBinaryClassificationMetricsExample {
     System.out.println("Precision by threshold: " + precision.collect());
 
     // Recall by threshold
-    JavaRDD<Tuple2<Object, Object>> recall = metrics.recallByThreshold().toJavaRDD();
+    JavaRDD<?> recall = metrics.recallByThreshold().toJavaRDD();
     System.out.println("Recall by threshold: " + recall.collect());
 
     // F Score by threshold
-    JavaRDD<Tuple2<Object, Object>> f1Score = metrics.fMeasureByThreshold().toJavaRDD();
+    JavaRDD<?> f1Score = metrics.fMeasureByThreshold().toJavaRDD();
     System.out.println("F1 Score by threshold: " + f1Score.collect());
 
-    JavaRDD<Tuple2<Object, Object>> f2Score = metrics.fMeasureByThreshold(2.0).toJavaRDD();
+    JavaRDD<?> f2Score = metrics.fMeasureByThreshold(2.0).toJavaRDD();
     System.out.println("F2 Score by threshold: " + f2Score.collect());
 
     // Precision-recall curve
-    JavaRDD<Tuple2<Object, Object>> prc = metrics.pr().toJavaRDD();
+    JavaRDD<?> prc = metrics.pr().toJavaRDD();
     System.out.println("Precision-recall curve: " + prc.collect());
 
     // Thresholds
-    JavaRDD<Double> thresholds = precision.map(
-      new Function<Tuple2<Object, Object>, Double>() {
-        @Override
-        public Double call(Tuple2<Object, Object> t) {
-          return new Double(t._1().toString());
-        }
-      }
-    );
+    JavaRDD<Double> thresholds = precision.map(t -> Double.parseDouble(t._1().toString()));
 
     // ROC Curve
-    JavaRDD<Tuple2<Object, Object>> roc = metrics.roc().toJavaRDD();
+    JavaRDD<?> roc = metrics.roc().toJavaRDD();
     System.out.println("ROC curve: " + roc.collect());
 
     // AUPRC

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java
index c600094..f878b55 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java
@@ -17,10 +17,9 @@
 
 package org.apache.spark.examples.mllib;
 
-import java.util.ArrayList;
-
 // $example on$
-import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.List;
 // $example off$
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -41,7 +40,7 @@ public class JavaBisectingKMeansExample {
     JavaSparkContext sc = new JavaSparkContext(sparkConf);
 
     // $example on$
-    ArrayList<Vector> localData = Lists.newArrayList(
+    List<Vector> localData = Arrays.asList(
       Vectors.dense(0.1, 0.1),   Vectors.dense(0.3, 0.3),
       Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3),
       Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3),

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java
index ad44acb..ce354af 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java
@@ -19,10 +19,8 @@ package org.apache.spark.examples.mllib;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.VoidFunction;
 // $example on$
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.feature.ChiSqSelector;
 import org.apache.spark.mllib.feature.ChiSqSelectorModel;
 import org.apache.spark.mllib.linalg.Vectors;
@@ -42,41 +40,25 @@ public class JavaChiSqSelectorExample {
 
     // Discretize data in 16 equal bins since ChiSqSelector requires categorical features
     // Although features are doubles, the ChiSqSelector treats each unique value as a category
-    JavaRDD<LabeledPoint> discretizedData = points.map(
-      new Function<LabeledPoint, LabeledPoint>() {
-        @Override
-        public LabeledPoint call(LabeledPoint lp) {
-          final double[] discretizedFeatures = new double[lp.features().size()];
-          for (int i = 0; i < lp.features().size(); ++i) {
-            discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16);
-          }
-          return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures));
-        }
+    JavaRDD<LabeledPoint> discretizedData = points.map(lp -> {
+      double[] discretizedFeatures = new double[lp.features().size()];
+      for (int i = 0; i < lp.features().size(); ++i) {
+        discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16);
       }
-    );
+      return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures));
+    });
 
     // Create ChiSqSelector that will select top 50 of 692 features
     ChiSqSelector selector = new ChiSqSelector(50);
     // Create ChiSqSelector model (selecting features)
-    final ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd());
+    ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd());
     // Filter the top 50 features from each feature vector
-    JavaRDD<LabeledPoint> filteredData = discretizedData.map(
-      new Function<LabeledPoint, LabeledPoint>() {
-        @Override
-        public LabeledPoint call(LabeledPoint lp) {
-          return new LabeledPoint(lp.label(), transformer.transform(lp.features()));
-        }
-      }
-    );
+    JavaRDD<LabeledPoint> filteredData = discretizedData.map(lp ->
+      new LabeledPoint(lp.label(), transformer.transform(lp.features())));
     // $example off$
 
     System.out.println("filtered data: ");
-    filteredData.foreach(new VoidFunction<LabeledPoint>() {
-      @Override
-      public void call(LabeledPoint labeledPoint) throws Exception {
-        System.out.println(labeledPoint.toString());
-      }
-    });
+    filteredData.foreach(System.out::println);
 
     jsc.stop();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java
index 66387b9..032c168 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java
@@ -27,8 +27,6 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.tree.DecisionTree;
 import org.apache.spark.mllib.tree.model.DecisionTreeModel;
@@ -53,31 +51,21 @@ class JavaDecisionTreeClassificationExample {
 
     // Set parameters.
     //  Empty categoricalFeaturesInfo indicates all features are continuous.
-    Integer numClasses = 2;
+    int numClasses = 2;
     Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
     String impurity = "gini";
-    Integer maxDepth = 5;
-    Integer maxBins = 32;
+    int maxDepth = 5;
+    int maxBins = 32;
 
     // Train a DecisionTree model for classification.
-    final DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses,
+    DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses,
       categoricalFeaturesInfo, impurity, maxDepth, maxBins);
 
     // Evaluate model on test instances and compute test error
     JavaPairRDD<Double, Double> predictionAndLabel =
-      testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
-        @Override
-        public Tuple2<Double, Double> call(LabeledPoint p) {
-          return new Tuple2<>(model.predict(p.features()), p.label());
-        }
-      });
-    Double testErr =
-      1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
-        @Override
-        public Boolean call(Tuple2<Double, Double> pl) {
-          return !pl._1().equals(pl._2());
-        }
-      }).count() / testData.count();
+      testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+    double testErr =
+      predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
 
     System.out.println("Test Error: " + testErr);
     System.out.println("Learned classification tree model:\n" + model.toDebugString());

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java
index 904e7f7..f222c38 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java
@@ -27,9 +27,6 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.tree.DecisionTree;
 import org.apache.spark.mllib.tree.model.DecisionTreeModel;
@@ -56,34 +53,20 @@ class JavaDecisionTreeRegressionExample {
     // Empty categoricalFeaturesInfo indicates all features are continuous.
     Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
     String impurity = "variance";
-    Integer maxDepth = 5;
-    Integer maxBins = 32;
+    int maxDepth = 5;
+    int maxBins = 32;
 
     // Train a DecisionTree model.
-    final DecisionTreeModel model = DecisionTree.trainRegressor(trainingData,
+    DecisionTreeModel model = DecisionTree.trainRegressor(trainingData,
       categoricalFeaturesInfo, impurity, maxDepth, maxBins);
 
     // Evaluate model on test instances and compute test error
     JavaPairRDD<Double, Double> predictionAndLabel =
-      testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
-      @Override
-      public Tuple2<Double, Double> call(LabeledPoint p) {
-        return new Tuple2<>(model.predict(p.features()), p.label());
-      }
-    });
-    Double testMSE =
-      predictionAndLabel.map(new Function<Tuple2<Double, Double>, Double>() {
-        @Override
-        public Double call(Tuple2<Double, Double> pl) {
-          Double diff = pl._1() - pl._2();
-          return diff * diff;
-        }
-      }).reduce(new Function2<Double, Double, Double>() {
-        @Override
-        public Double call(Double a, Double b) {
-          return a + b;
-        }
-      }) / data.count();
+      testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+    double testMSE = predictionAndLabel.mapToDouble(pl -> {
+      double diff = pl._1() - pl._2();
+      return diff * diff;
+    }).mean();
     System.out.println("Test Mean Squared Error: " + testMSE);
     System.out.println("Learned regression tree model:\n" + model.toDebugString());
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java
index c8ce6ab..2d45c61 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java
@@ -25,12 +25,10 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 // $example on$
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.feature.ElementwiseProduct;
 import org.apache.spark.mllib.linalg.Vector;
 import org.apache.spark.mllib.linalg.Vectors;
 // $example off$
-import org.apache.spark.api.java.function.VoidFunction;
 
 public class JavaElementwiseProductExample {
   public static void main(String[] args) {
@@ -43,35 +41,18 @@ public class JavaElementwiseProductExample {
     JavaRDD<Vector> data = jsc.parallelize(Arrays.asList(
       Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)));
     Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
-    final ElementwiseProduct transformer = new ElementwiseProduct(transformingVector);
+    ElementwiseProduct transformer = new ElementwiseProduct(transformingVector);
 
     // Batch transform and per-row transform give the same results:
     JavaRDD<Vector> transformedData = transformer.transform(data);
-    JavaRDD<Vector> transformedData2 = data.map(
-      new Function<Vector, Vector>() {
-        @Override
-        public Vector call(Vector v) {
-          return transformer.transform(v);
-        }
-      }
-    );
+    JavaRDD<Vector> transformedData2 = data.map(transformer::transform);
     // $example off$
 
     System.out.println("transformedData: ");
-    transformedData.foreach(new VoidFunction<Vector>() {
-      @Override
-      public void call(Vector vector) throws Exception {
-        System.out.println(vector.toString());
-      }
-    });
+    transformedData.foreach(System.out::println);
 
     System.out.println("transformedData2: ");
-    transformedData2.foreach(new VoidFunction<Vector>() {
-      @Override
-      public void call(Vector vector) throws Exception {
-        System.out.println(vector.toString());
-      }
-    });
+    transformedData2.foreach(System.out::println);
 
     jsc.stop();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java
index 3124411..5792e5a 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java
@@ -22,7 +22,6 @@ import org.apache.spark.api.java.JavaSparkContext;
 
 // $example on$
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.clustering.GaussianMixture;
 import org.apache.spark.mllib.clustering.GaussianMixtureModel;
 import org.apache.spark.mllib.linalg.Vector;
@@ -39,18 +38,14 @@ public class JavaGaussianMixtureExample {
     // Load and parse data
     String path = "data/mllib/gmm_data.txt";
     JavaRDD<String> data = jsc.textFile(path);
-    JavaRDD<Vector> parsedData = data.map(
-      new Function<String, Vector>() {
-        public Vector call(String s) {
-          String[] sarray = s.trim().split(" ");
-          double[] values = new double[sarray.length];
-          for (int i = 0; i < sarray.length; i++) {
-            values[i] = Double.parseDouble(sarray[i]);
-          }
-          return Vectors.dense(values);
-        }
+    JavaRDD<Vector> parsedData = data.map(s -> {
+      String[] sarray = s.trim().split(" ");
+      double[] values = new double[sarray.length];
+      for (int i = 0; i < sarray.length; i++) {
+        values[i] = Double.parseDouble(sarray[i]);
       }
-    );
+      return Vectors.dense(values);
+    });
     parsedData.cache();
 
     // Cluster the data into two classes using GaussianMixture

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java
index 213949e..521ee96 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java
@@ -27,8 +27,6 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.tree.GradientBoostedTrees;
 import org.apache.spark.mllib.tree.configuration.BoostingStrategy;
@@ -61,24 +59,13 @@ public class JavaGradientBoostingClassificationExample {
     Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
     boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo);
 
-    final GradientBoostedTreesModel model =
-      GradientBoostedTrees.train(trainingData, boostingStrategy);
+    GradientBoostedTreesModel model = GradientBoostedTrees.train(trainingData, boostingStrategy);
 
     // Evaluate model on test instances and compute test error
     JavaPairRDD<Double, Double> predictionAndLabel =
-      testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
-        @Override
-        public Tuple2<Double, Double> call(LabeledPoint p) {
-          return new Tuple2<>(model.predict(p.features()), p.label());
-        }
-      });
-    Double testErr =
-      1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
-        @Override
-        public Boolean call(Tuple2<Double, Double> pl) {
-          return !pl._1().equals(pl._2());
-        }
-      }).count() / testData.count();
+      testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+    double testErr =
+      predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
     System.out.println("Test Error: " + testErr);
     System.out.println("Learned classification GBT model:\n" + model.toDebugString());
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java
index 78db442..b345d19 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java
@@ -24,12 +24,9 @@ import java.util.Map;
 import scala.Tuple2;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.tree.GradientBoostedTrees;
 import org.apache.spark.mllib.tree.configuration.BoostingStrategy;
@@ -60,30 +57,15 @@ public class JavaGradientBoostingRegressionExample {
     Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
     boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo);
 
-    final GradientBoostedTreesModel model =
-      GradientBoostedTrees.train(trainingData, boostingStrategy);
+    GradientBoostedTreesModel model = GradientBoostedTrees.train(trainingData, boostingStrategy);
 
     // Evaluate model on test instances and compute test error
     JavaPairRDD<Double, Double> predictionAndLabel =
-      testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
-        @Override
-        public Tuple2<Double, Double> call(LabeledPoint p) {
-          return new Tuple2<>(model.predict(p.features()), p.label());
-        }
-      });
-    Double testMSE =
-      predictionAndLabel.map(new Function<Tuple2<Double, Double>, Double>() {
-        @Override
-        public Double call(Tuple2<Double, Double> pl) {
-          Double diff = pl._1() - pl._2();
-          return diff * diff;
-        }
-      }).reduce(new Function2<Double, Double, Double>() {
-        @Override
-        public Double call(Double a, Double b) {
-          return a + b;
-        }
-      }) / data.count();
+      testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+    double testMSE = predictionAndLabel.mapToDouble(pl -> {
+      double diff = pl._1() - pl._2();
+      return diff * diff;
+    }).mean();
     System.out.println("Test Mean Squared Error: " + testMSE);
     System.out.println("Learned regression GBT model:\n" + model.toDebugString());
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java
index a30b5f1..adebafe 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java
@@ -20,9 +20,6 @@ package org.apache.spark.examples.mllib;
 
 import scala.Tuple2;
 import scala.Tuple3;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.api.java.JavaDoubleRDD;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.JavaRDD;
@@ -42,14 +39,8 @@ public class JavaIsotonicRegressionExample {
       jsc.sc(), "data/mllib/sample_isotonic_regression_libsvm_data.txt").toJavaRDD();
 
     // Create label, feature, weight tuples from input data with weight set to default value 1.0.
-    JavaRDD<Tuple3<Double, Double, Double>> parsedData = data.map(
-      new Function<LabeledPoint, Tuple3<Double, Double, Double>>() {
-        public Tuple3<Double, Double, Double> call(LabeledPoint point) {
-          return new Tuple3<>(new Double(point.label()),
-            new Double(point.features().apply(0)), 1.0);
-        }
-      }
-    );
+    JavaRDD<Tuple3<Double, Double, Double>> parsedData = data.map(point ->
+      new Tuple3<>(point.label(), point.features().apply(0), 1.0));
 
     // Split data into training (60%) and test (40%) sets.
     JavaRDD<Tuple3<Double, Double, Double>>[] splits =
@@ -59,29 +50,17 @@ public class JavaIsotonicRegressionExample {
 
     // Create isotonic regression model from training data.
     // Isotonic parameter defaults to true so it is only shown for demonstration
-    final IsotonicRegressionModel model =
-      new IsotonicRegression().setIsotonic(true).run(training);
+    IsotonicRegressionModel model = new IsotonicRegression().setIsotonic(true).run(training);
 
     // Create tuples of predicted and real labels.
-    JavaPairRDD<Double, Double> predictionAndLabel = test.mapToPair(
-      new PairFunction<Tuple3<Double, Double, Double>, Double, Double>() {
-        @Override
-        public Tuple2<Double, Double> call(Tuple3<Double, Double, Double> point) {
-          Double predictedLabel = model.predict(point._2());
-          return new Tuple2<>(predictedLabel, point._1());
-        }
-      }
-    );
+    JavaPairRDD<Double, Double> predictionAndLabel = test.mapToPair(point ->
+      new Tuple2<>(model.predict(point._2()), point._1()));
 
     // Calculate mean squared error between predicted and real labels.
-    Double meanSquaredError = new JavaDoubleRDD(predictionAndLabel.map(
-      new Function<Tuple2<Double, Double>, Object>() {
-        @Override
-        public Object call(Tuple2<Double, Double> pl) {
-          return Math.pow(pl._1() - pl._2(), 2);
-        }
-      }
-    ).rdd()).mean();
+    double meanSquaredError = predictionAndLabel.mapToDouble(pl -> {
+      double diff = pl._1() - pl._2();
+      return diff * diff;
+    }).mean();
     System.out.println("Mean Squared Error = " + meanSquaredError);
 
     // Save and load model

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java
index 2d89c76..f172756 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java
@@ -22,7 +22,6 @@ import org.apache.spark.api.java.JavaSparkContext;
 
 // $example on$
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.clustering.KMeans;
 import org.apache.spark.mllib.clustering.KMeansModel;
 import org.apache.spark.mllib.linalg.Vector;
@@ -39,18 +38,14 @@ public class JavaKMeansExample {
     // Load and parse data
     String path = "data/mllib/kmeans_data.txt";
     JavaRDD<String> data = jsc.textFile(path);
-    JavaRDD<Vector> parsedData = data.map(
-      new Function<String, Vector>() {
-        public Vector call(String s) {
-          String[] sarray = s.split(" ");
-          double[] values = new double[sarray.length];
-          for (int i = 0; i < sarray.length; i++) {
-            values[i] = Double.parseDouble(sarray[i]);
-          }
-          return Vectors.dense(values);
-        }
+    JavaRDD<Vector> parsedData = data.map(s -> {
+      String[] sarray = s.split(" ");
+      double[] values = new double[sarray.length];
+      for (int i = 0; i < sarray.length; i++) {
+        values[i] = Double.parseDouble(sarray[i]);
       }
-    );
+      return Vectors.dense(values);
+    });
     parsedData.cache();
 
     // Cluster the data into two classes using KMeans

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java
index f6f91f4..3fdc03a 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import scala.Tuple2;
 
 import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.classification.LogisticRegressionModel;
 import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
 import org.apache.spark.mllib.linalg.Vector;
@@ -50,12 +49,8 @@ public class JavaLBFGSExample {
     JavaRDD<LabeledPoint> test = data.subtract(trainingInit);
 
     // Append 1 into the training data as intercept.
-    JavaRDD<Tuple2<Object, Vector>> training = data.map(
-      new Function<LabeledPoint, Tuple2<Object, Vector>>() {
-        public Tuple2<Object, Vector> call(LabeledPoint p) {
-          return new Tuple2<Object, Vector>(p.label(), MLUtils.appendBias(p.features()));
-        }
-      });
+    JavaPairRDD<Object, Vector> training = data.mapToPair(p ->
+      new Tuple2<>(p.label(), MLUtils.appendBias(p.features())));
     training.cache();
 
     // Run training algorithm to build the model.
@@ -77,7 +72,7 @@ public class JavaLBFGSExample {
     Vector weightsWithIntercept = result._1();
     double[] loss = result._2();
 
-    final LogisticRegressionModel model = new LogisticRegressionModel(
+    LogisticRegressionModel model = new LogisticRegressionModel(
       Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)),
       (weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]);
 
@@ -85,13 +80,8 @@ public class JavaLBFGSExample {
     model.clearThreshold();
 
     // Compute raw scores on the test set.
-    JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(
-      new Function<LabeledPoint, Tuple2<Object, Object>>() {
-        public Tuple2<Object, Object> call(LabeledPoint p) {
-          Double score = model.predict(p.features());
-          return new Tuple2<Object, Object>(score, p.label());
-        }
-      });
+    JavaPairRDD<Object, Object> scoreAndLabels = test.mapToPair(p ->
+      new Tuple2<>(model.predict(p.features()), p.label()));
 
     // Get evaluation metrics.
     BinaryClassificationMetrics metrics =
@@ -99,8 +89,9 @@ public class JavaLBFGSExample {
     double auROC = metrics.areaUnderROC();
 
     System.out.println("Loss of each step in training process");
-    for (double l : loss)
+    for (double l : loss) {
       System.out.println(l);
+    }
     System.out.println("Area under ROC = " + auROC);
     // $example off$
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java
index 578564e..887edf8 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java
@@ -25,7 +25,6 @@ import scala.Tuple2;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.clustering.DistributedLDAModel;
 import org.apache.spark.mllib.clustering.LDA;
 import org.apache.spark.mllib.clustering.LDAModel;
@@ -44,28 +43,17 @@ public class JavaLatentDirichletAllocationExample {
     // Load and parse the data
     String path = "data/mllib/sample_lda_data.txt";
     JavaRDD<String> data = jsc.textFile(path);
-    JavaRDD<Vector> parsedData = data.map(
-      new Function<String, Vector>() {
-        public Vector call(String s) {
-          String[] sarray = s.trim().split(" ");
-          double[] values = new double[sarray.length];
-          for (int i = 0; i < sarray.length; i++) {
-            values[i] = Double.parseDouble(sarray[i]);
-          }
-          return Vectors.dense(values);
-        }
+    JavaRDD<Vector> parsedData = data.map(s -> {
+      String[] sarray = s.trim().split(" ");
+      double[] values = new double[sarray.length];
+      for (int i = 0; i < sarray.length; i++) {
+        values[i] = Double.parseDouble(sarray[i]);
       }
-    );
+      return Vectors.dense(values);
+    });
     // Index documents with unique IDs
     JavaPairRDD<Long, Vector> corpus =
-      JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
-        new Function<Tuple2<Vector, Long>, Tuple2<Long, Vector>>() {
-          public Tuple2<Long, Vector> call(Tuple2<Vector, Long> doc_id) {
-            return doc_id.swap();
-          }
-        }
-      )
-    );
+      JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(Tuple2::swap));
     corpus.cache();
 
     // Cluster the documents into three topics using LDA

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java
index 9ca9a78..324a781 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java
@@ -23,9 +23,8 @@ import org.apache.spark.api.java.JavaSparkContext;
 // $example on$
 import scala.Tuple2;
 
-import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.regression.LinearRegressionModel;
@@ -44,43 +43,31 @@ public class JavaLinearRegressionWithSGDExample {
     // Load and parse the data
     String path = "data/mllib/ridge-data/lpsa.data";
     JavaRDD<String> data = sc.textFile(path);
-    JavaRDD<LabeledPoint> parsedData = data.map(
-      new Function<String, LabeledPoint>() {
-        public LabeledPoint call(String line) {
-          String[] parts = line.split(",");
-          String[] features = parts[1].split(" ");
-          double[] v = new double[features.length];
-          for (int i = 0; i < features.length - 1; i++) {
-            v[i] = Double.parseDouble(features[i]);
-          }
-          return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
-        }
+    JavaRDD<LabeledPoint> parsedData = data.map(line -> {
+      String[] parts = line.split(",");
+      String[] features = parts[1].split(" ");
+      double[] v = new double[features.length];
+      for (int i = 0; i < features.length - 1; i++) {
+        v[i] = Double.parseDouble(features[i]);
       }
-    );
+      return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
+    });
     parsedData.cache();
 
     // Building the model
     int numIterations = 100;
     double stepSize = 0.00000001;
-    final LinearRegressionModel model =
+    LinearRegressionModel model =
       LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations, stepSize);
 
     // Evaluate model on training examples and compute training error
-    JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map(
-      new Function<LabeledPoint, Tuple2<Double, Double>>() {
-        public Tuple2<Double, Double> call(LabeledPoint point) {
-          double prediction = model.predict(point.features());
-          return new Tuple2<>(prediction, point.label());
-        }
-      }
-    );
-    double MSE = new JavaDoubleRDD(valuesAndPreds.map(
-      new Function<Tuple2<Double, Double>, Object>() {
-        public Object call(Tuple2<Double, Double> pair) {
-          return Math.pow(pair._1() - pair._2(), 2.0);
-        }
-      }
-    ).rdd()).mean();
+    JavaPairRDD<Double, Double> valuesAndPreds = parsedData.mapToPair(point ->
+      new Tuple2<>(model.predict(point.features()), point.label()));
+
+    double MSE = valuesAndPreds.mapToDouble(pair -> {
+      double diff = pair._1() - pair._2();
+      return diff * diff;
+    }).mean();
     System.out.println("training Mean Squared Error = " + MSE);
 
     // Save and load model

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java
index 7fc371e..26b8a6e 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java
@@ -23,8 +23,8 @@ import org.apache.spark.SparkContext;
 // $example on$
 import scala.Tuple2;
 
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.classification.LogisticRegressionModel;
 import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
 import org.apache.spark.mllib.evaluation.MulticlassMetrics;
@@ -49,19 +49,13 @@ public class JavaLogisticRegressionWithLBFGSExample {
     JavaRDD<LabeledPoint> test = splits[1];
 
     // Run training algorithm to build the model.
-    final LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
+    LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
       .setNumClasses(10)
       .run(training.rdd());
 
     // Compute raw scores on the test set.
-    JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
-      new Function<LabeledPoint, Tuple2<Object, Object>>() {
-        public Tuple2<Object, Object> call(LabeledPoint p) {
-          Double prediction = model.predict(p.features());
-          return new Tuple2<Object, Object>(prediction, p.label());
-        }
-      }
-    );
+    JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
+      new Tuple2<>(model.predict(p.features()), p.label()));
 
     // Get evaluation metrics.
     MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java
index 2d12bdd..0367038 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java
@@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib;
 import scala.Tuple2;
 
 import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.classification.LogisticRegressionModel;
 import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
 import org.apache.spark.mllib.evaluation.MulticlassMetrics;
@@ -46,19 +45,13 @@ public class JavaMulticlassClassificationMetricsExample {
     JavaRDD<LabeledPoint> test = splits[1];
 
     // Run training algorithm to build the model.
-    final LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
+    LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
       .setNumClasses(3)
       .run(training.rdd());
 
     // Compute raw scores on the test set.
-    JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
-      new Function<LabeledPoint, Tuple2<Object, Object>>() {
-        public Tuple2<Object, Object> call(LabeledPoint p) {
-          Double prediction = model.predict(p.features());
-          return new Tuple2<Object, Object>(prediction, p.label());
-        }
-      }
-    );
+    JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
+      new Tuple2<>(model.predict(p.features()), p.label()));
 
     // Get evaluation metrics.
     MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java
index f4ec04b..d80dbe8 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java
@@ -19,8 +19,6 @@ package org.apache.spark.examples.mllib;
 
 // $example on$
 import scala.Tuple2;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -41,20 +39,11 @@ public class JavaNaiveBayesExample {
     JavaRDD<LabeledPoint>[] tmp = inputData.randomSplit(new double[]{0.6, 0.4});
     JavaRDD<LabeledPoint> training = tmp[0]; // training set
     JavaRDD<LabeledPoint> test = tmp[1]; // test set
-    final NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0);
+    NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0);
     JavaPairRDD<Double, Double> predictionAndLabel =
-      test.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
-        @Override
-        public Tuple2<Double, Double> call(LabeledPoint p) {
-          return new Tuple2<>(model.predict(p.features()), p.label());
-        }
-      });
-    double accuracy = predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
-      @Override
-      public Boolean call(Tuple2<Double, Double> pl) {
-        return pl._1().equals(pl._2());
-      }
-    }).count() / (double) test.count();
+      test.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+    double accuracy =
+      predictionAndLabel.filter(pl -> pl._1().equals(pl._2())).count() / (double) test.count();
 
     // Save and load model
     model.save(jsc.sc(), "target/tmp/myNaiveBayesModel");

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
index 91c3bd7..5155f18 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
@@ -17,9 +17,9 @@
 
 package org.apache.spark.examples.mllib;
 
-import scala.Tuple3;
+import java.util.Arrays;
 
-import com.google.common.collect.Lists;
+import scala.Tuple3;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
@@ -39,7 +39,7 @@ public class JavaPowerIterationClusteringExample {
 
     @SuppressWarnings("unchecked")
     // $example on$
-    JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Lists.newArrayList(
+    JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Arrays.asList(
       new Tuple3<>(0L, 1L, 0.9),
       new Tuple3<>(1L, 2L, 0.9),
       new Tuple3<>(2L, 3L, 0.9),

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java
index 24af5d0..6998ce2 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java
@@ -19,6 +19,7 @@ package org.apache.spark.examples.mllib;
 
 // $example on$
 import java.util.HashMap;
+import java.util.Map;
 
 import scala.Tuple2;
 
@@ -26,8 +27,6 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.tree.RandomForest;
 import org.apache.spark.mllib.tree.model.RandomForestModel;
@@ -50,7 +49,7 @@ public class JavaRandomForestClassificationExample {
     // Train a RandomForest model.
     // Empty categoricalFeaturesInfo indicates all features are continuous.
     Integer numClasses = 2;
-    HashMap<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
+    Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
     Integer numTrees = 3; // Use more in practice.
     String featureSubsetStrategy = "auto"; // Let the algorithm choose.
     String impurity = "gini";
@@ -58,25 +57,15 @@ public class JavaRandomForestClassificationExample {
     Integer maxBins = 32;
     Integer seed = 12345;
 
-    final RandomForestModel model = RandomForest.trainClassifier(trainingData, numClasses,
+    RandomForestModel model = RandomForest.trainClassifier(trainingData, numClasses,
       categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins,
       seed);
 
     // Evaluate model on test instances and compute test error
     JavaPairRDD<Double, Double> predictionAndLabel =
-      testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
-        @Override
-        public Tuple2<Double, Double> call(LabeledPoint p) {
-          return new Tuple2<>(model.predict(p.features()), p.label());
-        }
-      });
-    Double testErr =
-      1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
-        @Override
-        public Boolean call(Tuple2<Double, Double> pl) {
-          return !pl._1().equals(pl._2());
-        }
-      }).count() / testData.count();
+      testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+    double testErr =
+      predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
     System.out.println("Test Error: " + testErr);
     System.out.println("Learned classification forest model:\n" + model.toDebugString());
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java
index afa9045..4a0f55f 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java
@@ -23,12 +23,9 @@ import java.util.Map;
 
 import scala.Tuple2;
 
-import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.tree.RandomForest;
 import org.apache.spark.mllib.tree.model.RandomForestModel;
@@ -52,37 +49,23 @@ public class JavaRandomForestRegressionExample {
     // Set parameters.
     // Empty categoricalFeaturesInfo indicates all features are continuous.
     Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
-    Integer numTrees = 3; // Use more in practice.
+    int numTrees = 3; // Use more in practice.
     String featureSubsetStrategy = "auto"; // Let the algorithm choose.
     String impurity = "variance";
-    Integer maxDepth = 4;
-    Integer maxBins = 32;
-    Integer seed = 12345;
+    int maxDepth = 4;
+    int maxBins = 32;
+    int seed = 12345;
     // Train a RandomForest model.
-    final RandomForestModel model = RandomForest.trainRegressor(trainingData,
+    RandomForestModel model = RandomForest.trainRegressor(trainingData,
       categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins, seed);
 
     // Evaluate model on test instances and compute test error
     JavaPairRDD<Double, Double> predictionAndLabel =
-      testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
-        @Override
-        public Tuple2<Double, Double> call(LabeledPoint p) {
-          return new Tuple2<>(model.predict(p.features()), p.label());
-        }
-      });
-    Double testMSE =
-      predictionAndLabel.map(new Function<Tuple2<Double, Double>, Double>() {
-        @Override
-        public Double call(Tuple2<Double, Double> pl) {
-          Double diff = pl._1() - pl._2();
-          return diff * diff;
-        }
-      }).reduce(new Function2<Double, Double, Double>() {
-        @Override
-        public Double call(Double a, Double b) {
-          return a + b;
-        }
-      }) / testData.count();
+      testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+    double testMSE = predictionAndLabel.mapToDouble(pl -> {
+      double diff = pl._1() - pl._2();
+      return diff * diff;
+    }).mean();
     System.out.println("Test Mean Squared Error: " + testMSE);
     System.out.println("Learned regression forest model:\n" + model.toDebugString());
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
index 54dfc40..bd49f05 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
@@ -23,7 +23,6 @@ import java.util.*;
 import scala.Tuple2;
 
 import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.evaluation.RegressionMetrics;
 import org.apache.spark.mllib.evaluation.RankingMetrics;
 import org.apache.spark.mllib.recommendation.ALS;
@@ -39,93 +38,61 @@ public class JavaRankingMetricsExample {
     // $example on$
     String path = "data/mllib/sample_movielens_data.txt";
     JavaRDD<String> data = sc.textFile(path);
-    JavaRDD<Rating> ratings = data.map(
-      new Function<String, Rating>() {
-        @Override
-        public Rating call(String line) {
-          String[] parts = line.split("::");
-            return new Rating(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Double
-              .parseDouble(parts[2]) - 2.5);
-        }
-      }
-    );
+    JavaRDD<Rating> ratings = data.map(line -> {
+        String[] parts = line.split("::");
+        return new Rating(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Double
+            .parseDouble(parts[2]) - 2.5);
+      });
     ratings.cache();
 
     // Train an ALS model
-    final MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), 10, 10, 0.01);
+    MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), 10, 10, 0.01);
 
     // Get top 10 recommendations for every user and scale ratings from 0 to 1
     JavaRDD<Tuple2<Object, Rating[]>> userRecs = model.recommendProductsForUsers(10).toJavaRDD();
-    JavaRDD<Tuple2<Object, Rating[]>> userRecsScaled = userRecs.map(
-      new Function<Tuple2<Object, Rating[]>, Tuple2<Object, Rating[]>>() {
-        @Override
-        public Tuple2<Object, Rating[]> call(Tuple2<Object, Rating[]> t) {
-          Rating[] scaledRatings = new Rating[t._2().length];
-          for (int i = 0; i < scaledRatings.length; i++) {
-            double newRating = Math.max(Math.min(t._2()[i].rating(), 1.0), 0.0);
-            scaledRatings[i] = new Rating(t._2()[i].user(), t._2()[i].product(), newRating);
-          }
-          return new Tuple2<>(t._1(), scaledRatings);
+    JavaRDD<Tuple2<Object, Rating[]>> userRecsScaled = userRecs.map(t -> {
+        Rating[] scaledRatings = new Rating[t._2().length];
+        for (int i = 0; i < scaledRatings.length; i++) {
+          double newRating = Math.max(Math.min(t._2()[i].rating(), 1.0), 0.0);
+          scaledRatings[i] = new Rating(t._2()[i].user(), t._2()[i].product(), newRating);
         }
-      }
-    );
+        return new Tuple2<>(t._1(), scaledRatings);
+      });
     JavaPairRDD<Object, Rating[]> userRecommended = JavaPairRDD.fromJavaRDD(userRecsScaled);
 
     // Map ratings to 1 or 0, 1 indicating a movie that should be recommended
-    JavaRDD<Rating> binarizedRatings = ratings.map(
-      new Function<Rating, Rating>() {
-        @Override
-        public Rating call(Rating r) {
-          double binaryRating;
-          if (r.rating() > 0.0) {
-            binaryRating = 1.0;
-          } else {
-            binaryRating = 0.0;
-          }
-          return new Rating(r.user(), r.product(), binaryRating);
+    JavaRDD<Rating> binarizedRatings = ratings.map(r -> {
+        double binaryRating;
+        if (r.rating() > 0.0) {
+          binaryRating = 1.0;
+        } else {
+          binaryRating = 0.0;
         }
-      }
-    );
+        return new Rating(r.user(), r.product(), binaryRating);
+      });
 
     // Group ratings by common user
-    JavaPairRDD<Object, Iterable<Rating>> userMovies = binarizedRatings.groupBy(
-      new Function<Rating, Object>() {
-        @Override
-        public Object call(Rating r) {
-          return r.user();
-        }
-      }
-    );
+    JavaPairRDD<Object, Iterable<Rating>> userMovies = binarizedRatings.groupBy(Rating::user);
 
     // Get true relevant documents from all user ratings
-    JavaPairRDD<Object, List<Integer>> userMoviesList = userMovies.mapValues(
-      new Function<Iterable<Rating>, List<Integer>>() {
-        @Override
-        public List<Integer> call(Iterable<Rating> docs) {
-          List<Integer> products = new ArrayList<>();
-          for (Rating r : docs) {
-            if (r.rating() > 0.0) {
-              products.add(r.product());
-            }
+    JavaPairRDD<Object, List<Integer>> userMoviesList = userMovies.mapValues(docs -> {
+        List<Integer> products = new ArrayList<>();
+        for (Rating r : docs) {
+          if (r.rating() > 0.0) {
+            products.add(r.product());
           }
-          return products;
         }
-      }
-    );
+        return products;
+      });
 
     // Extract the product id from each recommendation
-    JavaPairRDD<Object, List<Integer>> userRecommendedList = userRecommended.mapValues(
-      new Function<Rating[], List<Integer>>() {
-        @Override
-        public List<Integer> call(Rating[] docs) {
-          List<Integer> products = new ArrayList<>();
-          for (Rating r : docs) {
-            products.add(r.product());
-          }
-          return products;
+    JavaPairRDD<Object, List<Integer>> userRecommendedList = userRecommended.mapValues(docs -> {
+        List<Integer> products = new ArrayList<>();
+        for (Rating r : docs) {
+          products.add(r.product());
         }
-      }
-    );
+        return products;
+      });
     JavaRDD<Tuple2<List<Integer>, List<Integer>>> relevantDocs = userMoviesList.join(
       userRecommendedList).values();
 
@@ -143,33 +110,15 @@ public class JavaRankingMetricsExample {
     System.out.format("Mean average precision = %f\n", metrics.meanAveragePrecision());
 
     // Evaluate the model using numerical ratings and regression metrics
-    JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
-      new Function<Rating, Tuple2<Object, Object>>() {
-        @Override
-        public Tuple2<Object, Object> call(Rating r) {
-          return new Tuple2<Object, Object>(r.user(), r.product());
-        }
-      }
-    );
+    JavaRDD<Tuple2<Object, Object>> userProducts =
+        ratings.map(r -> new Tuple2<>(r.user(), r.product()));
+
     JavaPairRDD<Tuple2<Integer, Integer>, Object> predictions = JavaPairRDD.fromJavaRDD(
-      model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
-        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Object>>() {
-          @Override
-          public Tuple2<Tuple2<Integer, Integer>, Object> call(Rating r) {
-            return new Tuple2<Tuple2<Integer, Integer>, Object>(
-              new Tuple2<>(r.user(), r.product()), r.rating());
-          }
-        }
-      ));
+      model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(r ->
+        new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())));
     JavaRDD<Tuple2<Object, Object>> ratesAndPreds =
-      JavaPairRDD.fromJavaRDD(ratings.map(
-        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Object>>() {
-          @Override
-          public Tuple2<Tuple2<Integer, Integer>, Object> call(Rating r) {
-            return new Tuple2<Tuple2<Integer, Integer>, Object>(
-              new Tuple2<>(r.user(), r.product()), r.rating());
-          }
-        }
+      JavaPairRDD.fromJavaRDD(ratings.map(r ->
+        new Tuple2<Tuple2<Integer, Integer>, Object>(new Tuple2<>(r.user(), r.product()), r.rating())
       )).join(predictions).values();
 
     // Create regression metrics object


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org