You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/02/16 12:32:59 UTC
[8/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
[SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
- Move external/java8-tests tests into core, streaming, sql and remove the external/java8-tests module
- Remove MaxPermGen and related options
- Fix some reflection / TODOs around Java 8+ methods
- Update doc references to 1.7/1.8 differences
- Remove Java 7/8 related build profiles
- Update some plugins for better Java 8 compatibility
- Fix a few Java-related warnings
For the future:
- Update Java 8 examples to fully use Java 8
- Update Java tests to use lambdas for simplicity
- Update Java internal implementations to use lambdas
## How was this patch tested?
Existing tests
Author: Sean Owen <so...@cloudera.com>
Closes #16871 from srowen/SPARK-19493.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e240549
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e240549
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e240549
Branch: refs/heads/master
Commit: 0e2405490f2056728d1353abbac6f3ea177ae533
Parents: 3871d94
Author: Sean Owen <so...@cloudera.com>
Authored: Thu Feb 16 12:32:45 2017 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Feb 16 12:32:45 2017 +0000
----------------------------------------------------------------------
assembly/pom.xml | 1 +
build/mvn | 8 +-
build/sbt-launch-lib.bash | 2 +-
.../spark/network/client/TransportClient.java | 111 +-
.../network/crypto/AuthClientBootstrap.java | 16 +-
.../spark/network/crypto/AuthRpcHandler.java | 3 -
.../network/server/TransportRequestHandler.java | 27 +-
.../spark/network/crypto/AuthEngineSuite.java | 2 -
.../shuffle/ExternalShuffleBlockHandler.java | 8 +-
.../shuffle/ExternalShuffleBlockResolver.java | 7 +-
.../network/shuffle/ExternalShuffleClient.java | 21 +-
.../network/shuffle/RetryingBlockFetcher.java | 9 +-
common/sketch/pom.xml | 2 +
common/unsafe/pom.xml | 2 +
.../java/org/apache/spark/unsafe/Platform.java | 9 +-
.../spark/unsafe/types/CalendarInterval.java | 88 +-
.../org/apache/spark/api/java/Optional.java | 7 +-
.../api/java/function/CoGroupFunction.java | 1 +
.../java/function/DoubleFlatMapFunction.java | 1 +
.../spark/api/java/function/DoubleFunction.java | 1 +
.../spark/api/java/function/FilterFunction.java | 1 +
.../api/java/function/FlatMapFunction.java | 1 +
.../api/java/function/FlatMapFunction2.java | 1 +
.../java/function/FlatMapGroupsFunction.java | 1 +
.../api/java/function/ForeachFunction.java | 1 +
.../java/function/ForeachPartitionFunction.java | 1 +
.../spark/api/java/function/Function.java | 1 +
.../spark/api/java/function/Function0.java | 1 +
.../spark/api/java/function/Function2.java | 1 +
.../spark/api/java/function/Function3.java | 1 +
.../spark/api/java/function/Function4.java | 1 +
.../spark/api/java/function/MapFunction.java | 1 +
.../api/java/function/MapGroupsFunction.java | 1 +
.../java/function/MapPartitionsFunction.java | 1 +
.../api/java/function/PairFlatMapFunction.java | 1 +
.../spark/api/java/function/PairFunction.java | 1 +
.../spark/api/java/function/ReduceFunction.java | 1 +
.../spark/api/java/function/VoidFunction.java | 1 +
.../spark/api/java/function/VoidFunction2.java | 1 +
.../unsafe/sort/UnsafeExternalSorter.java | 9 +-
.../unsafe/sort/UnsafeSorterSpillMerger.java | 28 +-
.../scala/org/apache/spark/SparkContext.scala | 3 -
.../spark/launcher/WorkerCommandBuilder.scala | 1 -
.../scala/org/apache/spark/util/Utils.scala | 44 +-
.../java/org/apache/spark/JavaAPISuite.java | 1836 ----------------
.../test/org/apache/spark/Java8RDDAPISuite.java | 356 ++++
.../test/org/apache/spark/JavaAPISuite.java | 1842 ++++++++++++++++
.../org/apache/spark/util/UtilsSuite.scala | 6 +-
dev/appveyor-install-dependencies.ps1 | 2 +-
dev/create-release/release-build.sh | 1 -
dev/make-distribution.sh | 2 +-
dev/mima | 1 -
dev/run-tests.py | 3 -
dev/test-dependencies.sh | 2 +-
docs/building-spark.md | 32 +-
docs/index.md | 6 +-
docs/mllib-linear-methods.md | 2 +-
docs/mllib-statistics.md | 7 +-
docs/programming-guide.md | 11 +-
docs/quick-start.md | 9 +-
docs/streaming-custom-receivers.md | 10 +-
docs/streaming-kafka-0-10-integration.md | 62 +-
docs/streaming-kafka-0-8-integration.md | 41 +-
docs/streaming-programming-guide.md | 219 +-
docs/structured-streaming-programming-guide.md | 38 +-
.../spark/examples/ml/JavaTokenizerExample.java | 4 +-
.../examples/sql/JavaSQLDataSourceExample.java | 2 +-
external/java8-tests/README.md | 22 -
external/java8-tests/pom.xml | 132 --
.../apache/spark/java8/Java8RDDAPISuite.java | 356 ----
.../spark/java8/dstream/Java8APISuite.java | 882 --------
.../java8/sql/Java8DatasetAggregatorSuite.java | 62 -
.../src/test/resources/log4j.properties | 27 -
.../org/apache/spark/java8/JDK8ScalaSuite.scala | 30 -
.../apache/spark/sql/kafka010/KafkaSource.scala | 3 +-
.../spark/streaming/kafka010/KafkaRDD.scala | 7 +-
.../spark/launcher/AbstractCommandBuilder.java | 7 +-
.../spark/launcher/ChildProcAppHandle.java | 10 +-
.../spark/launcher/CommandBuilderUtils.java | 21 -
.../apache/spark/launcher/LauncherServer.java | 7 +-
.../apache/spark/launcher/OutputRedirector.java | 7 +-
.../apache/spark/launcher/SparkAppHandle.java | 3 -
.../launcher/SparkClassCommandBuilder.java | 68 +-
.../launcher/SparkSubmitCommandBuilder.java | 101 +-
.../launcher/CommandBuilderUtilsSuite.java | 36 -
.../SparkSubmitCommandBuilderSuite.java | 8 +-
launcher/src/test/resources/spark-defaults.conf | 2 +-
pom.xml | 171 +-
project/SparkBuild.scala | 41 +-
.../org/apache/spark/deploy/yarn/Client.scala | 1 -
.../spark/deploy/yarn/ExecutorRunnable.scala | 2 -
.../launcher/YarnCommandBuilderUtils.scala | 12 -
.../org/apache/spark/sql/types/Decimal.scala | 8 +-
.../FlatMapGroupsWithStateFunction.java | 4 +-
.../function/MapGroupsWithStateFunction.java | 4 +-
.../scala/org/apache/spark/sql/Dataset.scala | 2 +-
.../spark/sql/KeyValueGroupedDataset.scala | 2 +-
.../scala/org/apache/spark/sql/SQLContext.scala | 11 -
.../org/apache/spark/sql/SparkSession.scala | 11 -
.../spark/sql/Java8DatasetAggregatorSuite.java | 61 +
.../org/apache/spark/sql/JavaDatasetSuite.java | 2 +-
sql/hive/pom.xml | 3 +-
.../execution/ScriptTransformationExec.scala | 10 +-
.../apache/spark/streaming/JavaAPISuite.java | 2000 -----------------
.../apache/spark/streaming/Java8APISuite.java | 887 ++++++++
.../apache/spark/streaming/JavaAPISuite.java | 2008 ++++++++++++++++++
106 files changed, 5641 insertions(+), 6314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 53f1879..9d8607d 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -187,6 +187,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
<executions>
<execution>
<id>dist</id>
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/build/mvn
----------------------------------------------------------------------
diff --git a/build/mvn b/build/mvn
index 866bad8..1e393c3 100755
--- a/build/mvn
+++ b/build/mvn
@@ -22,7 +22,7 @@ _DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
# Preserve the calling directory
_CALLING_DIR="$(pwd)"
# Options used during compilation
-_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
+_COMPILE_JVM_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
# Installs any application tarball given a URL, the expected tarball name,
# and, optionally, a checkable binary path to determine if the binary has
@@ -141,13 +141,9 @@ cd "${_CALLING_DIR}"
# Now that zinc is ensured to be installed, check its status and, if its
# not running or just installed, start it
if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then
- ZINC_JAVA_HOME=
- if [ -n "$JAVA_7_HOME" ]; then
- ZINC_JAVA_HOME="env JAVA_HOME=$JAVA_7_HOME"
- fi
export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
"${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
- $ZINC_JAVA_HOME "${ZINC_BIN}" -start -port ${ZINC_PORT} \
+ "${ZINC_BIN}" -start -port ${ZINC_PORT} \
-scala-compiler "${SCALA_COMPILER}" \
-scala-library "${SCALA_LIBRARY}" &>/dev/null
fi
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/build/sbt-launch-lib.bash
----------------------------------------------------------------------
diff --git a/build/sbt-launch-lib.bash b/build/sbt-launch-lib.bash
index 615f848..4732669 100755
--- a/build/sbt-launch-lib.bash
+++ b/build/sbt-launch-lib.bash
@@ -117,7 +117,7 @@ get_mem_opts () {
(( $perm < 4096 )) || perm=4096
local codecache=$(( $perm / 2 ))
- echo "-Xms${mem}m -Xmx${mem}m -XX:MaxPermSize=${perm}m -XX:ReservedCodeCacheSize=${codecache}m"
+ echo "-Xms${mem}m -Xmx${mem}m -XX:ReservedCodeCacheSize=${codecache}m"
}
require_arg () {
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 7e7d78d..a6f527c 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -32,8 +32,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,40 +131,36 @@ public class TransportClient implements Closeable {
*/
public void fetchChunk(
long streamId,
- final int chunkIndex,
- final ChunkReceivedCallback callback) {
- final long startTime = System.currentTimeMillis();
+ int chunkIndex,
+ ChunkReceivedCallback callback) {
+ long startTime = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
}
- final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
+ StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
handler.addFetchRequest(streamChunkId, callback);
- channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- long timeTaken = System.currentTimeMillis() - startTime;
- if (logger.isTraceEnabled()) {
- logger.trace("Sending request {} to {} took {} ms", streamChunkId,
- getRemoteAddress(channel), timeTaken);
- }
- } else {
- String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
- getRemoteAddress(channel), future.cause());
- logger.error(errorMsg, future.cause());
- handler.removeFetchRequest(streamChunkId);
- channel.close();
- try {
- callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
- } catch (Exception e) {
- logger.error("Uncaught exception in RPC response callback handler!", e);
- }
- }
+ channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
+ if (future.isSuccess()) {
+ long timeTaken = System.currentTimeMillis() - startTime;
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending request {} to {} took {} ms", streamChunkId,
+ getRemoteAddress(channel), timeTaken);
}
- });
+ } else {
+ String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
+ getRemoteAddress(channel), future.cause());
+ logger.error(errorMsg, future.cause());
+ handler.removeFetchRequest(streamChunkId);
+ channel.close();
+ try {
+ callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
+ } catch (Exception e) {
+ logger.error("Uncaught exception in RPC response callback handler!", e);
+ }
+ }
+ });
}
/**
@@ -175,8 +169,8 @@ public class TransportClient implements Closeable {
* @param streamId The stream to fetch.
* @param callback Object to call with the stream data.
*/
- public void stream(final String streamId, final StreamCallback callback) {
- final long startTime = System.currentTimeMillis();
+ public void stream(String streamId, StreamCallback callback) {
+ long startTime = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));
}
@@ -186,29 +180,25 @@ public class TransportClient implements Closeable {
// when responses arrive.
synchronized (this) {
handler.addStreamCallback(callback);
- channel.writeAndFlush(new StreamRequest(streamId)).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- long timeTaken = System.currentTimeMillis() - startTime;
- if (logger.isTraceEnabled()) {
- logger.trace("Sending request for {} to {} took {} ms", streamId,
- getRemoteAddress(channel), timeTaken);
- }
- } else {
- String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
- getRemoteAddress(channel), future.cause());
- logger.error(errorMsg, future.cause());
- channel.close();
- try {
- callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
- } catch (Exception e) {
- logger.error("Uncaught exception in RPC response callback handler!", e);
- }
- }
+ channel.writeAndFlush(new StreamRequest(streamId)).addListener(future -> {
+ if (future.isSuccess()) {
+ long timeTaken = System.currentTimeMillis() - startTime;
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending request for {} to {} took {} ms", streamId,
+ getRemoteAddress(channel), timeTaken);
}
- });
+ } else {
+ String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
+ getRemoteAddress(channel), future.cause());
+ logger.error(errorMsg, future.cause());
+ channel.close();
+ try {
+ callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
+ } catch (Exception e) {
+ logger.error("Uncaught exception in RPC response callback handler!", e);
+ }
+ }
+ });
}
}
@@ -220,19 +210,17 @@ public class TransportClient implements Closeable {
* @param callback Callback to handle the RPC's reply.
* @return The RPC's id.
*/
- public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
- final long startTime = System.currentTimeMillis();
+ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
+ long startTime = System.currentTimeMillis();
if (logger.isTraceEnabled()) {
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
}
- final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+ long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
handler.addRpcRequest(requestId, callback);
- channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
+ channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
+ .addListener(future -> {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
@@ -251,8 +239,7 @@ public class TransportClient implements Closeable {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
- }
- });
+ });
return requestId;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
index 980525d..799f454 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
@@ -20,12 +20,7 @@ package org.apache.spark.network.crypto;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
-import java.security.Key;
-import javax.crypto.KeyGenerator;
-import javax.crypto.Mac;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -37,7 +32,6 @@ import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientBootstrap;
import org.apache.spark.network.sasl.SaslClientBootstrap;
import org.apache.spark.network.sasl.SecretKeyHolder;
-import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;
/**
@@ -103,20 +97,18 @@ public class AuthClientBootstrap implements TransportClientBootstrap {
private void doSparkAuth(TransportClient client, Channel channel)
throws GeneralSecurityException, IOException {
- AuthEngine engine = new AuthEngine(authUser, secretKeyHolder.getSecretKey(authUser), conf);
- try {
+ String secretKey = secretKeyHolder.getSecretKey(authUser);
+ try (AuthEngine engine = new AuthEngine(authUser, secretKey, conf)) {
ClientChallenge challenge = engine.challenge();
ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength());
challenge.encode(challengeData);
- ByteBuffer responseData = client.sendRpcSync(challengeData.nioBuffer(),
- conf.authRTTimeoutMs());
+ ByteBuffer responseData =
+ client.sendRpcSync(challengeData.nioBuffer(), conf.authRTTimeoutMs());
ServerResponse response = ServerResponse.decodeMessage(responseData);
engine.validate(response);
engine.sessionCipher().addToChannel(channel);
- } finally {
- engine.close();
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
index 991d8ba..0a5c029 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
@@ -17,9 +17,7 @@
package org.apache.spark.network.crypto;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import javax.security.sasl.Sasl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
@@ -35,7 +33,6 @@ import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.sasl.SaslRpcHandler;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
-import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 900e8eb..8193bc1 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -22,8 +22,6 @@ import java.nio.ByteBuffer;
import com.google.common.base.Throwables;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -189,21 +187,16 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
* Responds to a single message with some Encodable object. If a failure occurs while sending,
* it will be logged and the channel closed.
*/
- private void respond(final Encodable result) {
- final SocketAddress remoteAddress = channel.remoteAddress();
- channel.writeAndFlush(result).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- logger.trace("Sent result {} to client {}", result, remoteAddress);
- } else {
- logger.error(String.format("Error sending result %s to %s; closing connection",
- result, remoteAddress), future.cause());
- channel.close();
- }
- }
+ private void respond(Encodable result) {
+ SocketAddress remoteAddress = channel.remoteAddress();
+ channel.writeAndFlush(result).addListener(future -> {
+ if (future.isSuccess()) {
+ logger.trace("Sent result {} to client {}", result, remoteAddress);
+ } else {
+ logger.error(String.format("Error sending result %s to %s; closing connection",
+ result, remoteAddress), future.cause());
+ channel.close();
}
- );
+ });
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java
index 9a186f2..a3519fe 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java
@@ -18,10 +18,8 @@
package org.apache.spark.network.crypto;
import java.util.Arrays;
-import java.util.Map;
import static java.nio.charset.StandardCharsets.UTF_8;
-import com.google.common.collect.ImmutableMap;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 6e02430..6daf960 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -190,12 +190,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
- allMetrics.put("registeredExecutorsSize", new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return blockManager.getRegisteredExecutorsSize();
- }
- });
+ allMetrics.put("registeredExecutorsSize",
+ (Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize());
}
@Override
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 25e9abd..62d58ab 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -205,12 +205,7 @@ public class ExternalShuffleBlockResolver {
logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
// Execute the actual deletion in a different thread, as it may take some time.
- directoryCleaner.execute(new Runnable() {
- @Override
- public void run() {
- deleteExecutorDirs(executor.localDirs);
- }
- });
+ directoryCleaner.execute(() -> deleteExecutorDirs(executor.localDirs));
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 8c0c400..2c5827b 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -82,23 +82,19 @@ public class ExternalShuffleClient extends ShuffleClient {
@Override
public void fetchBlocks(
- final String host,
- final int port,
- final String execId,
+ String host,
+ int port,
+ String execId,
String[] blockIds,
BlockFetchingListener listener) {
checkInit();
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
- new RetryingBlockFetcher.BlockFetchStarter() {
- @Override
- public void createAndStart(String[] blockIds, BlockFetchingListener listener)
- throws IOException, InterruptedException {
+ (blockIds1, listener1) -> {
TransportClient client = clientFactory.createClient(host, port);
- new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
- }
- };
+ new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1).start();
+ };
int maxRetries = conf.maxIORetries();
if (maxRetries > 0) {
@@ -131,12 +127,9 @@ public class ExternalShuffleClient extends ShuffleClient {
String execId,
ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
checkInit();
- TransportClient client = clientFactory.createUnmanagedClient(host, port);
- try {
+ try (TransportClient client = clientFactory.createUnmanagedClient(host, port)) {
ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
- } finally {
- client.close();
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
index 5be8550..f309dda 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
@@ -164,12 +164,9 @@ public class RetryingBlockFetcher {
logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms",
retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime);
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
- fetchAllOutstanding();
- }
+ executorService.submit(() -> {
+ Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
+ fetchAllOutstanding();
});
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/sketch/pom.xml
----------------------------------------------------------------------
diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml
index bcd26d4..1356c47 100644
--- a/common/sketch/pom.xml
+++ b/common/sketch/pom.xml
@@ -61,6 +61,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.2</version>
<configuration>
<javacArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
@@ -71,6 +72,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
<configuration>
<compilerArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/unsafe/pom.xml
----------------------------------------------------------------------
diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml
index dc19f4a..f03a4da 100644
--- a/common/unsafe/pom.xml
+++ b/common/unsafe/pom.xml
@@ -98,6 +98,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.2</version>
<configuration>
<javacArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
@@ -108,6 +109,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
<configuration>
<compilerArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 671b8c7..f13c24a 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -162,14 +162,9 @@ public final class Platform {
constructor.setAccessible(true);
Field cleanerField = cls.getDeclaredField("cleaner");
cleanerField.setAccessible(true);
- final long memory = allocateMemory(size);
+ long memory = allocateMemory(size);
ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
- Cleaner cleaner = Cleaner.create(buffer, new Runnable() {
- @Override
- public void run() {
- freeMemory(memory);
- }
- });
+ Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory));
cleanerField.set(buffer, cleaner);
return buffer;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
index fd6e95c..621f2c6 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
@@ -178,48 +178,52 @@ public final class CalendarInterval implements Serializable {
"Interval string does not match day-time format of 'd h:m:s.n': " + s);
} else {
try {
- if (unit.equals("year")) {
- int year = (int) toLongWithRange("year", m.group(1),
- Integer.MIN_VALUE / 12, Integer.MAX_VALUE / 12);
- result = new CalendarInterval(year * 12, 0L);
-
- } else if (unit.equals("month")) {
- int month = (int) toLongWithRange("month", m.group(1),
- Integer.MIN_VALUE, Integer.MAX_VALUE);
- result = new CalendarInterval(month, 0L);
-
- } else if (unit.equals("week")) {
- long week = toLongWithRange("week", m.group(1),
- Long.MIN_VALUE / MICROS_PER_WEEK, Long.MAX_VALUE / MICROS_PER_WEEK);
- result = new CalendarInterval(0, week * MICROS_PER_WEEK);
-
- } else if (unit.equals("day")) {
- long day = toLongWithRange("day", m.group(1),
- Long.MIN_VALUE / MICROS_PER_DAY, Long.MAX_VALUE / MICROS_PER_DAY);
- result = new CalendarInterval(0, day * MICROS_PER_DAY);
-
- } else if (unit.equals("hour")) {
- long hour = toLongWithRange("hour", m.group(1),
- Long.MIN_VALUE / MICROS_PER_HOUR, Long.MAX_VALUE / MICROS_PER_HOUR);
- result = new CalendarInterval(0, hour * MICROS_PER_HOUR);
-
- } else if (unit.equals("minute")) {
- long minute = toLongWithRange("minute", m.group(1),
- Long.MIN_VALUE / MICROS_PER_MINUTE, Long.MAX_VALUE / MICROS_PER_MINUTE);
- result = new CalendarInterval(0, minute * MICROS_PER_MINUTE);
-
- } else if (unit.equals("second")) {
- long micros = parseSecondNano(m.group(1));
- result = new CalendarInterval(0, micros);
-
- } else if (unit.equals("millisecond")) {
- long millisecond = toLongWithRange("millisecond", m.group(1),
- Long.MIN_VALUE / MICROS_PER_MILLI, Long.MAX_VALUE / MICROS_PER_MILLI);
- result = new CalendarInterval(0, millisecond * MICROS_PER_MILLI);
-
- } else if (unit.equals("microsecond")) {
- long micros = Long.parseLong(m.group(1));
- result = new CalendarInterval(0, micros);
+ switch (unit) {
+ case "year":
+ int year = (int) toLongWithRange("year", m.group(1),
+ Integer.MIN_VALUE / 12, Integer.MAX_VALUE / 12);
+ result = new CalendarInterval(year * 12, 0L);
+ break;
+ case "month":
+ int month = (int) toLongWithRange("month", m.group(1),
+ Integer.MIN_VALUE, Integer.MAX_VALUE);
+ result = new CalendarInterval(month, 0L);
+ break;
+ case "week":
+ long week = toLongWithRange("week", m.group(1),
+ Long.MIN_VALUE / MICROS_PER_WEEK, Long.MAX_VALUE / MICROS_PER_WEEK);
+ result = new CalendarInterval(0, week * MICROS_PER_WEEK);
+ break;
+ case "day":
+ long day = toLongWithRange("day", m.group(1),
+ Long.MIN_VALUE / MICROS_PER_DAY, Long.MAX_VALUE / MICROS_PER_DAY);
+ result = new CalendarInterval(0, day * MICROS_PER_DAY);
+ break;
+ case "hour":
+ long hour = toLongWithRange("hour", m.group(1),
+ Long.MIN_VALUE / MICROS_PER_HOUR, Long.MAX_VALUE / MICROS_PER_HOUR);
+ result = new CalendarInterval(0, hour * MICROS_PER_HOUR);
+ break;
+ case "minute":
+ long minute = toLongWithRange("minute", m.group(1),
+ Long.MIN_VALUE / MICROS_PER_MINUTE, Long.MAX_VALUE / MICROS_PER_MINUTE);
+ result = new CalendarInterval(0, minute * MICROS_PER_MINUTE);
+ break;
+ case "second": {
+ long micros = parseSecondNano(m.group(1));
+ result = new CalendarInterval(0, micros);
+ break;
+ }
+ case "millisecond":
+ long millisecond = toLongWithRange("millisecond", m.group(1),
+ Long.MIN_VALUE / MICROS_PER_MILLI, Long.MAX_VALUE / MICROS_PER_MILLI);
+ result = new CalendarInterval(0, millisecond * MICROS_PER_MILLI);
+ break;
+ case "microsecond": {
+ long micros = Long.parseLong(m.group(1));
+ result = new CalendarInterval(0, micros);
+ break;
+ }
}
} catch (Exception e) {
throw new IllegalArgumentException("Error parsing interval string: " + e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/Optional.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/Optional.java b/core/src/main/java/org/apache/spark/api/java/Optional.java
index ca7babc..fd0f495 100644
--- a/core/src/main/java/org/apache/spark/api/java/Optional.java
+++ b/core/src/main/java/org/apache/spark/api/java/Optional.java
@@ -18,6 +18,7 @@
package org.apache.spark.api.java;
import java.io.Serializable;
+import java.util.Objects;
import com.google.common.base.Preconditions;
@@ -52,8 +53,8 @@ import com.google.common.base.Preconditions;
* <li>{@link #isPresent()}</li>
* </ul>
*
- * <p>{@code java.util.Optional} itself is not used at this time because the
- * project does not require Java 8. Using {@code com.google.common.base.Optional}
+ * <p>{@code java.util.Optional} itself was not used because at the time, the
+ * project did not require Java 8. Using {@code com.google.common.base.Optional}
* has in the past caused serious library version conflicts with Guava that can't
* be resolved by shading. Hence this work-alike clone.</p>
*
@@ -171,7 +172,7 @@ public final class Optional<T> implements Serializable {
return false;
}
Optional<?> other = (Optional<?>) obj;
- return value == null ? other.value == null : value.equals(other.value);
+ return Objects.equals(value, other.value);
}
@Override
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
index 07aebb7..33bedf7 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
* A function that returns zero or more output records from each grouping key and its values from 2
* Datasets.
*/
+@FunctionalInterface
public interface CoGroupFunction<K, V1, V2, R> extends Serializable {
Iterator<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
index 576087b..2f23da5 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* A function that returns zero or more records of type Double from each input record.
*/
+@FunctionalInterface
public interface DoubleFlatMapFunction<T> extends Serializable {
Iterator<Double> call(T t) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
index bf16f79..3c0291c 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A function that returns Doubles, and can be used to construct DoubleRDDs.
*/
+@FunctionalInterface
public interface DoubleFunction<T> extends Serializable {
double call(T t) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
index 462ca3f..a6f69f7 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
*
* If the function returns true, the element is included in the returned Dataset.
*/
+@FunctionalInterface
public interface FilterFunction<T> extends Serializable {
boolean call(T value) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
index 2d8ea6d..91d6129 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* A function that returns zero or more output records from each input record.
*/
+@FunctionalInterface
public interface FlatMapFunction<T, R> extends Serializable {
Iterator<R> call(T t) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
index fc97b63..f9f2580 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* A function that takes two inputs and returns zero or more output records.
*/
+@FunctionalInterface
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
Iterator<R> call(T1 t1, T2 t2) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
index bae574a..6423c5d 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* A function that returns zero or more output records from each grouping key and its values.
*/
+@FunctionalInterface
public interface FlatMapGroupsFunction<K, V, R> extends Serializable {
Iterator<R> call(K key, Iterator<V> values) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
index 07e54b2..2e6e908 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
*
* Spark will invoke the call function on each element in the input Dataset.
*/
+@FunctionalInterface
public interface ForeachFunction<T> extends Serializable {
void call(T t) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
index 4938a51..d8f55d0 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* Base interface for a function used in Dataset's foreachPartition function.
*/
+@FunctionalInterface
public interface ForeachPartitionFunction<T> extends Serializable {
void call(Iterator<T> t) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/Function.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function.java b/core/src/main/java/org/apache/spark/api/java/function/Function.java
index b9d9777..8b2bbd5 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
* DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
* when mapping RDDs of other types.
*/
+@FunctionalInterface
public interface Function<T1, R> extends Serializable {
R call(T1 v1) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/Function0.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function0.java b/core/src/main/java/org/apache/spark/api/java/function/Function0.java
index c86928d..5c649d9 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function0.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function0.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A zero-argument function that returns an R.
*/
+@FunctionalInterface
public interface Function0<R> extends Serializable {
R call() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/Function2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function2.java b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
index a975ce3..a7d9647 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A two-argument function that takes arguments of type T1 and T2 and returns an R.
*/
+@FunctionalInterface
public interface Function2<T1, T2, R> extends Serializable {
R call(T1 v1, T2 v2) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/Function3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function3.java b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
index 6eecfb6..77acd21 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function3.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
*/
+@FunctionalInterface
public interface Function3<T1, T2, T3, R> extends Serializable {
R call(T1 v1, T2 v2, T3 v3) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/Function4.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function4.java b/core/src/main/java/org/apache/spark/api/java/function/Function4.java
index 9c35a22..d530ba4 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function4.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function4.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A four-argument function that takes arguments of type T1, T2, T3 and T4 and returns an R.
*/
+@FunctionalInterface
public interface Function4<T1, T2, T3, T4, R> extends Serializable {
R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
index 3ae6ef4..5efff94 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* Base interface for a map function used in Dataset's map function.
*/
+@FunctionalInterface
public interface MapFunction<T, U> extends Serializable {
U call(T value) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
index faa59ea..2c3d43a 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* Base interface for a map function used in GroupedDataset's mapGroup function.
*/
+@FunctionalInterface
public interface MapGroupsFunction<K, V, R> extends Serializable {
R call(K key, Iterator<V> values) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
index cf9945a..68e8557 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* Base interface for function used in Dataset's mapPartitions.
*/
+@FunctionalInterface
public interface MapPartitionsFunction<T, U> extends Serializable {
Iterator<U> call(Iterator<T> input) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
index 51eed2e..97bd2b3 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -26,6 +26,7 @@ import scala.Tuple2;
* A function that returns zero or more key-value pair records from each input record. The
* key-value pairs are represented as scala.Tuple2 objects.
*/
+@FunctionalInterface
public interface PairFlatMapFunction<T, K, V> extends Serializable {
Iterator<Tuple2<K, V>> call(T t) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
index 2fdfa71..34a7e44 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
@@ -25,6 +25,7 @@ import scala.Tuple2;
* A function that returns key-value pairs (Tuple2<K, V>), and can be used to
* construct PairRDDs.
*/
+@FunctionalInterface
public interface PairFunction<T, K, V> extends Serializable {
Tuple2<K, V> call(T t) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
index ee092d0..d9029d8 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* Base interface for function used in Dataset's reduce.
*/
+@FunctionalInterface
public interface ReduceFunction<T> extends Serializable {
T call(T v1, T v2) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
index f30d42e..aff2bc6 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A function with no return value.
*/
+@FunctionalInterface
public interface VoidFunction<T> extends Serializable {
void call(T t) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
index da9ae1c..ddb6162 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A two-argument function that takes arguments of type T1 and T2 with no return value.
*/
+@FunctionalInterface
public interface VoidFunction2<T1, T2> extends Serializable {
void call(T1 v1, T2 v2) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index dcae4a3..189d607 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -162,14 +162,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
// does not fully consume the sorter's output (e.g. sort followed by limit).
- taskContext.addTaskCompletionListener(
- new TaskCompletionListener() {
- @Override
- public void onTaskCompletion(TaskContext context) {
- cleanupResources();
- }
- }
- );
+ taskContext.addTaskCompletionListener(context -> { cleanupResources(); });
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
index 01aed95..cf4dfde 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
@@ -27,22 +27,18 @@ final class UnsafeSorterSpillMerger {
private final PriorityQueue<UnsafeSorterIterator> priorityQueue;
UnsafeSorterSpillMerger(
- final RecordComparator recordComparator,
- final PrefixComparator prefixComparator,
- final int numSpills) {
- final Comparator<UnsafeSorterIterator> comparator = new Comparator<UnsafeSorterIterator>() {
-
- @Override
- public int compare(UnsafeSorterIterator left, UnsafeSorterIterator right) {
- final int prefixComparisonResult =
- prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
- if (prefixComparisonResult == 0) {
- return recordComparator.compare(
- left.getBaseObject(), left.getBaseOffset(),
- right.getBaseObject(), right.getBaseOffset());
- } else {
- return prefixComparisonResult;
- }
+ RecordComparator recordComparator,
+ PrefixComparator prefixComparator,
+ int numSpills) {
+ Comparator<UnsafeSorterIterator> comparator = (left, right) -> {
+ int prefixComparisonResult =
+ prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
+ if (prefixComparisonResult == 0) {
+ return recordComparator.compare(
+ left.getBaseObject(), left.getBaseOffset(),
+ right.getBaseObject(), right.getBaseOffset());
+ } else {
+ return prefixComparisonResult;
}
};
priorityQueue = new PriorityQueue<>(numSpills, comparator);
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cbab7b8..7e56406 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -350,9 +350,6 @@ class SparkContext(config: SparkConf) extends Logging {
private def warnDeprecatedVersions(): Unit = {
val javaVersion = System.getProperty("java.version").split("[+.\\-]+", 3)
- if (javaVersion.length >= 2 && javaVersion(1).toInt == 7) {
- logWarning("Support for Java 7 is deprecated as of Spark 2.0.0")
- }
if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.10"))) {
logWarning("Support for Scala 2.10 is deprecated as of Spark 2.1.0")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
index 31b9c5e..3fd812e 100644
--- a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
@@ -39,7 +39,6 @@ private[spark] class WorkerCommandBuilder(sparkHome: String, memoryMb: Int, comm
val cmd = buildJavaCommand(command.classPathEntries.mkString(File.pathSeparator))
cmd.add(s"-Xmx${memoryMb}M")
command.javaOpts.foreach(cmd.add)
- CommandBuilderUtils.addPermGenSizeOpt(cmd)
addOptionString(cmd, getenv("SPARK_JAVA_OPTS"))
cmd
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fe6fe6a..1e6e9a2 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1882,20 +1882,17 @@ private[spark] object Utils extends Logging {
def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = {
// Politely destroy first
process.destroy()
-
- if (waitForProcess(process, timeoutMs)) {
+ if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
// Successful exit
Option(process.exitValue())
} else {
- // Java 8 added a new API which will more forcibly kill the process. Use that if available.
try {
- classOf[Process].getMethod("destroyForcibly").invoke(process)
+ process.destroyForcibly()
} catch {
- case _: NoSuchMethodException => return None // Not available; give up
case NonFatal(e) => logWarning("Exception when attempting to kill process", e)
}
// Wait, again, although this really should return almost immediately
- if (waitForProcess(process, timeoutMs)) {
+ if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
Option(process.exitValue())
} else {
logWarning("Timed out waiting to forcibly kill process")
@@ -1905,44 +1902,11 @@ private[spark] object Utils extends Logging {
}
/**
- * Wait for a process to terminate for at most the specified duration.
- *
- * @return whether the process actually terminated before the given timeout.
- */
- def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
- try {
- // Use Java 8 method if available
- classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit])
- .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS)
- .asInstanceOf[Boolean]
- } catch {
- case _: NoSuchMethodException =>
- // Otherwise implement it manually
- var terminated = false
- val startTime = System.currentTimeMillis
- while (!terminated) {
- try {
- process.exitValue()
- terminated = true
- } catch {
- case e: IllegalThreadStateException =>
- // Process not terminated yet
- if (System.currentTimeMillis - startTime > timeoutMs) {
- return false
- }
- Thread.sleep(100)
- }
- }
- true
- }
- }
-
- /**
* Return the stderr of a process after waiting for the process to terminate.
* If the process does not terminate within the specified timeout, return None.
*/
def getStderr(process: Process, timeoutMs: Long): Option[String] = {
- val terminated = Utils.waitForProcess(process, timeoutMs)
+ val terminated = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)
if (terminated) {
Some(Source.fromInputStream(process.getErrorStream).getLines().mkString("\n"))
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org