Posted to commits@spark.apache.org by sr...@apache.org on 2017/02/16 12:32:59 UTC

[8/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

[SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

- Move external/java8-tests tests into core, streaming, and sql, and remove the external/java8-tests module
- Remove MaxPermSize and related PermGen options
- Fix some reflection / TODOs around Java 8+ methods
- Update doc references to 1.7/1.8 differences
- Remove Java 7/8 related build profiles
- Update some plugins for better Java 8 compatibility
- Fix a few Java-related warnings

For the future:

- Update Java 8 examples to fully use Java 8
- Update Java tests to use lambdas for simplicity
- Update Java internal implementations to use lambdas (see the sketch after this list)
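
As a sketch of what these lambda conversions look like (hypothetical helper name, not code from this patch), an anonymous single-method implementation such as:

    Runnable cleanup = new Runnable() {
      @Override
      public void run() {
        releaseResources();   // hypothetical helper method
      }
    };

becomes, on Java 8:

    Runnable cleanup = () -> releaseResources();

The same rewrite is applied throughout the diff below to Runnable, ChannelFutureListener, Gauge, and the Spark Java function interfaces.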

## How was this patch tested?

Existing tests

Author: Sean Owen <so...@cloudera.com>

Closes #16871 from srowen/SPARK-19493.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e240549
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e240549
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e240549

Branch: refs/heads/master
Commit: 0e2405490f2056728d1353abbac6f3ea177ae533
Parents: 3871d94
Author: Sean Owen <so...@cloudera.com>
Authored: Thu Feb 16 12:32:45 2017 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Feb 16 12:32:45 2017 +0000

----------------------------------------------------------------------
 assembly/pom.xml                                |    1 +
 build/mvn                                       |    8 +-
 build/sbt-launch-lib.bash                       |    2 +-
 .../spark/network/client/TransportClient.java   |  111 +-
 .../network/crypto/AuthClientBootstrap.java     |   16 +-
 .../spark/network/crypto/AuthRpcHandler.java    |    3 -
 .../network/server/TransportRequestHandler.java |   27 +-
 .../spark/network/crypto/AuthEngineSuite.java   |    2 -
 .../shuffle/ExternalShuffleBlockHandler.java    |    8 +-
 .../shuffle/ExternalShuffleBlockResolver.java   |    7 +-
 .../network/shuffle/ExternalShuffleClient.java  |   21 +-
 .../network/shuffle/RetryingBlockFetcher.java   |    9 +-
 common/sketch/pom.xml                           |    2 +
 common/unsafe/pom.xml                           |    2 +
 .../java/org/apache/spark/unsafe/Platform.java  |    9 +-
 .../spark/unsafe/types/CalendarInterval.java    |   88 +-
 .../org/apache/spark/api/java/Optional.java     |    7 +-
 .../api/java/function/CoGroupFunction.java      |    1 +
 .../java/function/DoubleFlatMapFunction.java    |    1 +
 .../spark/api/java/function/DoubleFunction.java |    1 +
 .../spark/api/java/function/FilterFunction.java |    1 +
 .../api/java/function/FlatMapFunction.java      |    1 +
 .../api/java/function/FlatMapFunction2.java     |    1 +
 .../java/function/FlatMapGroupsFunction.java    |    1 +
 .../api/java/function/ForeachFunction.java      |    1 +
 .../java/function/ForeachPartitionFunction.java |    1 +
 .../spark/api/java/function/Function.java       |    1 +
 .../spark/api/java/function/Function0.java      |    1 +
 .../spark/api/java/function/Function2.java      |    1 +
 .../spark/api/java/function/Function3.java      |    1 +
 .../spark/api/java/function/Function4.java      |    1 +
 .../spark/api/java/function/MapFunction.java    |    1 +
 .../api/java/function/MapGroupsFunction.java    |    1 +
 .../java/function/MapPartitionsFunction.java    |    1 +
 .../api/java/function/PairFlatMapFunction.java  |    1 +
 .../spark/api/java/function/PairFunction.java   |    1 +
 .../spark/api/java/function/ReduceFunction.java |    1 +
 .../spark/api/java/function/VoidFunction.java   |    1 +
 .../spark/api/java/function/VoidFunction2.java  |    1 +
 .../unsafe/sort/UnsafeExternalSorter.java       |    9 +-
 .../unsafe/sort/UnsafeSorterSpillMerger.java    |   28 +-
 .../scala/org/apache/spark/SparkContext.scala   |    3 -
 .../spark/launcher/WorkerCommandBuilder.scala   |    1 -
 .../scala/org/apache/spark/util/Utils.scala     |   44 +-
 .../java/org/apache/spark/JavaAPISuite.java     | 1836 ----------------
 .../test/org/apache/spark/Java8RDDAPISuite.java |  356 ++++
 .../test/org/apache/spark/JavaAPISuite.java     | 1842 ++++++++++++++++
 .../org/apache/spark/util/UtilsSuite.scala      |    6 +-
 dev/appveyor-install-dependencies.ps1           |    2 +-
 dev/create-release/release-build.sh             |    1 -
 dev/make-distribution.sh                        |    2 +-
 dev/mima                                        |    1 -
 dev/run-tests.py                                |    3 -
 dev/test-dependencies.sh                        |    2 +-
 docs/building-spark.md                          |   32 +-
 docs/index.md                                   |    6 +-
 docs/mllib-linear-methods.md                    |    2 +-
 docs/mllib-statistics.md                        |    7 +-
 docs/programming-guide.md                       |   11 +-
 docs/quick-start.md                             |    9 +-
 docs/streaming-custom-receivers.md              |   10 +-
 docs/streaming-kafka-0-10-integration.md        |   62 +-
 docs/streaming-kafka-0-8-integration.md         |   41 +-
 docs/streaming-programming-guide.md             |  219 +-
 docs/structured-streaming-programming-guide.md  |   38 +-
 .../spark/examples/ml/JavaTokenizerExample.java |    4 +-
 .../examples/sql/JavaSQLDataSourceExample.java  |    2 +-
 external/java8-tests/README.md                  |   22 -
 external/java8-tests/pom.xml                    |  132 --
 .../apache/spark/java8/Java8RDDAPISuite.java    |  356 ----
 .../spark/java8/dstream/Java8APISuite.java      |  882 --------
 .../java8/sql/Java8DatasetAggregatorSuite.java  |   62 -
 .../src/test/resources/log4j.properties         |   27 -
 .../org/apache/spark/java8/JDK8ScalaSuite.scala |   30 -
 .../apache/spark/sql/kafka010/KafkaSource.scala |    3 +-
 .../spark/streaming/kafka010/KafkaRDD.scala     |    7 +-
 .../spark/launcher/AbstractCommandBuilder.java  |    7 +-
 .../spark/launcher/ChildProcAppHandle.java      |   10 +-
 .../spark/launcher/CommandBuilderUtils.java     |   21 -
 .../apache/spark/launcher/LauncherServer.java   |    7 +-
 .../apache/spark/launcher/OutputRedirector.java |    7 +-
 .../apache/spark/launcher/SparkAppHandle.java   |    3 -
 .../launcher/SparkClassCommandBuilder.java      |   68 +-
 .../launcher/SparkSubmitCommandBuilder.java     |  101 +-
 .../launcher/CommandBuilderUtilsSuite.java      |   36 -
 .../SparkSubmitCommandBuilderSuite.java         |    8 +-
 launcher/src/test/resources/spark-defaults.conf |    2 +-
 pom.xml                                         |  171 +-
 project/SparkBuild.scala                        |   41 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |    1 -
 .../spark/deploy/yarn/ExecutorRunnable.scala    |    2 -
 .../launcher/YarnCommandBuilderUtils.scala      |   12 -
 .../org/apache/spark/sql/types/Decimal.scala    |    8 +-
 .../FlatMapGroupsWithStateFunction.java         |    4 +-
 .../function/MapGroupsWithStateFunction.java    |    4 +-
 .../scala/org/apache/spark/sql/Dataset.scala    |    2 +-
 .../spark/sql/KeyValueGroupedDataset.scala      |    2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |   11 -
 .../org/apache/spark/sql/SparkSession.scala     |   11 -
 .../spark/sql/Java8DatasetAggregatorSuite.java  |   61 +
 .../org/apache/spark/sql/JavaDatasetSuite.java  |    2 +-
 sql/hive/pom.xml                                |    3 +-
 .../execution/ScriptTransformationExec.scala    |   10 +-
 .../apache/spark/streaming/JavaAPISuite.java    | 2000 -----------------
 .../apache/spark/streaming/Java8APISuite.java   |  887 ++++++++
 .../apache/spark/streaming/JavaAPISuite.java    | 2008 ++++++++++++++++++
 106 files changed, 5641 insertions(+), 6314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 53f1879..9d8607d 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -187,6 +187,7 @@
           <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-assembly-plugin</artifactId>
+            <version>3.0.0</version>
             <executions>
               <execution>
                 <id>dist</id>

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/build/mvn
----------------------------------------------------------------------
diff --git a/build/mvn b/build/mvn
index 866bad8..1e393c3 100755
--- a/build/mvn
+++ b/build/mvn
@@ -22,7 +22,7 @@ _DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 # Preserve the calling directory
 _CALLING_DIR="$(pwd)"
 # Options used during compilation
-_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
+_COMPILE_JVM_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
 
 # Installs any application tarball given a URL, the expected tarball name,
 # and, optionally, a checkable binary path to determine if the binary has
@@ -141,13 +141,9 @@ cd "${_CALLING_DIR}"
 # Now that zinc is ensured to be installed, check its status and, if its
 # not running or just installed, start it
 if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then
-  ZINC_JAVA_HOME=
-  if [ -n "$JAVA_7_HOME" ]; then
-    ZINC_JAVA_HOME="env JAVA_HOME=$JAVA_7_HOME"
-  fi
   export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
   "${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
-  $ZINC_JAVA_HOME "${ZINC_BIN}" -start -port ${ZINC_PORT} \
+  "${ZINC_BIN}" -start -port ${ZINC_PORT} \
     -scala-compiler "${SCALA_COMPILER}" \
     -scala-library "${SCALA_LIBRARY}" &>/dev/null
 fi

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/build/sbt-launch-lib.bash
----------------------------------------------------------------------
diff --git a/build/sbt-launch-lib.bash b/build/sbt-launch-lib.bash
index 615f848..4732669 100755
--- a/build/sbt-launch-lib.bash
+++ b/build/sbt-launch-lib.bash
@@ -117,7 +117,7 @@ get_mem_opts () {
   (( $perm < 4096 )) || perm=4096
   local codecache=$(( $perm / 2 ))
 
-  echo "-Xms${mem}m -Xmx${mem}m -XX:MaxPermSize=${perm}m -XX:ReservedCodeCacheSize=${codecache}m"
+  echo "-Xms${mem}m -Xmx${mem}m -XX:ReservedCodeCacheSize=${codecache}m"
 }
 
 require_arg () {

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 7e7d78d..a6f527c 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -32,8 +32,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.SettableFuture;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -133,40 +131,36 @@ public class TransportClient implements Closeable {
    */
   public void fetchChunk(
       long streamId,
-      final int chunkIndex,
-      final ChunkReceivedCallback callback) {
-    final long startTime = System.currentTimeMillis();
+      int chunkIndex,
+      ChunkReceivedCallback callback) {
+    long startTime = System.currentTimeMillis();
     if (logger.isDebugEnabled()) {
       logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
     }
 
-    final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
+    StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
     handler.addFetchRequest(streamChunkId, callback);
 
-    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
-      new ChannelFutureListener() {
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-          if (future.isSuccess()) {
-            long timeTaken = System.currentTimeMillis() - startTime;
-            if (logger.isTraceEnabled()) {
-              logger.trace("Sending request {} to {} took {} ms", streamChunkId,
-                getRemoteAddress(channel), timeTaken);
-            }
-          } else {
-            String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
-              getRemoteAddress(channel), future.cause());
-            logger.error(errorMsg, future.cause());
-            handler.removeFetchRequest(streamChunkId);
-            channel.close();
-            try {
-              callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
-            } catch (Exception e) {
-              logger.error("Uncaught exception in RPC response callback handler!", e);
-            }
-          }
+    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
+      if (future.isSuccess()) {
+        long timeTaken = System.currentTimeMillis() - startTime;
+        if (logger.isTraceEnabled()) {
+          logger.trace("Sending request {} to {} took {} ms", streamChunkId,
+            getRemoteAddress(channel), timeTaken);
         }
-      });
+      } else {
+        String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
+          getRemoteAddress(channel), future.cause());
+        logger.error(errorMsg, future.cause());
+        handler.removeFetchRequest(streamChunkId);
+        channel.close();
+        try {
+          callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
+        } catch (Exception e) {
+          logger.error("Uncaught exception in RPC response callback handler!", e);
+        }
+      }
+    });
   }
 
   /**
@@ -175,8 +169,8 @@ public class TransportClient implements Closeable {
    * @param streamId The stream to fetch.
    * @param callback Object to call with the stream data.
    */
-  public void stream(final String streamId, final StreamCallback callback) {
-    final long startTime = System.currentTimeMillis();
+  public void stream(String streamId, StreamCallback callback) {
+    long startTime = System.currentTimeMillis();
     if (logger.isDebugEnabled()) {
       logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));
     }
@@ -186,29 +180,25 @@ public class TransportClient implements Closeable {
     // when responses arrive.
     synchronized (this) {
       handler.addStreamCallback(callback);
-      channel.writeAndFlush(new StreamRequest(streamId)).addListener(
-        new ChannelFutureListener() {
-          @Override
-          public void operationComplete(ChannelFuture future) throws Exception {
-            if (future.isSuccess()) {
-              long timeTaken = System.currentTimeMillis() - startTime;
-              if (logger.isTraceEnabled()) {
-                logger.trace("Sending request for {} to {} took {} ms", streamId,
-                  getRemoteAddress(channel), timeTaken);
-              }
-            } else {
-              String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
-                getRemoteAddress(channel), future.cause());
-              logger.error(errorMsg, future.cause());
-              channel.close();
-              try {
-                callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
-              } catch (Exception e) {
-                logger.error("Uncaught exception in RPC response callback handler!", e);
-              }
-            }
+      channel.writeAndFlush(new StreamRequest(streamId)).addListener(future -> {
+        if (future.isSuccess()) {
+          long timeTaken = System.currentTimeMillis() - startTime;
+          if (logger.isTraceEnabled()) {
+            logger.trace("Sending request for {} to {} took {} ms", streamId,
+              getRemoteAddress(channel), timeTaken);
           }
-        });
+        } else {
+          String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
+            getRemoteAddress(channel), future.cause());
+          logger.error(errorMsg, future.cause());
+          channel.close();
+          try {
+            callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
+          } catch (Exception e) {
+            logger.error("Uncaught exception in RPC response callback handler!", e);
+          }
+        }
+      });
     }
   }
 
@@ -220,19 +210,17 @@ public class TransportClient implements Closeable {
    * @param callback Callback to handle the RPC's reply.
    * @return The RPC's id.
    */
-  public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
-    final long startTime = System.currentTimeMillis();
+  public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
+    long startTime = System.currentTimeMillis();
     if (logger.isTraceEnabled()) {
       logger.trace("Sending RPC to {}", getRemoteAddress(channel));
     }
 
-    final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+    long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
     handler.addRpcRequest(requestId, callback);
 
-    channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
-      new ChannelFutureListener() {
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
+    channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
+        .addListener(future -> {
           if (future.isSuccess()) {
             long timeTaken = System.currentTimeMillis() - startTime;
             if (logger.isTraceEnabled()) {
@@ -251,8 +239,7 @@ public class TransportClient implements Closeable {
               logger.error("Uncaught exception in RPC response callback handler!", e);
             }
           }
-        }
-      });
+        });
 
     return requestId;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
index 980525d..799f454 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
@@ -20,12 +20,7 @@ package org.apache.spark.network.crypto;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.GeneralSecurityException;
-import java.security.Key;
-import javax.crypto.KeyGenerator;
-import javax.crypto.Mac;
-import static java.nio.charset.StandardCharsets.UTF_8;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -37,7 +32,6 @@ import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.client.TransportClientBootstrap;
 import org.apache.spark.network.sasl.SaslClientBootstrap;
 import org.apache.spark.network.sasl.SecretKeyHolder;
-import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.TransportConf;
 
 /**
@@ -103,20 +97,18 @@ public class AuthClientBootstrap implements TransportClientBootstrap {
   private void doSparkAuth(TransportClient client, Channel channel)
     throws GeneralSecurityException, IOException {
 
-    AuthEngine engine = new AuthEngine(authUser, secretKeyHolder.getSecretKey(authUser), conf);
-    try {
+    String secretKey = secretKeyHolder.getSecretKey(authUser);
+    try (AuthEngine engine = new AuthEngine(authUser, secretKey, conf)) {
       ClientChallenge challenge = engine.challenge();
       ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength());
       challenge.encode(challengeData);
 
-      ByteBuffer responseData = client.sendRpcSync(challengeData.nioBuffer(),
-        conf.authRTTimeoutMs());
+      ByteBuffer responseData =
+          client.sendRpcSync(challengeData.nioBuffer(), conf.authRTTimeoutMs());
       ServerResponse response = ServerResponse.decodeMessage(responseData);
 
       engine.validate(response);
       engine.sessionCipher().addToChannel(channel);
-    } finally {
-      engine.close();
     }
   }
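
The try-with-resources form above relies on the resource type (here AuthEngine) implementing AutoCloseable, so close() runs automatically even when the body throws. The pattern in miniature, with a hypothetical resource type:

    class Resource implements AutoCloseable {
      @Override public void close() { /* release whatever the resource holds */ }
    }

    try (Resource r = new Resource()) {
      // work with r; r.close() is invoked automatically when this block exits
    }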
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
index 991d8ba..0a5c029 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
@@ -17,9 +17,7 @@
 
 package org.apache.spark.network.crypto;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import javax.security.sasl.Sasl;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
@@ -35,7 +33,6 @@ import org.apache.spark.network.sasl.SecretKeyHolder;
 import org.apache.spark.network.sasl.SaslRpcHandler;
 import org.apache.spark.network.server.RpcHandler;
 import org.apache.spark.network.server.StreamManager;
-import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.TransportConf;
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 900e8eb..8193bc1 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -22,8 +22,6 @@ import java.nio.ByteBuffer;
 
 import com.google.common.base.Throwables;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -189,21 +187,16 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
    * Responds to a single message with some Encodable object. If a failure occurs while sending,
    * it will be logged and the channel closed.
    */
-  private void respond(final Encodable result) {
-    final SocketAddress remoteAddress = channel.remoteAddress();
-    channel.writeAndFlush(result).addListener(
-      new ChannelFutureListener() {
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-          if (future.isSuccess()) {
-            logger.trace("Sent result {} to client {}", result, remoteAddress);
-          } else {
-            logger.error(String.format("Error sending result %s to %s; closing connection",
-              result, remoteAddress), future.cause());
-            channel.close();
-          }
-        }
+  private void respond(Encodable result) {
+    SocketAddress remoteAddress = channel.remoteAddress();
+    channel.writeAndFlush(result).addListener(future -> {
+      if (future.isSuccess()) {
+        logger.trace("Sent result {} to client {}", result, remoteAddress);
+      } else {
+        logger.error(String.format("Error sending result %s to %s; closing connection",
+          result, remoteAddress), future.cause());
+        channel.close();
       }
-    );
+    });
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java
index 9a186f2..a3519fe 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java
@@ -18,10 +18,8 @@
 package org.apache.spark.network.crypto;
 
 import java.util.Arrays;
-import java.util.Map;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-import com.google.common.collect.ImmutableMap;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import static org.junit.Assert.*;

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 6e02430..6daf960 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -190,12 +190,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
       allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
       allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
       allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
-      allMetrics.put("registeredExecutorsSize", new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          return blockManager.getRegisteredExecutorsSize();
-        }
-      });
+      allMetrics.put("registeredExecutorsSize",
+                     (Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize());
     }
 
     @Override
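
The cast in the new code gives the lambda its functional-interface target type: the metrics map presumably holds a more general value type (such as Metric), so (Gauge<Integer>) tells the compiler which single-method interface the lambda implements. The same idiom with a standard-library interface:

    // Without the cast the compiler could not tell which functional interface is meant.
    Object metric = (java.util.function.Supplier<Integer>) () -> 42;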

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 25e9abd..62d58ab 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -205,12 +205,7 @@ public class ExternalShuffleBlockResolver {
           logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
 
           // Execute the actual deletion in a different thread, as it may take some time.
-          directoryCleaner.execute(new Runnable() {
-            @Override
-            public void run() {
-              deleteExecutorDirs(executor.localDirs);
-            }
-          });
+          directoryCleaner.execute(() -> deleteExecutorDirs(executor.localDirs));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 8c0c400..2c5827b 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -82,23 +82,19 @@ public class ExternalShuffleClient extends ShuffleClient {
 
   @Override
   public void fetchBlocks(
-      final String host,
-      final int port,
-      final String execId,
+      String host,
+      int port,
+      String execId,
       String[] blockIds,
       BlockFetchingListener listener) {
     checkInit();
     logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
     try {
       RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
-        new RetryingBlockFetcher.BlockFetchStarter() {
-          @Override
-          public void createAndStart(String[] blockIds, BlockFetchingListener listener)
-              throws IOException, InterruptedException {
+          (blockIds1, listener1) -> {
             TransportClient client = clientFactory.createClient(host, port);
-            new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
-          }
-        };
+            new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1).start();
+          };
 
       int maxRetries = conf.maxIORetries();
       if (maxRetries > 0) {
@@ -131,12 +127,9 @@ public class ExternalShuffleClient extends ShuffleClient {
       String execId,
       ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
     checkInit();
-    TransportClient client = clientFactory.createUnmanagedClient(host, port);
-    try {
+    try (TransportClient client = clientFactory.createUnmanagedClient(host, port)) {
       ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
       client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
-    } finally {
-      client.close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
index 5be8550..f309dda 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
@@ -164,12 +164,9 @@ public class RetryingBlockFetcher {
     logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms",
       retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime);
 
-    executorService.submit(new Runnable() {
-      @Override
-      public void run() {
-        Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
-        fetchAllOutstanding();
-      }
+    executorService.submit(() -> {
+      Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
+      fetchAllOutstanding();
     });
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/sketch/pom.xml
----------------------------------------------------------------------
diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml
index bcd26d4..1356c47 100644
--- a/common/sketch/pom.xml
+++ b/common/sketch/pom.xml
@@ -61,6 +61,7 @@
         <plugin>
           <groupId>net.alchim31.maven</groupId>
           <artifactId>scala-maven-plugin</artifactId>
+          <version>3.2.2</version>
           <configuration>
             <javacArgs combine.children="append">
               <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
@@ -71,6 +72,7 @@
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-compiler-plugin</artifactId>
+          <version>3.6.1</version>
           <configuration>
             <compilerArgs combine.children="append">
               <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/unsafe/pom.xml
----------------------------------------------------------------------
diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml
index dc19f4a..f03a4da 100644
--- a/common/unsafe/pom.xml
+++ b/common/unsafe/pom.xml
@@ -98,6 +98,7 @@
         <plugin>
           <groupId>net.alchim31.maven</groupId>
           <artifactId>scala-maven-plugin</artifactId>
+          <version>3.2.2</version>
           <configuration>
             <javacArgs combine.children="append">
               <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
@@ -108,6 +109,7 @@
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-compiler-plugin</artifactId>
+          <version>3.6.1</version>
           <configuration>
             <compilerArgs combine.children="append">
               <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 671b8c7..f13c24a 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -162,14 +162,9 @@ public final class Platform {
       constructor.setAccessible(true);
       Field cleanerField = cls.getDeclaredField("cleaner");
       cleanerField.setAccessible(true);
-      final long memory = allocateMemory(size);
+      long memory = allocateMemory(size);
       ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
-      Cleaner cleaner = Cleaner.create(buffer, new Runnable() {
-        @Override
-        public void run() {
-          freeMemory(memory);
-        }
-      });
+      Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory));
       cleanerField.set(buffer, cleaner);
       return buffer;
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
index fd6e95c..621f2c6 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
@@ -178,48 +178,52 @@ public final class CalendarInterval implements Serializable {
         "Interval string does not match day-time format of 'd h:m:s.n': " + s);
     } else {
       try {
-        if (unit.equals("year")) {
-          int year = (int) toLongWithRange("year", m.group(1),
-            Integer.MIN_VALUE / 12, Integer.MAX_VALUE / 12);
-          result = new CalendarInterval(year * 12, 0L);
-
-        } else if (unit.equals("month")) {
-          int month = (int) toLongWithRange("month", m.group(1),
-            Integer.MIN_VALUE, Integer.MAX_VALUE);
-          result = new CalendarInterval(month, 0L);
-
-        } else if (unit.equals("week")) {
-          long week = toLongWithRange("week", m.group(1),
-                  Long.MIN_VALUE / MICROS_PER_WEEK, Long.MAX_VALUE / MICROS_PER_WEEK);
-          result = new CalendarInterval(0, week * MICROS_PER_WEEK);
-
-        } else if (unit.equals("day")) {
-          long day = toLongWithRange("day", m.group(1),
-            Long.MIN_VALUE / MICROS_PER_DAY, Long.MAX_VALUE / MICROS_PER_DAY);
-          result = new CalendarInterval(0, day * MICROS_PER_DAY);
-
-        } else if (unit.equals("hour")) {
-          long hour = toLongWithRange("hour", m.group(1),
-            Long.MIN_VALUE / MICROS_PER_HOUR, Long.MAX_VALUE / MICROS_PER_HOUR);
-          result = new CalendarInterval(0, hour * MICROS_PER_HOUR);
-
-        } else if (unit.equals("minute")) {
-          long minute = toLongWithRange("minute", m.group(1),
-            Long.MIN_VALUE / MICROS_PER_MINUTE, Long.MAX_VALUE / MICROS_PER_MINUTE);
-          result = new CalendarInterval(0, minute * MICROS_PER_MINUTE);
-
-        } else if (unit.equals("second")) {
-          long micros = parseSecondNano(m.group(1));
-          result = new CalendarInterval(0, micros);
-
-        } else if (unit.equals("millisecond")) {
-          long millisecond = toLongWithRange("millisecond", m.group(1),
-                  Long.MIN_VALUE / MICROS_PER_MILLI, Long.MAX_VALUE / MICROS_PER_MILLI);
-          result = new CalendarInterval(0, millisecond * MICROS_PER_MILLI);
-
-        } else if (unit.equals("microsecond")) {
-          long micros = Long.parseLong(m.group(1));
-          result = new CalendarInterval(0, micros);
+        switch (unit) {
+          case "year":
+            int year = (int) toLongWithRange("year", m.group(1),
+              Integer.MIN_VALUE / 12, Integer.MAX_VALUE / 12);
+            result = new CalendarInterval(year * 12, 0L);
+            break;
+          case "month":
+            int month = (int) toLongWithRange("month", m.group(1),
+              Integer.MIN_VALUE, Integer.MAX_VALUE);
+            result = new CalendarInterval(month, 0L);
+            break;
+          case "week":
+            long week = toLongWithRange("week", m.group(1),
+              Long.MIN_VALUE / MICROS_PER_WEEK, Long.MAX_VALUE / MICROS_PER_WEEK);
+            result = new CalendarInterval(0, week * MICROS_PER_WEEK);
+            break;
+          case "day":
+            long day = toLongWithRange("day", m.group(1),
+              Long.MIN_VALUE / MICROS_PER_DAY, Long.MAX_VALUE / MICROS_PER_DAY);
+            result = new CalendarInterval(0, day * MICROS_PER_DAY);
+            break;
+          case "hour":
+            long hour = toLongWithRange("hour", m.group(1),
+              Long.MIN_VALUE / MICROS_PER_HOUR, Long.MAX_VALUE / MICROS_PER_HOUR);
+            result = new CalendarInterval(0, hour * MICROS_PER_HOUR);
+            break;
+          case "minute":
+            long minute = toLongWithRange("minute", m.group(1),
+              Long.MIN_VALUE / MICROS_PER_MINUTE, Long.MAX_VALUE / MICROS_PER_MINUTE);
+            result = new CalendarInterval(0, minute * MICROS_PER_MINUTE);
+            break;
+          case "second": {
+            long micros = parseSecondNano(m.group(1));
+            result = new CalendarInterval(0, micros);
+            break;
+          }
+          case "millisecond":
+            long millisecond = toLongWithRange("millisecond", m.group(1),
+              Long.MIN_VALUE / MICROS_PER_MILLI, Long.MAX_VALUE / MICROS_PER_MILLI);
+            result = new CalendarInterval(0, millisecond * MICROS_PER_MILLI);
+            break;
+          case "microsecond": {
+            long micros = Long.parseLong(m.group(1));
+            result = new CalendarInterval(0, micros);
+            break;
+          }
         }
       } catch (Exception e) {
         throw new IllegalArgumentException("Error parsing interval string: " + e.getMessage(), e);
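
The switch on the unit string (supported since Java 7) replaces the if/else-if chain; each case still needs a break, and its own braces when it declares locals, to avoid fall-through. In miniature:

    String unit = "year";                 // hypothetical value
    switch (unit) {
      case "year":  /* handle years */  break;
      case "month": /* handle months */ break;
      default:      /* unmatched unit */ break;
    }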

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/Optional.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/Optional.java b/core/src/main/java/org/apache/spark/api/java/Optional.java
index ca7babc..fd0f495 100644
--- a/core/src/main/java/org/apache/spark/api/java/Optional.java
+++ b/core/src/main/java/org/apache/spark/api/java/Optional.java
@@ -18,6 +18,7 @@
 package org.apache.spark.api.java;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 import com.google.common.base.Preconditions;
 
@@ -52,8 +53,8 @@ import com.google.common.base.Preconditions;
  *   <li>{@link #isPresent()}</li>
  * </ul>
  *
- * <p>{@code java.util.Optional} itself is not used at this time because the
- * project does not require Java 8. Using {@code com.google.common.base.Optional}
+ * <p>{@code java.util.Optional} itself was not used because at the time, the
+ * project did not require Java 8. Using {@code com.google.common.base.Optional}
  * has in the past caused serious library version conflicts with Guava that can't
  * be resolved by shading. Hence this work-alike clone.</p>
  *
@@ -171,7 +172,7 @@ public final class Optional<T> implements Serializable {
       return false;
     }
     Optional<?> other = (Optional<?>) obj;
-    return value == null ? other.value == null : value.equals(other.value);
+    return Objects.equals(value, other.value);
   }
 
   @Override
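
Objects.equals, available since Java 7, is the null-safe equivalent of the removed ternary: for any references a and b it behaves like a == null ? b == null : a.equals(b).

    boolean same = java.util.Objects.equals(a, b);   // no NPE when a is null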

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
index 07aebb7..33bedf7 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
  * A function that returns zero or more output records from each grouping key and its values from 2
  * Datasets.
  */
+@FunctionalInterface
 public interface CoGroupFunction<K, V1, V2, R> extends Serializable {
   Iterator<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
 }
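
Annotating these single-abstract-method interfaces with @FunctionalInterface documents the intent and makes javac reject any change that would break lambda compatibility, so callers can supply a lambda instead of an anonymous class. An illustrative lambda (arbitrary type arguments, not from this patch) for the interface above:

    CoGroupFunction<String, Integer, Integer, String> combine =
        (key, left, right) -> java.util.Collections.singletonList(key).iterator();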

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
index 576087b..2f23da5 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 /**
  * A function that returns zero or more records of type Double from each input record.
  */
+@FunctionalInterface
 public interface DoubleFlatMapFunction<T> extends Serializable {
   Iterator<Double> call(T t) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
index bf16f79..3c0291c 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 /**
  *  A function that returns Doubles, and can be used to construct DoubleRDDs.
  */
+@FunctionalInterface
 public interface DoubleFunction<T> extends Serializable {
   double call(T t) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
index 462ca3f..a6f69f7 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
  *
  * If the function returns true, the element is included in the returned Dataset.
  */
+@FunctionalInterface
 public interface FilterFunction<T> extends Serializable {
   boolean call(T value) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
index 2d8ea6d..91d6129 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 /**
  * A function that returns zero or more output records from each input record.
  */
+@FunctionalInterface
 public interface FlatMapFunction<T, R> extends Serializable {
   Iterator<R> call(T t) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
index fc97b63..f9f2580 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 /**
  * A function that takes two inputs and returns zero or more output records.
  */
+@FunctionalInterface
 public interface FlatMapFunction2<T1, T2, R> extends Serializable {
   Iterator<R> call(T1 t1, T2 t2) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
index bae574a..6423c5d 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 /**
  * A function that returns zero or more output records from each grouping key and its values.
  */
+@FunctionalInterface
 public interface FlatMapGroupsFunction<K, V, R> extends Serializable {
   Iterator<R> call(K key, Iterator<V> values) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
index 07e54b2..2e6e908 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
  *
  * Spark will invoke the call function on each element in the input Dataset.
  */
+@FunctionalInterface
 public interface ForeachFunction<T> extends Serializable {
   void call(T t) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
index 4938a51..d8f55d0 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 /**
  * Base interface for a function used in Dataset's foreachPartition function.
  */
+@FunctionalInterface
 public interface ForeachPartitionFunction<T> extends Serializable {
   void call(Iterator<T> t) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/Function.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function.java b/core/src/main/java/org/apache/spark/api/java/function/Function.java
index b9d9777..8b2bbd5 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
  * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
  * when mapping RDDs of other types.
  */
+@FunctionalInterface
 public interface Function<T1, R> extends Serializable {
   R call(T1 v1) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/Function0.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function0.java b/core/src/main/java/org/apache/spark/api/java/function/Function0.java
index c86928d..5c649d9 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function0.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function0.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 /**
  * A zero-argument function that returns an R.
  */
+@FunctionalInterface
 public interface Function0<R> extends Serializable {
   R call() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/Function2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function2.java b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
index a975ce3..a7d9647 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 /**
  * A two-argument function that takes arguments of type T1 and T2 and returns an R.
  */
+@FunctionalInterface
 public interface Function2<T1, T2, R> extends Serializable {
   R call(T1 v1, T2 v2) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/Function3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function3.java b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
index 6eecfb6..77acd21 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function3.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 /**
  * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
  */
+@FunctionalInterface
 public interface Function3<T1, T2, T3, R> extends Serializable {
   R call(T1 v1, T2 v2, T3 v3) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/Function4.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function4.java b/core/src/main/java/org/apache/spark/api/java/function/Function4.java
index 9c35a22..d530ba4 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function4.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function4.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 /**
  * A four-argument function that takes arguments of type T1, T2, T3 and T4 and returns an R.
  */
+@FunctionalInterface
 public interface Function4<T1, T2, T3, T4, R> extends Serializable {
   R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
index 3ae6ef4..5efff94 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 /**
  * Base interface for a map function used in Dataset's map function.
  */
+@FunctionalInterface
 public interface MapFunction<T, U> extends Serializable {
   U call(T value) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
index faa59ea..2c3d43a 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 /**
  * Base interface for a map function used in GroupedDataset's mapGroup function.
  */
+@FunctionalInterface
 public interface MapGroupsFunction<K, V, R> extends Serializable {
   R call(K key, Iterator<V> values) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
index cf9945a..68e8557 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 /**
  * Base interface for function used in Dataset's mapPartitions.
  */
+@FunctionalInterface
 public interface MapPartitionsFunction<T, U> extends Serializable {
   Iterator<U> call(Iterator<T> input) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
index 51eed2e..97bd2b3 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -26,6 +26,7 @@ import scala.Tuple2;
  * A function that returns zero or more key-value pair records from each input record. The
  * key-value pairs are represented as scala.Tuple2 objects.
  */
+@FunctionalInterface
 public interface PairFlatMapFunction<T, K, V> extends Serializable {
   Iterator<Tuple2<K, V>> call(T t) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
index 2fdfa71..34a7e44 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
@@ -25,6 +25,7 @@ import scala.Tuple2;
  * A function that returns key-value pairs (Tuple2&lt;K, V&gt;), and can be used to
  * construct PairRDDs.
  */
+@FunctionalInterface
 public interface PairFunction<T, K, V> extends Serializable {
   Tuple2<K, V> call(T t) throws Exception;
 }
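
The pair-producing interfaces benefit in the same way. A minimal word-count sketch (the
pipeline itself is illustrative and not part of this patch) passing a lambda where a
PairFunction is expected:

    import java.util.Arrays;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;

    import scala.Tuple2;

    public class PairFunctionLambdaSketch {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "PairFunctionLambdaSketch");
        JavaPairRDD<String, Integer> counts = sc.parallelize(Arrays.asList("a", "b", "a"))
            // The lambda implements PairFunction.call(T) by returning a Tuple2.
            .mapToPair((PairFunction<String, String, Integer>) w -> new Tuple2<>(w, 1))
            .reduceByKey((a, b) -> a + b);
        System.out.println(counts.collectAsMap());
        sc.stop();
      }
    }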

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
index ee092d0..d9029d8 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 /**
  * Base interface for function used in Dataset's reduce.
  */
+@FunctionalInterface
 public interface ReduceFunction<T> extends Serializable {
   T call(T v1, T v2) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
index f30d42e..aff2bc6 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 /**
  * A function with no return value.
  */
+@FunctionalInterface
 public interface VoidFunction<T> extends Serializable {
   void call(T t) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
index da9ae1c..ddb6162 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 /**
  * A two-argument function that takes arguments of type T1 and T2 with no return value.
  */
+@FunctionalInterface
 public interface VoidFunction2<T1, T2> extends Serializable {
   void call(T1 v1, T2 v2) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index dcae4a3..189d607 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -162,14 +162,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
     // the end of the task. This is necessary to avoid memory leaks when the downstream operator
     // does not fully consume the sorter's output (e.g. sort followed by limit).
-    taskContext.addTaskCompletionListener(
-      new TaskCompletionListener() {
-        @Override
-        public void onTaskCompletion(TaskContext context) {
-          cleanupResources();
-        }
-      }
-    );
+    taskContext.addTaskCompletionListener(context -> { cleanupResources(); });
   }
 
   /**
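
For readers less used to the lambda syntax adopted above: the lambda is only shorthand for the
anonymous single-method class it replaces, with identical runtime behavior. A standalone sketch
of the same rewrite against Runnable (the executor and the doCleanup method are illustrative,
not from this patch):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ListenerLambdaSketch {
      static void doCleanup() {
        System.out.println("resources released");
      }

      public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Pre-Java 8 style: anonymous inner class implementing the single abstract method.
        executor.submit(new Runnable() {
          @Override
          public void run() {
            doCleanup();
          }
        });
        // Java 8 style: a lambda targeting the same interface, as done in the patch above.
        executor.submit(() -> doCleanup());
        executor.shutdown();
      }
    }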

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
index 01aed95..cf4dfde 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
@@ -27,22 +27,18 @@ final class UnsafeSorterSpillMerger {
   private final PriorityQueue<UnsafeSorterIterator> priorityQueue;
 
   UnsafeSorterSpillMerger(
-      final RecordComparator recordComparator,
-      final PrefixComparator prefixComparator,
-      final int numSpills) {
-    final Comparator<UnsafeSorterIterator> comparator = new Comparator<UnsafeSorterIterator>() {
-
-      @Override
-      public int compare(UnsafeSorterIterator left, UnsafeSorterIterator right) {
-        final int prefixComparisonResult =
-          prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
-        if (prefixComparisonResult == 0) {
-          return recordComparator.compare(
-            left.getBaseObject(), left.getBaseOffset(),
-            right.getBaseObject(), right.getBaseOffset());
-        } else {
-          return prefixComparisonResult;
-        }
+      RecordComparator recordComparator,
+      PrefixComparator prefixComparator,
+      int numSpills) {
+    Comparator<UnsafeSorterIterator> comparator = (left, right) -> {
+      int prefixComparisonResult =
+        prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
+      if (prefixComparisonResult == 0) {
+        return recordComparator.compare(
+          left.getBaseObject(), left.getBaseOffset(),
+          right.getBaseObject(), right.getBaseOffset());
+      } else {
+        return prefixComparisonResult;
       }
     };
     priorityQueue = new PriorityQueue<>(numSpills, comparator);
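
The lambda keeps the same two-level ordering as the anonymous class it replaces: compare the
cheap key prefixes first and fall back to the full records only on ties. A self-contained
sketch of that idiom with illustrative types (Entry, prefix and fullKey are placeholders, not
from this patch):

    import java.util.Comparator;
    import java.util.PriorityQueue;

    public class TwoLevelComparatorSketch {
      static final class Entry {
        final long prefix;      // stands in for the cheap sort-key prefix
        final String fullKey;   // stands in for the full record compared on ties
        Entry(long prefix, String fullKey) { this.prefix = prefix; this.fullKey = fullKey; }
      }

      public static void main(String[] args) {
        Comparator<Entry> comparator = (left, right) -> {
          int byPrefix = Long.compare(left.prefix, right.prefix);
          // Only consult the full key when the prefixes tie.
          return byPrefix != 0 ? byPrefix : left.fullKey.compareTo(right.fullKey);
        };
        PriorityQueue<Entry> queue = new PriorityQueue<>(4, comparator);
        queue.add(new Entry(2, "b"));
        queue.add(new Entry(1, "z"));
        queue.add(new Entry(1, "a"));
        System.out.println(queue.poll().fullKey);  // prints "a"
      }
    }

The same ordering could also be composed as
Comparator.<Entry>comparingLong(e -> e.prefix).thenComparing(e -> e.fullKey); the explicit
lambda above simply mirrors the shape of the code in the patch.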

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cbab7b8..7e56406 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -350,9 +350,6 @@ class SparkContext(config: SparkConf) extends Logging {
 
   private def warnDeprecatedVersions(): Unit = {
     val javaVersion = System.getProperty("java.version").split("[+.\\-]+", 3)
-    if (javaVersion.length >= 2 && javaVersion(1).toInt == 7) {
-      logWarning("Support for Java 7 is deprecated as of Spark 2.0.0")
-    }
     if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.10"))) {
       logWarning("Support for Scala 2.10 is deprecated as of Spark 2.1.0")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
index 31b9c5e..3fd812e 100644
--- a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
@@ -39,7 +39,6 @@ private[spark] class WorkerCommandBuilder(sparkHome: String, memoryMb: Int, comm
     val cmd = buildJavaCommand(command.classPathEntries.mkString(File.pathSeparator))
     cmd.add(s"-Xmx${memoryMb}M")
     command.javaOpts.foreach(cmd.add)
-    CommandBuilderUtils.addPermGenSizeOpt(cmd)
     addOptionString(cmd, getenv("SPARK_JAVA_OPTS"))
     cmd
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fe6fe6a..1e6e9a2 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1882,20 +1882,17 @@ private[spark] object Utils extends Logging {
   def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = {
     // Politely destroy first
     process.destroy()
-
-    if (waitForProcess(process, timeoutMs)) {
+    if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
       // Successful exit
       Option(process.exitValue())
     } else {
-      // Java 8 added a new API which will more forcibly kill the process. Use that if available.
       try {
-        classOf[Process].getMethod("destroyForcibly").invoke(process)
+        process.destroyForcibly()
       } catch {
-        case _: NoSuchMethodException => return None // Not available; give up
         case NonFatal(e) => logWarning("Exception when attempting to kill process", e)
       }
       // Wait, again, although this really should return almost immediately
-      if (waitForProcess(process, timeoutMs)) {
+      if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
         Option(process.exitValue())
       } else {
         logWarning("Timed out waiting to forcibly kill process")
@@ -1905,44 +1902,11 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Wait for a process to terminate for at most the specified duration.
-   *
-   * @return whether the process actually terminated before the given timeout.
-   */
-  def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
-    try {
-      // Use Java 8 method if available
-      classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit])
-        .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS)
-        .asInstanceOf[Boolean]
-    } catch {
-      case _: NoSuchMethodException =>
-        // Otherwise implement it manually
-        var terminated = false
-        val startTime = System.currentTimeMillis
-        while (!terminated) {
-          try {
-            process.exitValue()
-            terminated = true
-          } catch {
-            case e: IllegalThreadStateException =>
-              // Process not terminated yet
-              if (System.currentTimeMillis - startTime > timeoutMs) {
-                return false
-              }
-              Thread.sleep(100)
-          }
-        }
-        true
-    }
-  }
-
-  /**
    * Return the stderr of a process after waiting for the process to terminate.
    * If the process does not terminate within the specified timeout, return None.
    */
   def getStderr(process: Process, timeoutMs: Long): Option[String] = {
-    val terminated = Utils.waitForProcess(process, timeoutMs)
+    val terminated = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)
     if (terminated) {
       Some(Source.fromInputStream(process.getErrorStream).getLines().mkString("\n"))
     } else {
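
With Java 7 gone, Process.waitFor(long, TimeUnit), destroyForcibly() and isAlive() can be
called directly instead of being located by reflection. A standalone sketch of the
polite-then-forcible termination pattern that terminateProcess implements (the spawned command
and the timeouts are illustrative):

    import java.util.concurrent.TimeUnit;

    public class TerminateProcessSketch {
      public static void main(String[] args) throws Exception {
        Process process = new ProcessBuilder("sleep", "60").start();
        process.destroy();                               // polite termination first
        if (!process.waitFor(5, TimeUnit.SECONDS)) {     // Java 8: bounded wait for exit
          process.destroyForcibly();                     // Java 8: force-kill
          process.waitFor(5, TimeUnit.SECONDS);          // should return almost immediately
        }
        if (!process.isAlive()) {
          System.out.println("exit code: " + process.exitValue());
        }
      }
    }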

