You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/19 08:01:14 UTC
[1/9] flink git commit: [FLINK-9180] [conf] Remove REST_ prefix from
rest options
Repository: flink
Updated Branches:
refs/heads/release-1.5 84469f841 -> ed4f4f1af
[FLINK-9180] [conf] Remove REST_ prefix from rest options
This closes #5852.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5fed1173
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5fed1173
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5fed1173
Branch: refs/heads/release-1.5
Commit: 5fed11735ecbf31f6108c9542af764892168b60c
Parents: 84469f8
Author: zhangminglei <zm...@163.com>
Authored: Tue Apr 17 10:52:20 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 10:00:59 2018 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/client/LocalExecutor.java | 6 +++---
.../java/org/apache/flink/client/RemoteExecutor.java | 2 +-
.../java/org/apache/flink/client/cli/CliFrontend.java | 4 ++--
.../client/program/rest/RestClusterClientTest.java | 4 ++--
.../org/apache/flink/configuration/RestOptions.java | 10 +++++-----
.../org/apache/flink/docs/rest/RestAPIDocGenerator.java | 2 +-
.../org/apache/flink/api/java/ExecutionEnvironment.java | 4 ++--
.../flink/runtime/entrypoint/ClusterEntrypoint.java | 2 +-
.../highavailability/HighAvailabilityServicesUtils.java | 6 +++---
.../runtime/minicluster/MiniClusterConfiguration.java | 4 ++--
.../flink/runtime/rest/FlinkHttpObjectAggregator.java | 2 +-
.../java/org/apache/flink/runtime/rest/RestClient.java | 2 +-
.../flink/runtime/rest/RestClientConfiguration.java | 2 +-
.../runtime/rest/RestServerEndpointConfiguration.java | 12 ++++++------
.../flink/runtime/rest/RestServerEndpointITCase.java | 8 ++++----
.../api/environment/LocalStreamEnvironment.java | 6 +++---
.../api/environment/StreamExecutionEnvironment.java | 4 ++--
.../org/apache/flink/test/util/MiniClusterResource.java | 6 +++---
.../runtime/BigUserProgramJobSubmitITCase.java | 2 +-
.../flink/yarn/AbstractYarnClusterDescriptor.java | 8 ++++----
.../flink/yarn/entrypoint/YarnEntrypointUtils.java | 6 +++---
21 files changed, 51 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index f209b45..01c281f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -126,8 +126,8 @@ public class LocalExecutor extends PlanExecutor {
final JobExecutorService newJobExecutorService;
if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
- if (!configuration.contains(RestOptions.REST_PORT)) {
- configuration.setInteger(RestOptions.REST_PORT, 0);
+ if (!configuration.contains(RestOptions.PORT)) {
+ configuration.setInteger(RestOptions.PORT, 0);
}
final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
@@ -145,7 +145,7 @@ public class LocalExecutor extends PlanExecutor {
final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
miniCluster.start();
- configuration.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort());
+ configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
newJobExecutorService = miniCluster;
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index f6242e7..0a2f1b4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -113,7 +113,7 @@ public class RemoteExecutor extends PlanExecutor {
clientConfiguration.setString(JobManagerOptions.ADDRESS, inet.getHostName());
clientConfiguration.setInteger(JobManagerOptions.PORT, inet.getPort());
- clientConfiguration.setInteger(RestOptions.REST_PORT, inet.getPort());
+ clientConfiguration.setInteger(RestOptions.PORT, inet.getPort());
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 65f470b..7745ca0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -1142,8 +1142,8 @@ public class CliFrontend {
public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
config.setString(JobManagerOptions.ADDRESS, address.getHostString());
config.setInteger(JobManagerOptions.PORT, address.getPort());
- config.setString(RestOptions.REST_ADDRESS, address.getHostString());
- config.setInteger(RestOptions.REST_PORT, address.getPort());
+ config.setString(RestOptions.ADDRESS, address.getHostString());
+ config.setInteger(RestOptions.PORT, address.getPort());
}
public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index e2daad6..fd05cad 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -692,8 +692,8 @@ public class RestClusterClientTest extends TestLogger {
configuration.setString(JobManagerOptions.ADDRESS, configuredHostname);
configuration.setInteger(JobManagerOptions.PORT, configuredPort);
- configuration.setString(RestOptions.REST_ADDRESS, configuredHostname);
- configuration.setInteger(RestOptions.REST_PORT, configuredPort);
+ configuration.setString(RestOptions.ADDRESS, configuredHostname);
+ configuration.setInteger(RestOptions.PORT, configuredPort);
final DefaultCLI defaultCLI = new DefaultCLI(configuration);
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index e7421c4..5cbd027 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -31,7 +31,7 @@ public class RestOptions {
/**
* The address that the server binds itself to.
*/
- public static final ConfigOption<String> REST_BIND_ADDRESS =
+ public static final ConfigOption<String> BIND_ADDRESS =
key("rest.bind-address")
.noDefaultValue()
.withDescription("The address that the server binds itself.");
@@ -39,7 +39,7 @@ public class RestOptions {
/**
* The address that should be used by clients to connect to the server.
*/
- public static final ConfigOption<String> REST_ADDRESS =
+ public static final ConfigOption<String> ADDRESS =
key("rest.address")
.noDefaultValue()
.withDeprecatedKeys(JobManagerOptions.ADDRESS.key())
@@ -48,7 +48,7 @@ public class RestOptions {
/**
* The port that the server listens on / the client connects to.
*/
- public static final ConfigOption<Integer> REST_PORT =
+ public static final ConfigOption<Integer> PORT =
key("rest.port")
.defaultValue(8081)
.withDescription("The port that the server listens on / the client connects to.");
@@ -94,7 +94,7 @@ public class RestOptions {
/**
* The maximum content length that the server will handle.
*/
- public static final ConfigOption<Integer> REST_SERVER_MAX_CONTENT_LENGTH =
+ public static final ConfigOption<Integer> SERVER_MAX_CONTENT_LENGTH =
key("rest.server.max-content-length")
.defaultValue(104_857_600)
.withDescription("The maximum content length in bytes that the server will handle.");
@@ -102,7 +102,7 @@ public class RestOptions {
/**
* The maximum content length that the client will handle.
*/
- public static final ConfigOption<Integer> REST_CLIENT_MAX_CONTENT_LENGTH =
+ public static final ConfigOption<Integer> CLIENT_MAX_CONTENT_LENGTH =
key("rest.client.max-content-length")
.defaultValue(104_857_600)
.withDescription("The maximum content length in bytes that the client will handle.");
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 5545272..79bf677 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -313,7 +313,7 @@ public class RestAPIDocGenerator {
static {
config = new Configuration();
- config.setString(RestOptions.REST_ADDRESS, "localhost");
+ config.setString(RestOptions.ADDRESS, "localhost");
try {
restConfig = RestServerEndpointConfiguration.fromConfiguration(config);
} catch (ConfigurationException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 3ea99ea..3d858aa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -1126,9 +1126,9 @@ public abstract class ExecutionEnvironment {
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
- if (!conf.contains(RestOptions.REST_PORT)) {
+ if (!conf.contains(RestOptions.PORT)) {
// explicitly set this option so that it's not set to 0 later
- conf.setInteger(RestOptions.REST_PORT, RestOptions.REST_PORT.defaultValue());
+ conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
}
return createLocalEnvironment(conf, -1);
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 0993cb6..42a3d1a 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -704,7 +704,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
final int restPort = clusterConfiguration.getRestPort();
if (restPort >= 0) {
- configuration.setInteger(RestOptions.REST_PORT, restPort);
+ configuration.setInteger(RestOptions.PORT, restPort);
}
return configuration;
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index f19a421..918f1f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -99,10 +99,10 @@ public class HighAvailabilityServicesUtils {
addressResolution,
configuration);
- final String address = checkNotNull(configuration.getString(RestOptions.REST_ADDRESS),
+ final String address = checkNotNull(configuration.getString(RestOptions.ADDRESS),
"%s must be set",
- RestOptions.REST_ADDRESS.key());
- final int port = configuration.getInteger(RestOptions.REST_PORT);
+ RestOptions.ADDRESS.key());
+ final int port = configuration.getInteger(RestOptions.PORT);
final boolean enableSSL = configuration.getBoolean(SecurityOptions.SSL_ENABLED);
final String protocol = enableSSL ? "https://" : "http://";
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index fe76694..44a567b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -169,8 +169,8 @@ public class MiniClusterConfiguration {
final Configuration modifiedConfiguration = new Configuration(configuration);
modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
modifiedConfiguration.setString(
- RestOptions.REST_ADDRESS,
- modifiedConfiguration.getString(RestOptions.REST_ADDRESS, "localhost"));
+ RestOptions.ADDRESS,
+ modifiedConfiguration.getString(RestOptions.ADDRESS, "localhost"));
return new MiniClusterConfiguration(
modifiedConfiguration,
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
index 4ee0256..79ad598 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
@@ -59,7 +59,7 @@ public class FlinkHttpObjectAggregator extends org.apache.flink.shaded.netty4.io
false,
new ErrorResponseBody(String.format(
e.getMessage() + " Try to raise [%s]",
- RestOptions.REST_SERVER_MAX_CONTENT_LENGTH.key())),
+ RestOptions.SERVER_MAX_CONTENT_LENGTH.key())),
HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE,
responseHeaders);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index df97f20..8f7dfed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -273,7 +273,7 @@ public class RestClient {
if (cause instanceof TooLongFrameException) {
jsonFuture.completeExceptionally(new TooLongFrameException(String.format(
cause.getMessage() + " Try to raise [%s]",
- RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH.key())));
+ RestOptions.CLIENT_MAX_CONTENT_LENGTH.key())));
} else {
jsonFuture.completeExceptionally(cause);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
index 17d4264..0e98e8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -107,7 +107,7 @@ public final class RestClientConfiguration {
final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);
- int maxContentLength = config.getInteger(RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH);
+ int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);
return new RestClientConfiguration(sslEngine, connectionTimeout, maxContentLength);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index 8af76f5..542a937 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -81,7 +81,7 @@ public final class RestServerEndpointConfiguration {
}
/**
- * @see RestOptions#REST_ADDRESS
+ * @see RestOptions#ADDRESS
*/
public String getRestAddress() {
return restAddress;
@@ -147,12 +147,12 @@ public final class RestServerEndpointConfiguration {
public static RestServerEndpointConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
Preconditions.checkNotNull(config);
- final String restAddress = Preconditions.checkNotNull(config.getString(RestOptions.REST_ADDRESS),
+ final String restAddress = Preconditions.checkNotNull(config.getString(RestOptions.ADDRESS),
"%s must be set",
- RestOptions.REST_ADDRESS.key());
+ RestOptions.ADDRESS.key());
- final String restBindAddress = config.getString(RestOptions.REST_BIND_ADDRESS);
- final int port = config.getInteger(RestOptions.REST_PORT);
+ final String restBindAddress = config.getString(RestOptions.BIND_ADDRESS);
+ final int port = config.getInteger(RestOptions.PORT);
SSLEngine sslEngine = null;
final boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
@@ -173,7 +173,7 @@ public final class RestServerEndpointConfiguration {
config.getString(WebOptions.UPLOAD_DIR, config.getString(WebOptions.TMP_DIR)),
"flink-web-upload");
- final int maxContentLength = config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH);
+ final int maxContentLength = config.getInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH);
final Map<String, String> responseHeaders = Collections.singletonMap(
HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN,
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 88fdeb8..09e36de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -114,11 +114,11 @@ public class RestServerEndpointITCase extends TestLogger {
@Before
public void setup() throws Exception {
Configuration config = new Configuration();
- config.setInteger(RestOptions.REST_PORT, 0);
- config.setString(RestOptions.REST_ADDRESS, "localhost");
+ config.setInteger(RestOptions.PORT, 0);
+ config.setString(RestOptions.ADDRESS, "localhost");
config.setString(WebOptions.UPLOAD_DIR, temporaryFolder.newFolder().getCanonicalPath());
- config.setInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
- config.setInteger(RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
+ config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
+ config.setInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
RestClientConfiguration clientConfig = RestClientConfiguration.fromConfiguration(config);
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index b9c76b2..8295e3c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -99,8 +99,8 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
// add (and override) the settings with what the user defined
configuration.addAll(this.configuration);
- if (!configuration.contains(RestOptions.REST_PORT)) {
- configuration.setInteger(RestOptions.REST_PORT, 0);
+ if (!configuration.contains(RestOptions.PORT)) {
+ configuration.setInteger(RestOptions.PORT, 0);
}
MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
@@ -116,7 +116,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
try {
miniCluster.start();
- configuration.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort());
+ configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
return miniCluster.executeJobBlocking(jobGraph);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7372fe8..624c938 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1680,9 +1680,9 @@ public abstract class StreamExecutionEnvironment {
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
- if (!conf.contains(RestOptions.REST_PORT)) {
+ if (!conf.contains(RestOptions.PORT)) {
// explicitly set this option so that it's not set to 0 later
- conf.setInteger(RestOptions.REST_PORT, RestOptions.REST_PORT.defaultValue());
+ conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
}
return createLocalEnvironment(defaultLocalParallelism, conf);
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 531a3c7..324c9ee 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -221,7 +221,7 @@ public class MiniClusterResource extends ExternalResource {
}
// set rest port to 0 to avoid clashes with concurrent MiniClusters
- configuration.setInteger(RestOptions.REST_PORT, 0);
+ configuration.setInteger(RestOptions.PORT, 0);
final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
@@ -234,7 +234,7 @@ public class MiniClusterResource extends ExternalResource {
miniCluster.start();
// update the port of the rest endpoint
- configuration.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort());
+ configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
jobExecutorService = miniCluster;
if (enableClusterClient) {
@@ -242,7 +242,7 @@ public class MiniClusterResource extends ExternalResource {
}
Configuration restClientConfig = new Configuration();
restClientConfig.setString(JobManagerOptions.ADDRESS, miniCluster.getRestAddress().getHost());
- restClientConfig.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort());
+ restClientConfig.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
index b10dbec..5fb3e4d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
@@ -73,7 +73,7 @@ public class BigUserProgramJobSubmitITCase extends TestLogger {
final Configuration clientConfig = new Configuration();
clientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost());
- clientConfig.setInteger(RestOptions.REST_PORT, restAddress.getPort());
+ clientConfig.setInteger(RestOptions.PORT, restAddress.getPort());
CLIENT = new RestClusterClient<>(
clientConfig,
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 8538c1f..aec5fdb 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -382,8 +382,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, rpcPort);
- flinkConfiguration.setString(RestOptions.REST_ADDRESS, host);
- flinkConfiguration.setInteger(RestOptions.REST_PORT, rpcPort);
+ flinkConfiguration.setString(RestOptions.ADDRESS, host);
+ flinkConfiguration.setInteger(RestOptions.PORT, rpcPort);
return createYarnClusterClient(
this,
@@ -542,8 +542,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
- flinkConfiguration.setString(RestOptions.REST_ADDRESS, host);
- flinkConfiguration.setInteger(RestOptions.REST_PORT, port);
+ flinkConfiguration.setString(RestOptions.ADDRESS, host);
+ flinkConfiguration.setInteger(RestOptions.PORT, port);
// the Flink cluster is deployed in YARN. Represent cluster
return createYarnClusterClient(
http://git-wip-us.apache.org/repos/asf/flink/blob/5fed1173/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index c50c043..25d138d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -95,7 +95,7 @@ public class YarnEntrypointUtils {
ApplicationConstants.Environment.NM_HOST.key());
configuration.setString(JobManagerOptions.ADDRESS, hostname);
- configuration.setString(RestOptions.REST_ADDRESS, hostname);
+ configuration.setString(RestOptions.ADDRESS, hostname);
// TODO: Support port ranges for the AM
// final String portRange = configuration.getString(
@@ -115,9 +115,9 @@ public class YarnEntrypointUtils {
configuration.setInteger(WebOptions.PORT, 0);
}
- if (configuration.getInteger(RestOptions.REST_PORT) >= 0) {
+ if (configuration.getInteger(RestOptions.PORT) >= 0) {
// set the REST port to 0 to select it randomly
- configuration.setInteger(RestOptions.REST_PORT, 0);
+ configuration.setInteger(RestOptions.PORT, 0);
}
// if the user has set the deprecated YARN-specific config keys, we add the
[8/9] flink git commit: [FLINK-9206][checkpoints] add job IDs to
CheckpointCoordinator log messages This closes #5872.
Posted by ch...@apache.org.
[FLINK-9206][checkpoints] add job IDs to CheckpointCoordinator log messages This closes #5872.
This closes #5873.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71c2ac31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71c2ac31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71c2ac31
Branch: refs/heads/release-1.5
Commit: 71c2ac313315cb6b2b5d269041788b9907e07de1
Parents: f1a82d5
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Apr 18 15:01:32 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 10:01:01 2018 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 57 +++++++++++---------
1 file changed, 31 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/71c2ac31/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 59916fd..4ddac003 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -316,7 +316,7 @@ public class CheckpointCoordinator {
synchronized (lock) {
if (!shutdown) {
shutdown = true;
- LOG.info("Stopping checkpoint coordinator for job " + job);
+ LOG.info("Stopping checkpoint coordinator for job {}.", job);
periodicScheduling = false;
triggerRequestQueued = false;
@@ -414,7 +414,7 @@ public class CheckpointCoordinator {
if (!props.forceCheckpoint()) {
// sanity check: there should never be more than one trigger request queued
if (triggerRequestQueued) {
- LOG.warn("Trying to trigger another checkpoint while one was queued already");
+ LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
}
@@ -455,8 +455,9 @@ public class CheckpointCoordinator {
if (ee != null && ee.getState() == ExecutionState.RUNNING) {
executions[i] = ee;
} else {
- LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
- tasksToTrigger[i].getTaskNameWithSubtaskIndex());
+ LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
+ tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
+ job);
return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
@@ -470,8 +471,9 @@ public class CheckpointCoordinator {
if (ee != null) {
ackTasks.put(ee.getAttemptId(), ev);
} else {
- LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
- ev.getTaskNameWithSubtaskIndex());
+ LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
+ ev.getTaskNameWithSubtaskIndex(),
+ job);
return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
@@ -498,7 +500,10 @@ public class CheckpointCoordinator {
}
catch (Throwable t) {
int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
- LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+ LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
+ job,
+ numUnsuccessful,
+ t);
return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
}
@@ -526,7 +531,7 @@ public class CheckpointCoordinator {
// only do the work if the checkpoint is not discarded anyways
// note that checkpoint completion discards the pending checkpoint object
if (!checkpoint.isDiscarded()) {
- LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+ LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
checkpoint.abortExpired();
pendingCheckpoints.remove(checkpointID);
@@ -547,7 +552,7 @@ public class CheckpointCoordinator {
}
else if (!props.forceCheckpoint()) {
if (triggerRequestQueued) {
- LOG.warn("Trying to trigger another checkpoint while one was queued already");
+ LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
}
@@ -579,7 +584,7 @@ public class CheckpointCoordinator {
}
}
- LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
+ LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
pendingCheckpoints.put(checkpointID, checkpoint);
@@ -620,8 +625,8 @@ public class CheckpointCoordinator {
}
int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
- LOG.warn("Failed to trigger checkpoint {}. ({} consecutive failed attempts so far)",
- checkpointID, numUnsuccessful, t);
+ LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
+ checkpointID, job, numUnsuccessful, t);
if (!checkpoint.isDiscarded()) {
checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
@@ -673,7 +678,7 @@ public class CheckpointCoordinator {
checkpoint = pendingCheckpoints.remove(checkpointId);
if (checkpoint != null && !checkpoint.isDiscarded()) {
- LOG.info("Decline checkpoint {} by task {}.", checkpointId, message.getTaskExecutionId());
+ LOG.info("Decline checkpoint {} by task {} of job {}.", checkpointId, message.getTaskExecutionId(), job);
discardCheckpoint(checkpoint, message.getReason());
}
else if (checkpoint != null) {
@@ -684,12 +689,12 @@ public class CheckpointCoordinator {
else if (LOG.isDebugEnabled()) {
if (recentPendingCheckpoints.contains(checkpointId)) {
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
- LOG.debug("Received another decline message for now expired checkpoint attempt {} : {}",
- checkpointId, reason);
+ LOG.debug("Received another decline message for now expired checkpoint attempt {} of job {} : {}",
+ checkpointId, job, reason);
} else {
// message is for an unknown checkpoint. might be so old that we don't even remember it any more
- LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} : {}",
- checkpointId, reason);
+ LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} of job {} : {}",
+ checkpointId, job, reason);
}
}
}
@@ -834,7 +839,7 @@ public class CheckpointCoordinator {
try {
completedCheckpoint.discardOnFailedStoring();
} catch (Throwable t) {
- LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
+ LOG.warn("Could not properly discard completed checkpoint {} of job {}.", completedCheckpoint.getCheckpointID(), job, t);
}
}
});
@@ -857,7 +862,7 @@ public class CheckpointCoordinator {
// the 'min delay between checkpoints'
lastCheckpointCompletionNanos = System.nanoTime();
- LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId,
+ LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
if (LOG.isDebugEnabled()) {
@@ -1007,7 +1012,7 @@ public class CheckpointCoordinator {
completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
}
- LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry);
+ LOG.debug("Status of the shared state registry of job {} after restore: {}.", job, sharedStateRegistry);
// Restore from the latest checkpoint
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
@@ -1020,7 +1025,7 @@ public class CheckpointCoordinator {
}
}
- LOG.info("Restoring from latest valid checkpoint: {}.", latest);
+ LOG.info("Restoring job {} from latest valid checkpoint: {}.", job, latest);
// re-assign the task states
final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
@@ -1076,8 +1081,8 @@ public class CheckpointCoordinator {
Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null.");
- LOG.info("Starting job from savepoint {} ({})",
- savepointPointer, (allowNonRestored ? "allowing non restored state" : ""));
+ LOG.info("Starting job {} from savepoint {} ({})",
+ job, savepointPointer, (allowNonRestored ? "allowing non restored state" : ""));
final CompletedCheckpointStorageLocation checkpointLocation = checkpointStorage.resolveCheckpoint(savepointPointer);
@@ -1091,7 +1096,7 @@ public class CheckpointCoordinator {
long nextCheckpointId = savepoint.getCheckpointID() + 1;
checkpointIdCounter.setCount(nextCheckpointId);
- LOG.info("Reset the checkpoint ID to {}.", nextCheckpointId);
+ LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);
return restoreLatestCheckpointedState(tasks, true, allowNonRestored);
}
@@ -1214,7 +1219,7 @@ public class CheckpointCoordinator {
triggerCheckpoint(System.currentTimeMillis(), true);
}
catch (Exception e) {
- LOG.error("Exception while triggering checkpoint.", e);
+ LOG.error("Exception while triggering checkpoint for job {}.", job, e);
}
}
}
@@ -1233,7 +1238,7 @@ public class CheckpointCoordinator {
final String reason = (cause != null) ? cause.getMessage() : "";
- LOG.info("Discarding checkpoint {} because: {}", checkpointId, reason);
+ LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason);
pendingCheckpoint.abortDeclined();
rememberRecentCheckpointId(checkpointId);
[4/9] flink git commit: [FLINK-8661] Replace Collections.EMPTY_MAP
with Collections.emptyMap()
Posted by ch...@apache.org.
[FLINK-8661] Replace Collections.EMPTY_MAP with Collections.emptyMap()
This closes #5864.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7db83b6b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7db83b6b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7db83b6b
Branch: refs/heads/release-1.5
Commit: 7db83b6b5f231095b6ff0adb092740c2b316223d
Parents: 5fed117
Author: zhangminglei <zm...@163.com>
Authored: Wed Apr 18 09:24:57 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 10:01:00 2018 +0200
----------------------------------------------------------------------
.../runtime/clusterframework/types/ResourceProfile.java | 2 +-
.../runtime/clusterframework/types/ResourceProfileTest.java | 8 ++++----
.../flink/runtime/rest/messages/JobAccumulatorsInfoTest.java | 2 +-
.../src/test/java/org/apache/flink/yarn/UtilsTest.java | 4 ++--
4 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7db83b6b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 8fbaed1..a89b9f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -106,7 +106,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
* @param heapMemoryInMB The size of the heap memory, in megabytes.
*/
public ResourceProfile(double cpuCores, int heapMemoryInMB) {
- this(cpuCores, heapMemoryInMB, 0, 0, 0, Collections.EMPTY_MAP);
+ this(cpuCores, heapMemoryInMB, 0, 0, 0, Collections.emptyMap());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/7db83b6b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index 7ed688a..6f54d7f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -31,10 +31,10 @@ public class ResourceProfileTest {
@Test
public void testMatchRequirement() throws Exception {
- ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 0, Collections.EMPTY_MAP);
- ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 0, Collections.EMPTY_MAP);
- ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 0, Collections.EMPTY_MAP);
- ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 0, Collections.EMPTY_MAP);
+ ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 0, Collections.emptyMap());
+ ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 0, Collections.emptyMap());
+ ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 0, Collections.emptyMap());
+ ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 0, Collections.emptyMap());
assertFalse(rp1.isMatching(rp2));
assertTrue(rp2.isMatching(rp1));
http://git-wip-us.apache.org/repos/asf/flink/blob/7db83b6b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
index e0e9649..856d855 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
@@ -47,6 +47,6 @@ public class JobAccumulatorsInfoTest extends RestResponseMarshallingTestBase<Job
"uta3.type",
"uta3.value"));
- return new JobAccumulatorsInfo(Collections.emptyList(), userAccumulatorList, Collections.EMPTY_MAP);
+ return new JobAccumulatorsInfo(Collections.emptyList(), userAccumulatorList, Collections.emptyMap());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7db83b6b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 578e8e2..b7a38b0 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -201,7 +201,7 @@ public class UtilsTest extends TestLogger {
expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
- resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
+ resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
for (int i = 0; i < containerList.size(); i++) {
expectMsgClass(deadline.timeLeft(), Acknowledge.class);
@@ -217,7 +217,7 @@ public class UtilsTest extends TestLogger {
expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
- resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
+ resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
for (Container container: containerList) {
resourceManagerGateway.tell(
[2/9] flink git commit: [FLINK-8703][tests] Expose WebUI port
Posted by ch...@apache.org.
[FLINK-8703][tests] Expose WebUI port
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f39ffd3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f39ffd3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f39ffd3
Branch: refs/heads/release-1.5
Commit: 0f39ffd308c88d791b2dfc1f3fd57e415b3ecfeb
Parents: c53d59f
Author: zentol <ch...@apache.org>
Authored: Wed Mar 7 11:14:20 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 10:01:00 2018 +0200
----------------------------------------------------------------------
.../org/apache/flink/test/util/MiniClusterResource.java | 12 ++++++++++++
1 file changed, 12 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0f39ffd3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 324c9ee..160c1d1 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -75,6 +75,8 @@ public class MiniClusterResource extends ExternalResource {
private TestEnvironment executionEnvironment;
+ private int webUIPort = -1;
+
public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
this(miniClusterResourceConfiguration, false);
}
@@ -129,6 +131,10 @@ public class MiniClusterResource extends ExternalResource {
return executionEnvironment;
}
+ public int getWebUIPort() {
+ return webUIPort;
+ }
+
@Override
public void before() throws Exception {
@@ -205,6 +211,10 @@ public class MiniClusterResource extends ExternalResource {
Configuration restClientConfig = new Configuration();
restClientConfig.setInteger(JobManagerOptions.PORT, flinkMiniCluster.getLeaderRPCPort());
this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
+
+ if (flinkMiniCluster.webMonitor().isDefined()) {
+ webUIPort = flinkMiniCluster.webMonitor().get().getServerPort();
+ }
}
private void startMiniCluster() throws Exception {
@@ -244,6 +254,8 @@ public class MiniClusterResource extends ExternalResource {
restClientConfig.setString(JobManagerOptions.ADDRESS, miniCluster.getRestAddress().getHost());
restClientConfig.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
+
+ webUIPort = miniCluster.getRestAddress().getPort();
}
/**
[3/9] flink git commit: [FLINK-9199][REST] Fix URLs and remove
subtask index parameter
Posted by ch...@apache.org.
[FLINK-9199][REST] Fix URLs and remove subtask index parameter
This closes #5865.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c53d59f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c53d59f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c53d59f1
Branch: refs/heads/release-1.5
Commit: c53d59f14f6abe5ba7b3355be99dd3f8785cc042
Parents: 7db83b6
Author: Rong Rong <ro...@uber.com>
Authored: Tue Apr 17 23:44:02 2018 -0700
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 10:01:00 2018 +0200
----------------------------------------------------------------------
docs/_includes/generated/rest_dispatcher.html | 181 ++++++++++++++++++-
...taskExecutionAttemptAccumulatorsHeaders.java | 2 +-
.../SubtaskExecutionAttemptDetailsHeaders.java | 2 +-
.../AggregatedSubtaskMetricsHeaders.java | 2 +-
.../AggregatedSubtaskMetricsParameters.java | 5 +-
5 files changed, 175 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c53d59f1/docs/_includes/generated/rest_dispatcher.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_dispatcher.html
index 5889f88..8ac36e1 100644
--- a/docs/_includes/generated/rest_dispatcher.html
+++ b/docs/_includes/generated/rest_dispatcher.html
@@ -311,6 +311,56 @@
<table class="table table-bordered">
<tbody>
<tr>
+ <td class="text-left" colspan="2"><strong>/jobs/metrics</strong></td>
+ </tr>
+ <tr>
+ <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+ <td class="text-left">Response code: <code>200 OK</code></td>
+ </tr>
+ <tr>
+ <td colspan="2">description</td>
+ </tr>
+ <tr>
+ <td colspan="2">Query parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>get</code> (optional): description</li>
+<li><code>agg</code> (optional): description</li>
+<li><code>jobs</code> (optional): description</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#-790902571">Request</button>
+ <div id="-790902571" class="collapse">
+ <pre>
+ <code>
+{} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#128306770">Response</button>
+ <div id="128306770" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "any"
+} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ </tbody>
+</table>
+<table class="table table-bordered">
+ <tbody>
+ <tr>
<td class="text-left" colspan="2"><strong>/jobs/overview</strong></td>
</tr>
<tr>
@@ -2208,6 +2258,67 @@
<table class="table table-bordered">
<tbody>
<tr>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/metrics</strong></td>
+ </tr>
+ <tr>
+ <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+ <td class="text-left">Response code: <code>200 OK</code></td>
+ </tr>
+ <tr>
+ <td colspan="2">description</td>
+ </tr>
+ <tr>
+ <td colspan="2">Path parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>jobid</code> - description</li>
+<li><code>vertexid</code> - description</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">Query parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>get</code> (optional): description</li>
+<li><code>agg</code> (optional): description</li>
+<li><code>subtasks</code> (optional): description</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#265134755">Request</button>
+ <div id="265134755" class="collapse">
+ <pre>
+ <code>
+{} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#1184344096">Response</button>
+ <div id="1184344096" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "any"
+} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ </tbody>
+</table>
+<table class="table table-bordered">
+ <tbody>
+ <tr>
<td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex</strong></td>
</tr>
<tr>
@@ -2313,7 +2424,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts/attempt</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts/:attempt</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -2337,8 +2448,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#1633948804">Request</button>
- <div id="1633948804" class="collapse">
+ <button data-toggle="collapse" data-target="#168850740">Request</button>
+ <div id="168850740" class="collapse">
<pre>
<code>
{} </code>
@@ -2348,8 +2459,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-384785209">Response</button>
- <div id="-384785209" class="collapse">
+ <button data-toggle="collapse" data-target="#-1849883273">Response</button>
+ <div id="-1849883273" class="collapse">
<pre>
<code>
{
@@ -2419,7 +2530,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts/attempt/accumulators</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts/:attempt/accumulators</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -2443,8 +2554,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#1869002000">Request</button>
- <div id="1869002000" class="collapse">
+ <button data-toggle="collapse" data-target="#-1303317920">Request</button>
+ <div id="-1303317920" class="collapse">
<pre>
<code>
{} </code>
@@ -2454,8 +2565,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#1717045676">Response</button>
- <div id="1717045676" class="collapse">
+ <button data-toggle="collapse" data-target="#-1455274244">Response</button>
+ <div id="-1455274244" class="collapse">
<pre>
<code>
{
@@ -3038,6 +3149,56 @@
<table class="table table-bordered">
<tbody>
<tr>
+ <td class="text-left" colspan="2"><strong>/taskmanagers/metrics</strong></td>
+ </tr>
+ <tr>
+ <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+ <td class="text-left">Response code: <code>200 OK</code></td>
+ </tr>
+ <tr>
+ <td colspan="2">description</td>
+ </tr>
+ <tr>
+ <td colspan="2">Query parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>get</code> (optional): description</li>
+<li><code>agg</code> (optional): description</li>
+<li><code>taskmanagers</code> (optional): description</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#521351018">Request</button>
+ <div id="521351018" class="collapse">
+ <pre>
+ <code>
+{} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#1440560359">Response</button>
+ <div id="1440560359" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "any"
+} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ </tbody>
+</table>
+<table class="table table-bordered">
+ <tbody>
+ <tr>
<td class="text-left" colspan="2"><strong>/taskmanagers/:taskmanagerid</strong></td>
</tr>
<tr>
http://git-wip-us.apache.org/repos/asf/flink/blob/c53d59f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
index 662e87c..5cc159c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
@@ -36,7 +36,7 @@ public class SubtaskExecutionAttemptAccumulatorsHeaders implements MessageHeader
private static final SubtaskExecutionAttemptAccumulatorsHeaders INSTANCE = new SubtaskExecutionAttemptAccumulatorsHeaders();
public static final String URL = String.format(
- "/jobs/:%s/vertices/:%s/subtasks/:%s/attempts/%s/accumulators",
+ "/jobs/:%s/vertices/:%s/subtasks/:%s/attempts/:%s/accumulators",
JobIDPathParameter.KEY,
JobVertexIdPathParameter.KEY,
SubtaskIndexPathParameter.KEY,
http://git-wip-us.apache.org/repos/asf/flink/blob/c53d59f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java
index aa65007..6f8eb21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java
@@ -36,7 +36,7 @@ public class SubtaskExecutionAttemptDetailsHeaders implements MessageHeaders<Emp
private static final SubtaskExecutionAttemptDetailsHeaders INSTANCE = new SubtaskExecutionAttemptDetailsHeaders();
public static final String URL = String.format(
- "/jobs/:%s/vertices/:%s/subtasks/:%s/attempts/%s",
+ "/jobs/:%s/vertices/:%s/subtasks/:%s/attempts/:%s",
JobIDPathParameter.KEY,
JobVertexIdPathParameter.KEY,
SubtaskIndexPathParameter.KEY,
http://git-wip-us.apache.org/repos/asf/flink/blob/c53d59f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
index e1d0790..bfeab5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
@@ -38,7 +38,7 @@ public class AggregatedSubtaskMetricsHeaders extends AbstractAggregatedMetricsHe
@Override
public String getTargetRestEndpointURL() {
- return "/jobs/" + JobIDPathParameter.KEY + "/vertices/" + JobVertexIdPathParameter.KEY + "/subtasks/metrics";
+ return "/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" + JobVertexIdPathParameter.KEY + "/subtasks/metrics";
}
public static AggregatedSubtaskMetricsHeaders getInstance() {
http://git-wip-us.apache.org/repos/asf/flink/blob/c53d59f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
index 34e1b52..f3969b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.rest.messages.job.metrics;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
-import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
import java.util.Arrays;
import java.util.Collection;
@@ -34,7 +33,6 @@ public class AggregatedSubtaskMetricsParameters extends AbstractAggregatedMetric
private final JobIDPathParameter jobId = new JobIDPathParameter();
private final JobVertexIdPathParameter vertexId = new JobVertexIdPathParameter();
- private final SubtaskIndexPathParameter subtaskIndex = new SubtaskIndexPathParameter();
public AggregatedSubtaskMetricsParameters() {
super(new SubtasksFilterQueryParameter());
@@ -44,8 +42,7 @@ public class AggregatedSubtaskMetricsParameters extends AbstractAggregatedMetric
public Collection<MessagePathParameter<?>> getPathParameters() {
return Collections.unmodifiableCollection(Arrays.asList(
jobId,
- vertexId,
- subtaskIndex
+ vertexId
));
}
}
[7/9] flink git commit: [FLINK-8704][tests] Port ClassLoaderITCase to
flip6
Posted by ch...@apache.org.
[FLINK-8704][tests] Port ClassLoaderITCase to flip6
This closes #5780.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/390db6f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/390db6f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/390db6f3
Branch: refs/heads/release-1.5
Commit: 390db6f39cc60e9965394a178b4b69efeba79c51
Parents: c0bba03
Author: zentol <ch...@apache.org>
Authored: Thu Apr 5 11:49:48 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 10:01:00 2018 +0200
----------------------------------------------------------------------
.../test/classloading/ClassLoaderITCase.java | 159 ++++----
.../classloading/LegacyClassLoaderITCase.java | 399 +++++++++++++++++++
2 files changed, 472 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/390db6f3/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index b357904..089ade4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -19,35 +19,35 @@
package org.apache.flink.test.classloading;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.testdata.KMeansData;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.testutils.category.New;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
@@ -56,25 +56,22 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URL;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
-import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.isA;
-import static org.hamcrest.Matchers.hasProperty;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
/**
* Test job classloader.
*/
+@Category(New.class)
public class ClassLoaderITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderITCase.class);
@@ -95,22 +92,21 @@ public class ClassLoaderITCase extends TestLogger {
private static final String CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH = "checkpointing_custom_kv_state-test-jar.jar";
- @ClassRule
- public static final TemporaryFolder FOLDER = new TemporaryFolder();
+
+ private static final TemporaryFolder FOLDER = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
- private static TestingCluster testCluster;
+ private static MiniCluster testCluster;
- private static int parallelism;
+ private static final int parallelism = 4;
@BeforeClass
public static void setUp() throws Exception {
+ FOLDER.create();
+
Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
- parallelism = 4;
// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
@@ -121,16 +117,29 @@ public class ClassLoaderITCase extends TestLogger {
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
FOLDER.newFolder().getAbsoluteFile().toURI().toString());
- testCluster = new TestingCluster(config, false);
+ // required as we otherwise run out of memory
+ config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80);
+
+ testCluster = new MiniCluster(
+ new MiniClusterConfiguration.Builder()
+ .setNumTaskManagers(2)
+ .setNumSlotsPerTaskManager(2)
+ .setConfiguration(config)
+ .build()
+ );
testCluster.start();
}
@AfterClass
- public static void tearDown() throws Exception {
+ public static void tearDownClass() throws Exception {
if (testCluster != null) {
- testCluster.stop();
+ testCluster.close();
}
+ FOLDER.delete();
+ }
+ @After
+ public void tearDown() throws Exception {
TestStreamEnvironment.unsetAsContext();
TestEnvironment.unsetAsContext();
}
@@ -202,15 +211,27 @@ public class ClassLoaderITCase extends TestLogger {
Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
Collections.<URL>emptyList());
- // Program should terminate with a 'SuccessException':
- // we can not access the SuccessException here when executing the tests with maven, because its not available in the jar.
- expectedException.expectCause(
- Matchers.<Throwable>hasProperty("cause",
- hasProperty("class",
- hasProperty("canonicalName", equalTo(
- "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException")))));
-
- streamingCheckpointedProg.invokeInteractiveModeForExecution();
+ try {
+ streamingCheckpointedProg.invokeInteractiveModeForExecution();
+ } catch (Exception e) {
+ // Program should terminate with a 'SuccessException':
+ // the exception class is contained in the user-jar, but is not present on the maven classpath
+ // the deserialization of the exception should thus fail here
+ try {
+ Optional<Throwable> exception = ExceptionUtils.findThrowable(e,
+ candidate -> candidate.getClass().getCanonicalName().equals("org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException"));
+
+ // if we reach this point we either failed due to another exception,
+ // or the deserialization of the user-exception did not fail
+ if (!exception.isPresent()) {
+ throw e;
+ } else {
+ Assert.fail("Deserialization of user exception should have failed.");
+ }
+ } catch (NoClassDefFoundError expected) {
+ // expected
+ }
+ }
}
@Test
@@ -234,15 +255,7 @@ public class ClassLoaderITCase extends TestLogger {
@Test
public void testUserCodeTypeJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
- int port = testCluster.getLeaderRPCPort();
-
- // test FLINK-3633
- final PackagedProgram userCodeTypeProg = new PackagedProgram(
- new File(USERCODETYPE_JAR_PATH),
- new String[] { USERCODETYPE_JAR_PATH,
- "localhost",
- String.valueOf(port),
- });
+ PackagedProgram userCodeTypeProg = new PackagedProgram(new File(USERCODETYPE_JAR_PATH));
TestEnvironment.setAsContext(
testCluster,
@@ -282,6 +295,8 @@ public class ClassLoaderITCase extends TestLogger {
*/
@Test
public void testDisposeSavepointWithCustomKvState() throws Exception {
+ ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), testCluster);
+
Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
File checkpointDir = FOLDER.newFolder();
@@ -324,19 +339,18 @@ public class ClassLoaderITCase extends TestLogger {
// The job ID
JobID jobId = null;
- ActorGateway jm = testCluster.getLeaderGateway(deadline.timeLeft());
-
LOG.info("Waiting for job status running.");
// Wait for running job
while (jobId == null && deadline.hasTimeLeft()) {
- Future<Object> jobsFuture = jm.ask(JobManagerMessages.getRequestRunningJobsStatus(), deadline.timeLeft());
- RunningJobsStatus runningJobs = (RunningJobsStatus) Await.result(jobsFuture, deadline.timeLeft());
- for (JobStatusMessage runningJob : runningJobs.getStatusMessages()) {
- jobId = runningJob.getJobId();
- LOG.info("Job running. ID: " + jobId);
- break;
+ Collection<JobStatusMessage> jobs = clusterClient.listJobs().get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ for (JobStatusMessage job : jobs) {
+ if (job.getJobState() == JobStatus.RUNNING) {
+ jobId = job.getJobId();
+ LOG.info("Job running. ID: " + jobId);
+ break;
+ }
}
// Retry if job is not available yet
@@ -345,52 +359,25 @@ public class ClassLoaderITCase extends TestLogger {
}
}
- LOG.info("Wait for all tasks to be running.");
- Future<Object> allRunning = jm.ask(new WaitForAllVerticesToBeRunning(jobId), deadline.timeLeft());
- Await.ready(allRunning, deadline.timeLeft());
- LOG.info("All tasks are running.");
-
// Trigger savepoint
String savepointPath = null;
for (int i = 0; i < 20; i++) {
LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
- Future<Object> savepointFuture = jm.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
-
- Object savepointResponse = Await.result(savepointFuture, deadline.timeLeft());
-
- if (savepointResponse.getClass() == TriggerSavepointSuccess.class) {
- savepointPath = ((TriggerSavepointSuccess) savepointResponse).savepointPath();
- LOG.info("Triggered savepoint. Path: " + savepointPath);
- } else if (savepointResponse.getClass() == JobManagerMessages.TriggerSavepointFailure.class) {
- Throwable cause = ((JobManagerMessages.TriggerSavepointFailure) savepointResponse).cause();
+ try {
+ savepointPath = clusterClient.triggerSavepoint(jobId, null)
+ .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ } catch (Exception cause) {
LOG.info("Failed to trigger savepoint. Retrying...", cause);
// This can fail if the operators are not opened yet
Thread.sleep(500);
- } else {
- throw new IllegalStateException("Unexpected response to TriggerSavepoint");
}
}
assertNotNull("Failed to trigger savepoint", savepointPath);
- // Dispose savepoint
- LOG.info("Disposing savepoint at " + savepointPath);
- Future<Object> disposeFuture = jm.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
- Object disposeResponse = Await.result(disposeFuture, deadline.timeLeft());
-
- if (disposeResponse.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
- // Success :-)
- LOG.info("Disposed savepoint at " + savepointPath);
- } else if (disposeResponse instanceof DisposeSavepointFailure) {
- throw new IllegalStateException("Failed to dispose savepoint " + disposeResponse);
- } else {
- throw new IllegalStateException("Unexpected response to DisposeSavepoint");
- }
+ clusterClient.disposeSavepoint(savepointPath).get();
- // Cancel job, wait for success
- Future<?> cancelFuture = jm.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft());
- Object response = Await.result(cancelFuture, deadline.timeLeft());
- assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.CancellationSuccess);
+ clusterClient.cancel(jobId);
// make sure, the execution is finished to not influence other test methods
invokeThread.join(deadline.timeLeft().toMillis());
http://git-wip-us.apache.org/repos/asf/flink/blob/390db6f3/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java
new file mode 100644
index 0000000..fa114e1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
+import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
+import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.testdata.KMeansData;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Matchers;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.isA;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test job classloader.
+ */
+public class LegacyClassLoaderITCase extends TestLogger {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LegacyClassLoaderITCase.class);
+
+ private static final String INPUT_SPLITS_PROG_JAR_FILE = "customsplit-test-jar.jar";
+
+ private static final String STREAMING_INPUT_SPLITS_PROG_JAR_FILE = "streaming-customsplit-test-jar.jar";
+
+ private static final String STREAMING_PROG_JAR_FILE = "streamingclassloader-test-jar.jar";
+
+ private static final String STREAMING_CHECKPOINTED_PROG_JAR_FILE = "streaming-checkpointed-classloader-test-jar.jar";
+
+ private static final String KMEANS_JAR_PATH = "kmeans-test-jar.jar";
+
+ private static final String USERCODETYPE_JAR_PATH = "usercodetype-test-jar.jar";
+
+ private static final String CUSTOM_KV_STATE_JAR_PATH = "custom_kv_state-test-jar.jar";
+
+ private static final String CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH = "checkpointing_custom_kv_state-test-jar.jar";
+
+ @ClassRule
+ public static final TemporaryFolder FOLDER = new TemporaryFolder();
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ private static TestingCluster testCluster;
+
+ private static int parallelism;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+ parallelism = 4;
+
+ // we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
+ config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
+ config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+ FOLDER.newFolder().getAbsoluteFile().toURI().toString());
+
+ // Savepoint path
+ config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
+ FOLDER.newFolder().getAbsoluteFile().toURI().toString());
+
+ testCluster = new TestingCluster(config, false);
+ testCluster.start();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (testCluster != null) {
+ testCluster.stop();
+ }
+
+ TestStreamEnvironment.unsetAsContext();
+ TestEnvironment.unsetAsContext();
+ }
+
+ @Test
+ public void testCustomSplitJobWithCustomClassLoaderJar() throws IOException, ProgramInvocationException {
+
+ PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(INPUT_SPLITS_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
+
+ inputSplitTestProg.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testStreamingCustomSplitJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ PackagedProgram streamingInputSplitTestProg = new PackagedProgram(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE));
+
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
+
+ streamingInputSplitTestProg.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testCustomSplitJobWithCustomClassLoaderPath() throws IOException, ProgramInvocationException {
+ URL classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL();
+ PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.<Path>emptyList(),
+ Collections.singleton(classpath));
+
+ inputSplitTestProg2.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testStreamingClassloaderJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ // regular streaming job
+ PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE));
+
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(STREAMING_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
+
+ streamingProg.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ // checkpointed streaming job with custom classes for the checkpoint (FLINK-2543)
+ // the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient.
+ PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
+
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
+
+ // Program should terminate with a 'SuccessException':
+ // we can not access the SuccessException here when executing the tests with maven, because its not available in the jar.
+ expectedException.expectCause(
+ Matchers.<Throwable>hasProperty("cause",
+ hasProperty("class",
+ hasProperty("canonicalName", equalTo(
+ "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException")))));
+
+ streamingCheckpointedProg.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testKMeansJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ PackagedProgram kMeansProg = new PackagedProgram(
+ new File(KMEANS_JAR_PATH),
+ new String[] {
+ KMeansData.DATAPOINTS,
+ KMeansData.INITIAL_CENTERS,
+ "25"
+ });
+
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(KMEANS_JAR_PATH)),
+ Collections.<URL>emptyList());
+
+ kMeansProg.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testUserCodeTypeJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ int port = testCluster.getLeaderRPCPort();
+
+ // test FLINK-3633
+ final PackagedProgram userCodeTypeProg = new PackagedProgram(
+ new File(USERCODETYPE_JAR_PATH),
+ new String[] { USERCODETYPE_JAR_PATH,
+ "localhost",
+ String.valueOf(port),
+ });
+
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(USERCODETYPE_JAR_PATH)),
+ Collections.<URL>emptyList());
+
+ userCodeTypeProg.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testCheckpointingCustomKvStateJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ File checkpointDir = FOLDER.newFolder();
+ File outputDir = FOLDER.newFolder();
+
+ final PackagedProgram program = new PackagedProgram(
+ new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH),
+ new String[] {
+ checkpointDir.toURI().toString(),
+ outputDir.toURI().toString()
+ });
+
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)),
+ Collections.<URL>emptyList());
+
+ expectedException.expectCause(
+ Matchers.<Throwable>hasProperty("cause", isA(SuccessException.class)));
+
+ program.invokeInteractiveModeForExecution();
+ }
+
+ /**
+ * Tests disposal of a savepoint, which contains custom user code KvState.
+ */
+ @Test
+ public void testDisposeSavepointWithCustomKvState() throws Exception {
+ Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
+
+ File checkpointDir = FOLDER.newFolder();
+ File outputDir = FOLDER.newFolder();
+
+ final PackagedProgram program = new PackagedProgram(
+ new File(CUSTOM_KV_STATE_JAR_PATH),
+ new String[] {
+ String.valueOf(parallelism),
+ checkpointDir.toURI().toString(),
+ "5000",
+ outputDir.toURI().toString()
+ });
+
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)),
+ Collections.<URL>emptyList()
+ );
+
+ // Execute detached
+ Thread invokeThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ program.invokeInteractiveModeForExecution();
+ } catch (ProgramInvocationException ignored) {
+ if (ignored.getCause() == null ||
+ !(ignored.getCause() instanceof JobCancellationException)) {
+ ignored.printStackTrace();
+ }
+ }
+ }
+ });
+
+ LOG.info("Starting program invoke thread");
+ invokeThread.start();
+
+ // The job ID
+ JobID jobId = null;
+
+ ActorGateway jm = testCluster.getLeaderGateway(deadline.timeLeft());
+
+ LOG.info("Waiting for job status running.");
+
+ // Wait for running job
+ while (jobId == null && deadline.hasTimeLeft()) {
+ Future<Object> jobsFuture = jm.ask(JobManagerMessages.getRequestRunningJobsStatus(), deadline.timeLeft());
+ RunningJobsStatus runningJobs = (RunningJobsStatus) Await.result(jobsFuture, deadline.timeLeft());
+
+ for (JobStatusMessage runningJob : runningJobs.getStatusMessages()) {
+ jobId = runningJob.getJobId();
+ LOG.info("Job running. ID: " + jobId);
+ break;
+ }
+
+ // Retry if job is not available yet
+ if (jobId == null) {
+ Thread.sleep(100L);
+ }
+ }
+
+ LOG.info("Wait for all tasks to be running.");
+ Future<Object> allRunning = jm.ask(new WaitForAllVerticesToBeRunning(jobId), deadline.timeLeft());
+ Await.ready(allRunning, deadline.timeLeft());
+ LOG.info("All tasks are running.");
+
+ // Trigger savepoint
+ String savepointPath = null;
+ for (int i = 0; i < 20; i++) {
+ LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
+ Future<Object> savepointFuture = jm.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
+
+ Object savepointResponse = Await.result(savepointFuture, deadline.timeLeft());
+
+ if (savepointResponse.getClass() == TriggerSavepointSuccess.class) {
+ savepointPath = ((TriggerSavepointSuccess) savepointResponse).savepointPath();
+ LOG.info("Triggered savepoint. Path: " + savepointPath);
+ } else if (savepointResponse.getClass() == JobManagerMessages.TriggerSavepointFailure.class) {
+ Throwable cause = ((JobManagerMessages.TriggerSavepointFailure) savepointResponse).cause();
+ LOG.info("Failed to trigger savepoint. Retrying...", cause);
+ // This can fail if the operators are not opened yet
+ Thread.sleep(500);
+ } else {
+ throw new IllegalStateException("Unexpected response to TriggerSavepoint");
+ }
+ }
+
+ assertNotNull("Failed to trigger savepoint", savepointPath);
+
+ // Dispose savepoint
+ LOG.info("Disposing savepoint at " + savepointPath);
+ Future<Object> disposeFuture = jm.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
+ Object disposeResponse = Await.result(disposeFuture, deadline.timeLeft());
+
+ if (disposeResponse.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
+ // Success :-)
+ LOG.info("Disposed savepoint at " + savepointPath);
+ } else if (disposeResponse instanceof DisposeSavepointFailure) {
+ throw new IllegalStateException("Failed to dispose savepoint " + disposeResponse);
+ } else {
+ throw new IllegalStateException("Unexpected response to DisposeSavepoint");
+ }
+
+ // Cancel job, wait for success
+ Future<?> cancelFuture = jm.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft());
+ Object response = Await.result(cancelFuture, deadline.timeLeft());
+ assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.CancellationSuccess);
+
+ // make sure, the execution is finished to not influence other test methods
+ invokeThread.join(deadline.timeLeft().toMillis());
+ assertFalse("Program invoke thread still running", invokeThread.isAlive());
+ }
+}
[9/9] flink git commit: [FLINK-9208][tests] fix naming of
StreamNetworkThroughputBenchmarkTest
Posted by ch...@apache.org.
[FLINK-9208][tests] fix naming of StreamNetworkThroughputBenchmarkTest
This closes #5873.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed4f4f1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed4f4f1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed4f4f1a
Branch: refs/heads/release-1.5
Commit: ed4f4f1afdff44251439b760fcc9d0f7fad7465c
Parents: 71c2ac3
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Apr 18 17:04:38 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 10:01:01 2018 +0200
----------------------------------------------------------------------
.../StreamNetworkThroughputBenchmarkTest.java | 98 ++++++++++++++++++++
.../StreamNetworkThroughputBenchmarkTests.java | 98 --------------------
2 files changed, 98 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ed4f4f1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
new file mode 100644
index 0000000..ba8fe27
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.junit.Test;
+
+/**
+ * Tests for various network benchmarks based on {@link StreamNetworkThroughputBenchmark}.
+ */
+public class StreamNetworkThroughputBenchmarkTest {
+ @Test
+ public void pointToPointBenchmark() throws Exception {
+ StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+ benchmark.setUp(1, 1, 100);
+ try {
+ benchmark.executeBenchmark(1_000);
+ }
+ finally {
+ benchmark.tearDown();
+ }
+ }
+
+ @Test
+ public void largeLocalMode() throws Exception {
+ StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
+ env.setUp(4, 10, 100, true);
+ env.executeBenchmark(10_000_000);
+ env.tearDown();
+ }
+
+ @Test
+ public void largeRemoteMode() throws Exception {
+ StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
+ env.setUp(4, 10, 100, false);
+ env.executeBenchmark(10_000_000);
+ env.tearDown();
+ }
+
+ @Test
+ public void largeRemoteAlwaysFlush() throws Exception {
+ StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
+ env.setUp(1, 1, 0, false);
+ env.executeBenchmark(1_000_000);
+ env.tearDown();
+ }
+
+ @Test
+ public void pointToMultiPointBenchmark() throws Exception {
+ StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+ benchmark.setUp(1, 100, 100);
+ try {
+ benchmark.executeBenchmark(1_000);
+ }
+ finally {
+ benchmark.tearDown();
+ }
+ }
+
+ @Test
+ public void multiPointToPointBenchmark() throws Exception {
+ StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+ benchmark.setUp(4, 1, 100);
+ try {
+ benchmark.executeBenchmark(1_000);
+ }
+ finally {
+ benchmark.tearDown();
+ }
+ }
+
+ @Test
+ public void multiPointToMultiPointBenchmark() throws Exception {
+ StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+ benchmark.setUp(4, 100, 100);
+ try {
+ benchmark.executeBenchmark(1_000);
+ }
+ finally {
+ benchmark.tearDown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed4f4f1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
deleted file mode 100644
index a60fa3c..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io.benchmark;
-
-import org.junit.Test;
-
-/**
- * Tests for various network benchmarks based on {@link StreamNetworkThroughputBenchmark}.
- */
-public class StreamNetworkThroughputBenchmarkTests {
- @Test
- public void pointToPointBenchmark() throws Exception {
- StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
- benchmark.setUp(1, 1, 100);
- try {
- benchmark.executeBenchmark(1_000);
- }
- finally {
- benchmark.tearDown();
- }
- }
-
- @Test
- public void largeLocalMode() throws Exception {
- StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
- env.setUp(4, 10, 100, true);
- env.executeBenchmark(10_000_000);
- env.tearDown();
- }
-
- @Test
- public void largeRemoteMode() throws Exception {
- StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
- env.setUp(4, 10, 100, false);
- env.executeBenchmark(10_000_000);
- env.tearDown();
- }
-
- @Test
- public void largeRemoteAlwaysFlush() throws Exception {
- StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
- env.setUp(1, 1, 0, false);
- env.executeBenchmark(1_000_000);
- env.tearDown();
- }
-
- @Test
- public void pointToMultiPointBenchmark() throws Exception {
- StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
- benchmark.setUp(1, 100, 100);
- try {
- benchmark.executeBenchmark(1_000);
- }
- finally {
- benchmark.tearDown();
- }
- }
-
- @Test
- public void multiPointToPointBenchmark() throws Exception {
- StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
- benchmark.setUp(4, 1, 100);
- try {
- benchmark.executeBenchmark(1_000);
- }
- finally {
- benchmark.tearDown();
- }
- }
-
- @Test
- public void multiPointToMultiPointBenchmark() throws Exception {
- StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
- benchmark.setUp(4, 100, 100);
- try {
- benchmark.executeBenchmark(1_000);
- }
- finally {
- benchmark.tearDown();
- }
- }
-}
[6/9] flink git commit: [FLINK-8703][tests] Port WebFrontendITCase to
MiniClusterResource
Posted by ch...@apache.org.
[FLINK-8703][tests] Port WebFrontendITCase to MiniClusterResource
This closes #5665.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0bba030
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0bba030
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0bba030
Branch: refs/heads/release-1.5
Commit: c0bba030cd0b3deb33aada98427473f2c3b403fa
Parents: 0f39ffd
Author: zentol <ch...@apache.org>
Authored: Wed Mar 7 11:14:46 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 10:01:00 2018 +0200
----------------------------------------------------------------------
.../runtime/webmonitor/WebFrontendITCase.java | 202 +++++++++++++------
.../webmonitor/testutils/HttpTestClient.java | 19 ++
.../flink/test/util/MiniClusterResource.java | 4 +-
3 files changed, 166 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c0bba030/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index 14602e3..f512766 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -19,15 +19,19 @@
package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
@@ -38,8 +42,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.junit.After;
import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
import java.io.File;
@@ -47,8 +52,13 @@ import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
@@ -66,40 +76,49 @@ public class WebFrontendITCase extends TestLogger {
private static final int NUM_TASK_MANAGERS = 2;
private static final int NUM_SLOTS = 4;
- private static LocalFlinkMiniCluster cluster;
+ private static final Configuration CLUSTER_CONFIGURATION = getClusterConfiguration();
- private static int port = -1;
+ @ClassRule
+ public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ CLUSTER_CONFIGURATION,
+ NUM_TASK_MANAGERS,
+ NUM_SLOTS),
+ true
+ );
- @BeforeClass
- public static void initialize() throws Exception {
+ private static Configuration getClusterConfiguration() {
Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS);
- config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
- config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
-
- File logDir = File.createTempFile("TestBaseUtils-logdir", null);
- assertTrue("Unable to delete temp file", logDir.delete());
- assertTrue("Unable to create temp directory", logDir.mkdir());
- File logFile = new File(logDir, "jobmanager.log");
- File outFile = new File(logDir, "jobmanager.out");
+ try {
+ File logDir = File.createTempFile("TestBaseUtils-logdir", null);
+ assertTrue("Unable to delete temp file", logDir.delete());
+ assertTrue("Unable to create temp directory", logDir.mkdir());
+ File logFile = new File(logDir, "jobmanager.log");
+ File outFile = new File(logDir, "jobmanager.out");
- Files.createFile(logFile.toPath());
- Files.createFile(outFile.toPath());
+ Files.createFile(logFile.toPath());
+ Files.createFile(outFile.toPath());
- config.setString(WebOptions.LOG_PATH, logFile.getAbsolutePath());
- config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
+ config.setString(WebOptions.LOG_PATH, logFile.getAbsolutePath());
+ config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
+ } catch (Exception e) {
+ throw new AssertionError("Could not setup test.", e);
+ }
+ config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+ config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
- cluster = new LocalFlinkMiniCluster(config, false);
- cluster.start();
+ return config;
+ }
- port = cluster.webMonitor().get().getServerPort();
+ @After
+ public void tearDown() {
+ BlockingInvokable.reset();
}
@Test
public void getFrontPage() {
try {
- String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/index.html");
+ String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/index.html");
String text = "Apache Flink Dashboard";
assertTrue("Startpage should contain " + text, fromHTTP.contains(text));
} catch (Exception e) {
@@ -111,7 +130,7 @@ public class WebFrontendITCase extends TestLogger {
@Test
public void testResponseHeaders() throws Exception {
// check headers for successful json response
- URL taskManagersUrl = new URL("http://localhost:" + port + "/taskmanagers");
+ URL taskManagersUrl = new URL("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers");
HttpURLConnection taskManagerConnection = (HttpURLConnection) taskManagersUrl.openConnection();
taskManagerConnection.setConnectTimeout(100000);
taskManagerConnection.connect();
@@ -127,14 +146,18 @@ public class WebFrontendITCase extends TestLogger {
Assert.assertEquals("application/json; charset=UTF-8", taskManagerConnection.getContentType());
// check headers in case of an error
- URL notFoundJobUrl = new URL("http://localhost:" + port + "/jobs/dontexist");
+ URL notFoundJobUrl = new URL("http://localhost:" + CLUSTER.getWebUIPort() + "/jobs/dontexist");
HttpURLConnection notFoundJobConnection = (HttpURLConnection) notFoundJobUrl.openConnection();
notFoundJobConnection.setConnectTimeout(100000);
notFoundJobConnection.connect();
if (notFoundJobConnection.getResponseCode() >= 400) {
// we don't set the content-encoding header
Assert.assertNull(notFoundJobConnection.getContentEncoding());
- Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
+ if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+ Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType());
+ } else {
+ Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
+ }
} else {
throw new RuntimeException("Request for non-existing job did not return an error.");
}
@@ -143,14 +166,14 @@ public class WebFrontendITCase extends TestLogger {
@Test
public void getNumberOfTaskManagers() {
try {
- String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+ String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/");
ObjectMapper mapper = new ObjectMapper();
JsonNode response = mapper.readTree(json);
ArrayNode taskManagers = (ArrayNode) response.get("taskmanagers");
assertNotNull(taskManagers);
- assertEquals(cluster.numTaskManagers(), taskManagers.size());
+ assertEquals(NUM_TASK_MANAGERS, taskManagers.size());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -159,14 +182,14 @@ public class WebFrontendITCase extends TestLogger {
@Test
public void getTaskmanagers() throws Exception {
- String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+ String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/");
ObjectMapper mapper = new ObjectMapper();
JsonNode parsed = mapper.readTree(json);
ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
assertNotNull(taskManagers);
- assertEquals(cluster.numTaskManagers(), taskManagers.size());
+ assertEquals(NUM_TASK_MANAGERS, taskManagers.size());
JsonNode taskManager = taskManagers.get(0);
assertNotNull(taskManager);
@@ -176,21 +199,21 @@ public class WebFrontendITCase extends TestLogger {
@Test
public void getLogAndStdoutFiles() throws Exception {
- WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
+ WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION);
FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
- String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/log");
+ String logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/log");
assertTrue(logs.contains("job manager log"));
FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
- logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/stdout");
+ logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/stdout");
assertTrue(logs.contains("job manager out"));
}
@Test
public void getTaskManagerLogAndStdoutFiles() {
try {
- String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+ String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/");
ObjectMapper mapper = new ObjectMapper();
JsonNode parsed = mapper.readTree(json);
@@ -198,15 +221,15 @@ public class WebFrontendITCase extends TestLogger {
JsonNode taskManager = taskManagers.get(0);
String id = taskManager.get("id").asText();
- WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
+ WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION);
//we check for job manager log files, since no separate taskmanager logs exist
FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
- String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/log");
+ String logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/" + id + "/log");
assertTrue(logs.contains("job manager log"));
FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
- logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/stdout");
+ logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/" + id + "/stdout");
assertTrue(logs.contains("job manager out"));
} catch (Exception e) {
e.printStackTrace();
@@ -217,12 +240,12 @@ public class WebFrontendITCase extends TestLogger {
@Test
public void getConfiguration() {
try {
- String config = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/config");
+ String config = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/config");
Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(config);
assertEquals(
- cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
- conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
+ CLUSTER_CONFIGURATION.getString(ConfigConstants.LOCAL_START_WEBSERVER, null),
+ conf.get(ConfigConstants.LOCAL_START_WEBSERVER));
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -232,29 +255,42 @@ public class WebFrontendITCase extends TestLogger {
@Test
public void testStop() throws Exception {
// this only works if there is no active job at this point
- assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
+ assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
// Create a task
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(2);
- sender.setInvokableClass(StoppableInvokable.class);
+ sender.setInvokableClass(BlockingInvokable.class);
final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
final JobID jid = jobGraph.getJobID();
- cluster.submitJobDetached(jobGraph);
+ ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
+ clusterClient.setDetached(true);
+ clusterClient.submitJob(jobGraph, WebFrontendITCase.class.getClassLoader());
// wait for job to show up
- while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+ while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
Thread.sleep(10);
}
+ // wait for tasks to be properly running
+ BlockingInvokable.latch.await();
+
final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
final Deadline deadline = testTimeout.fromNow();
- while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
- try (HttpTestClient client = new HttpTestClient("localhost", port)) {
- // Request the file from the web server
+ try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
+ if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+ // stop the job
+ client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
+ HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
+
+ assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+ assertEquals("application/json; charset=UTF-8", response.getType());
+ assertEquals("{}", response.getContent());
+ } else {
+ // stop the job
client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft());
HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
@@ -262,12 +298,15 @@ public class WebFrontendITCase extends TestLogger {
assertEquals("application/json; charset=UTF-8", response.getType());
assertEquals("{}", response.getContent());
}
+ }
+ // wait for cancellation to finish
+ while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
Thread.sleep(20);
}
// ensure we can access job details when its finished (FLINK-4011)
- try (HttpTestClient client = new HttpTestClient("localhost", port)) {
+ try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
client.sendGetRequest("/jobs/" + jid + "/config", timeout);
HttpTestClient.SimpleHttpResponse response = client.getNextResponse(timeout);
@@ -283,40 +322,89 @@ public class WebFrontendITCase extends TestLogger {
@Test
public void testStopYarn() throws Exception {
// this only works if there is no active job at this point
- assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
+ assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
// Create a task
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(2);
- sender.setInvokableClass(StoppableInvokable.class);
+ sender.setInvokableClass(BlockingInvokable.class);
final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
final JobID jid = jobGraph.getJobID();
- cluster.submitJobDetached(jobGraph);
+ ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
+ clusterClient.setDetached(true);
+ clusterClient.submitJob(jobGraph, WebFrontendITCase.class.getClassLoader());
// wait for job to show up
- while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+ while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
Thread.sleep(10);
}
+ // wait for tasks to be properly running
+ BlockingInvokable.latch.await();
+
final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
final Deadline deadline = testTimeout.fromNow();
- while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
- try (HttpTestClient client = new HttpTestClient("localhost", port)) {
+ while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
+ try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
// Request the file from the web server
client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());
HttpTestClient.SimpleHttpResponse response = client
.getNextResponse(deadline.timeLeft());
- assertEquals(HttpResponseStatus.OK, response.getStatus());
+ if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+ assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+ } else {
+ assertEquals(HttpResponseStatus.OK, response.getStatus());
+ }
assertEquals("application/json; charset=UTF-8", response.getType());
assertEquals("{}", response.getContent());
}
Thread.sleep(20);
}
+ BlockingInvokable.reset();
+ }
+
+ private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+ Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+ return statusMessages.stream()
+ .filter(status -> !status.getJobState().isGloballyTerminalState())
+ .map(JobStatusMessage::getJobId)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Test invokable that is stoppable and allows waiting for all subtasks to be running.
+ */
+ public static class BlockingInvokable extends AbstractInvokable implements StoppableTask {
+
+ private static CountDownLatch latch = new CountDownLatch(2);
+
+ private volatile boolean isRunning = true;
+
+ public BlockingInvokable(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ latch.countDown();
+ while (isRunning) {
+ Thread.sleep(100);
+ }
+ }
+
+ @Override
+ public void stop() {
+ this.isRunning = false;
+ }
+
+ public static void reset() {
+ latch = new CountDownLatch(2);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0bba030/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
index d9608fe..d94f7a2 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
@@ -186,6 +186,25 @@ public class HttpTestClient implements AutoCloseable {
}
/**
+ * Sends a simple PATCH request to the given path. You only specify the $path part of
+ * http://$host:$host/$path.
+ *
+ * @param path The $path to PATCH (http://$host:$host/$path)
+ */
+ public void sendPatchRequest(String path, FiniteDuration timeout) throws TimeoutException, InterruptedException {
+ if (!path.startsWith("/")) {
+ path = "/" + path;
+ }
+
+ HttpRequest getRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+ HttpMethod.PATCH, path);
+ getRequest.headers().set(HttpHeaders.Names.HOST, host);
+ getRequest.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+ sendRequest(getRequest, timeout);
+ }
+
+ /**
* Returns the next available HTTP response. A call to this method blocks until a response
* becomes available.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/c0bba030/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 160c1d1..8c21b37 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -55,9 +55,9 @@ public class MiniClusterResource extends ExternalResource {
private static final Logger LOG = LoggerFactory.getLogger(MiniClusterResource.class);
- private static final String CODEBASE_KEY = "codebase";
+ public static final String CODEBASE_KEY = "codebase";
- private static final String NEW_CODEBASE = "new";
+ public static final String NEW_CODEBASE = "new";
private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;
[5/9] flink git commit: [FLINK-8960][tests] Port SavepointITCase to
flip6
Posted by ch...@apache.org.
[FLINK-8960][tests] Port SavepointITCase to flip6
This closes #5806.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1a82d5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1a82d5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1a82d5f
Branch: refs/heads/release-1.5
Commit: f1a82d5fb333af12f2ac9005e34bb7abfa1fcd66
Parents: 390db6f
Author: zentol <ch...@apache.org>
Authored: Tue Mar 27 14:45:03 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 10:01:00 2018 +0200
----------------------------------------------------------------------
.../test/checkpointing/SavepointITCase.java | 510 +++++++------------
1 file changed, 170 insertions(+), 340 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f1a82d5f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 888c418..9549dc7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -26,41 +26,20 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestSavepoint;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseSubmitTaskListener;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -69,15 +48,11 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
-import org.apache.flink.shaded.guava18.com.google.common.collect.HashMultimap;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Multimap;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -87,27 +62,18 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
-import java.util.ArrayList;
+import java.net.URI;
+import java.time.Duration;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -142,264 +108,108 @@ public class SavepointITCase extends TestLogger {
final int numTaskManagers = 2;
final int numSlotsPerTaskManager = 2;
final int parallelism = numTaskManagers * numSlotsPerTaskManager;
- final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
- final File testRoot = folder.getRoot();
-
- TestingCluster flink = null;
-
- try {
- // Create a test actor system
- ActorSystem testActorSystem = AkkaUtils.createDefaultActorSystem();
-
- // Flink configuration
- final Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-
- final File checkpointDir = new File(testRoot, "checkpoints");
- final File savepointRootDir = new File(testRoot, "savepoints");
+ final File testRoot = folder.newFolder();
- if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
- fail("Test setup failed: failed to create temporary directories.");
- }
-
- // Use file based checkpoints
- config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
- config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
- config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
- config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointRootDir.toURI().toString());
-
- // Start Flink
- flink = new TestingCluster(config);
- flink.start(true);
-
- // Submit the job
- final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
- final JobID jobId = jobGraph.getJobID();
-
- // Reset the static test job helpers
- StatefulCounter.resetForTest(parallelism);
+ Configuration config = new Configuration();
- // Retrieve the job manager
- ActorGateway jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
+ final File checkpointDir = new File(testRoot, "checkpoints");
+ final File savepointRootDir = new File(testRoot, "savepoints");
- LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
+ if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
+ fail("Test setup failed: failed to create temporary directories.");
+ }
- flink.submitJobDetached(jobGraph);
+ // Use file based checkpoints
+ config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
+ config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+ config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
+ config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointRootDir.toURI().toString());
- LOG.info("Waiting for some progress.");
+ MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(numTaskManagers, numSlotsPerTaskManager, config);
- // wait for the JobManager to be ready
- Future<Object> allRunning = jobManager.ask(new WaitForAllVerticesToBeRunning(jobId), deadline.timeLeft());
- Await.ready(allRunning, deadline.timeLeft());
+ String savepointPath = submitJobAndGetVerifiedSavepoint(clusterFactory, parallelism);
- // wait for the Tasks to be ready
- StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ restoreJobAndVerifyState(savepointPath, clusterFactory, parallelism);
+ }
- LOG.info("Triggering a savepoint.");
- Future<Object> savepointPathFuture = jobManager.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
- final String savepointPath = ((TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
- LOG.info("Retrieved savepoint path: " + savepointPath + ".");
+ private String submitJobAndGetVerifiedSavepoint(MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
+ final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
+ final JobID jobId = jobGraph.getJobID();
+ StatefulCounter.resetForTest(parallelism);
- // Retrieve the savepoint from the testing job manager
- LOG.info("Requesting the savepoint.");
- Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
+ MiniClusterResource cluster = clusterFactory.get();
+ cluster.before();
+ ClusterClient<?> client = cluster.getClusterClient();
- SavepointV2 savepoint = (SavepointV2) ((ResponseSavepoint) Await.result(savepointFuture, deadline.timeLeft())).savepoint();
- LOG.info("Retrieved savepoint: " + savepointPath + ".");
+ try {
+ client.setDetached(true);
+ client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
- // Shut down the Flink cluster (thereby canceling the job)
- LOG.info("Shutting down Flink cluster.");
- flink.stop();
- flink = null;
+ StatefulCounter.getProgressLatch().await();
- // - Verification START -------------------------------------------
+ String savepointPath = client.triggerSavepoint(jobId, null).get();
// Only one savepoint should exist
- File[] files = savepointRootDir.listFiles();
-
- if (files != null) {
- assertEquals("Savepoint not created in expected directory", 1, files.length);
- assertTrue("Savepoint did not create self-contained directory", files[0].isDirectory());
+ File savepointDir = new File(new URI(savepointPath));
+ assertTrue("Savepoint directory does not exist.", savepointDir.exists());
+ assertTrue("Savepoint did not create self-contained directory.", savepointDir.isDirectory());
- File savepointDir = files[0];
- File[] savepointFiles = savepointDir.listFiles();
- assertNotNull(savepointFiles);
+ File[] savepointFiles = savepointDir.listFiles();
+ if (savepointFiles != null) {
// Expect one metadata file and one checkpoint file per stateful
// parallel subtask
String errMsg = "Did not write expected number of savepoint/checkpoint files to directory: "
+ Arrays.toString(savepointFiles);
assertEquals(errMsg, 1 + parallelism, savepointFiles.length);
} else {
- fail("Savepoint not created in expected directory");
+ fail(String.format("Returned savepoint path (%s) is not valid.", savepointPath));
}
- // - Verification END ---------------------------------------------
-
- // Restart the cluster
- LOG.info("Restarting Flink cluster.");
- flink = new TestingCluster(config);
- flink.start();
-
- // Retrieve the job manager
- LOG.info("Retrieving JobManager.");
- jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
- LOG.info("JobManager: " + jobManager + ".");
-
- // Reset static test helpers
+ return savepointPath;
+ } finally {
+ cluster.after();
StatefulCounter.resetForTest(parallelism);
+ }
+ }
- // Gather all task deployment descriptors
- final Throwable[] error = new Throwable[1];
- final TestingCluster finalFlink = flink;
- final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
-
- new JavaTestKit(testActorSystem) {{
-
- new Within(deadline.timeLeft()) {
- @Override
- protected void run() {
- try {
- // Register to all submit task messages for job
- for (ActorRef taskManager : finalFlink.getTaskManagersAsJava()) {
- taskManager.tell(new TestingTaskManagerMessages
- .RegisterSubmitTaskListener(jobId), getTestActor());
- }
-
- // Set the savepoint path
- jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
-
- LOG.info("Resubmitting job " + jobGraph.getJobID() + " with " +
- "savepoint path " + savepointPath + " in detached mode.");
-
- // Submit the job
- finalFlink.submitJobDetached(jobGraph);
-
- int numTasks = 0;
- for (JobVertex jobVertex : jobGraph.getVertices()) {
- numTasks += jobVertex.getParallelism();
- }
-
- // Gather the task deployment descriptors
- LOG.info("Gathering " + numTasks + " submitted " +
- "TaskDeploymentDescriptor instances.");
-
- for (int i = 0; i < numTasks; i++) {
- ResponseSubmitTaskListener resp = (ResponseSubmitTaskListener)
- expectMsgAnyClassOf(getRemainingTime(),
- ResponseSubmitTaskListener.class);
-
- TaskDeploymentDescriptor tdd = resp.tdd();
-
- LOG.info("Received: " + tdd.toString() + ".");
-
- TaskInformation taskInformation = tdd
- .getSerializedTaskInformation()
- .deserializeValue(getClass().getClassLoader());
-
- tdds.put(taskInformation.getJobVertexId(), tdd);
- }
- } catch (Throwable t) {
- error[0] = t;
- }
- }
- };
- }};
-
- ExecutionGraph graph = (ExecutionGraph) ((JobManagerMessages.JobFound) Await.result(jobManager.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft()), deadline.timeLeft())).executionGraph();
-
- // - Verification START -------------------------------------------
-
- String errMsg = "Error during gathering of TaskDeploymentDescriptors";
- if (error[0] != null) {
- throw new RuntimeException(error[0]);
- }
-
- Map<OperatorID, Tuple2<Integer, ExecutionJobVertex>> operatorToJobVertexMapping = new HashMap<>();
- for (ExecutionJobVertex task : graph.getVerticesTopologically()) {
- List<OperatorID> operatorIDs = task.getOperatorIDs();
- for (int x = 0; x < operatorIDs.size(); x++) {
- operatorToJobVertexMapping.put(operatorIDs.get(x), new Tuple2<>(x, task));
- }
- }
-
- // Verify that all tasks, which are part of the savepoint
- // have a matching task deployment descriptor.
- for (OperatorState operatorState : savepoint.getOperatorStates()) {
- Tuple2<Integer, ExecutionJobVertex> chainIndexAndJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
- Collection<TaskDeploymentDescriptor> taskTdds = tdds.get(chainIndexAndJobVertex.f1.getJobVertexId());
-
- errMsg = "Missing task for savepoint state for operator "
- + operatorState.getOperatorID() + ".";
- assertTrue(errMsg, taskTdds.size() > 0);
-
- assertEquals(operatorState.getNumberCollectedStates(), taskTdds.size());
+ private void restoreJobAndVerifyState(String savepointPath, MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
+ final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
+ jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+ final JobID jobId = jobGraph.getJobID();
+ StatefulCounter.resetForTest(parallelism);
- for (TaskDeploymentDescriptor tdd : taskTdds) {
- OperatorSubtaskState subtaskState = operatorState.getState(tdd.getSubtaskIndex());
+ MiniClusterResource cluster = clusterFactory.get();
+ cluster.before();
+ ClusterClient<?> client = cluster.getClusterClient();
- assertNotNull(subtaskState);
- }
- }
+ try {
+ client.setDetached(true);
+ client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
// Await state is restored
- StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ StatefulCounter.getRestoreLatch().await();
// Await some progress after restore
- StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
- // - Verification END ---------------------------------------------
+ StatefulCounter.getProgressLatch().await();
- LOG.info("Cancelling job " + jobId + ".");
- jobManager.tell(new CancelJob(jobId));
+ client.cancel(jobId);
- LOG.info("Disposing savepoint " + savepointPath + ".");
- Future<Object> disposeFuture = jobManager.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
+ FutureUtils.retrySuccesfulWithDelay(
+ () -> client.getJobStatus(jobId),
+ Time.milliseconds(50),
+ Deadline.now().plus(Duration.ofSeconds(30)),
+ status -> status == JobStatus.CANCELED,
+ TestingUtils.defaultScheduledExecutor()
+ );
- errMsg = "Failed to dispose savepoint " + savepointPath + ".";
- Object resp = Await.result(disposeFuture, deadline.timeLeft());
- assertTrue(errMsg, resp.getClass() == getDisposeSavepointSuccess().getClass());
+ client.disposeSavepoint(savepointPath)
+ .get();
- // - Verification START -------------------------------------------
- // The checkpoint files
- List<File> checkpointFiles = new ArrayList<>();
-
- for (OperatorState stateForTaskGroup : savepoint.getOperatorStates()) {
- for (OperatorSubtaskState subtaskState : stateForTaskGroup.getStates()) {
- Collection<OperatorStateHandle> streamTaskState = subtaskState.getManagedOperatorState();
-
- if (streamTaskState != null && !streamTaskState.isEmpty()) {
- for (OperatorStateHandle osh : streamTaskState) {
- FileStateHandle fileStateHandle = (FileStateHandle) osh.getDelegateStateHandle();
- checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
- }
- }
- }
- }
-
- // The checkpoint files of the savepoint should have been discarded
- for (File f : checkpointFiles) {
- errMsg = "Checkpoint file " + f + " not cleaned up properly.";
- assertFalse(errMsg, f.exists());
- }
-
- if (checkpointFiles.size() > 0) {
- File parent = checkpointFiles.get(0).getParentFile();
- errMsg = "Checkpoint parent directory " + parent + " not cleaned up properly.";
- assertFalse(errMsg, parent.exists());
- }
-
- // All savepoints should have been cleaned up
- errMsg = "Savepoints directory not cleaned up properly: " +
- Arrays.toString(savepointRootDir.listFiles()) + ".";
- assertEquals(errMsg, 0, savepointRootDir.listFiles().length);
-
- // - Verification END ---------------------------------------------
+ assertFalse("Savepoint not properly cleaned up.", new File(savepointPath).exists());
} finally {
- if (flink != null) {
- flink.stop();
- }
+ cluster.after();
+ StatefulCounter.resetForTest(parallelism);
}
}
@@ -410,34 +220,23 @@ public class SavepointITCase extends TestLogger {
int numSlotsPerTaskManager = 1;
int parallelism = numTaskManagers * numSlotsPerTaskManager;
- // Test deadline
- final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
-
- final File tmpDir = folder.getRoot();
+ final File tmpDir = folder.newFolder();
final File savepointDir = new File(tmpDir, "savepoints");
- TestingCluster flink = null;
+ final Configuration config = new Configuration();
+ config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+
+ MiniClusterResource cluster = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ config,
+ numTaskManagers,
+ numSlotsPerTaskManager
+ ),
+ true);
+ cluster.before();
+ ClusterClient<?> client = cluster.getClusterClient();
try {
- // Flink configuration
- final Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
- config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
-
- LOG.info("Flink configuration: " + config + ".");
-
- // Start Flink
- flink = new TestingCluster(config);
- LOG.info("Starting Flink cluster.");
- flink.start();
-
- // Retrieve the job manager
- LOG.info("Retrieving JobManager.");
- ActorGateway jobManager = Await.result(
- flink.leaderGateway().future(),
- deadline.timeLeft());
- LOG.info("JobManager: " + jobManager + ".");
// High value to ensure timeouts if restarted.
int numberOfRetries = 1000;
@@ -453,15 +252,17 @@ public class SavepointITCase extends TestLogger {
LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
try {
- flink.submitJobAndWait(jobGraph, false);
+ client.setDetached(false);
+ client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
} catch (Exception e) {
- assertEquals(JobExecutionException.class, e.getClass());
- assertEquals(FileNotFoundException.class, e.getCause().getClass());
+ Optional<JobExecutionException> expectedJobExecutionException = ExceptionUtils.findThrowable(e, JobExecutionException.class);
+ Optional<FileNotFoundException> expectedFileNotFoundException = ExceptionUtils.findThrowable(e, FileNotFoundException.class);
+ if (!(expectedJobExecutionException.isPresent() && expectedFileNotFoundException.isPresent())) {
+ throw e;
+ }
}
} finally {
- if (flink != null) {
- flink.stop();
- }
+ cluster.after();
}
}
@@ -480,15 +281,13 @@ public class SavepointITCase extends TestLogger {
int parallelism = 2;
// Test deadline
- final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
+ final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(5));
- final File tmpDir = folder.getRoot();
+ final File tmpDir = folder.newFolder();
final File savepointDir = new File(tmpDir, "savepoints");
// Flink configuration
final Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
String savepointPath;
@@ -496,18 +295,18 @@ public class SavepointITCase extends TestLogger {
LOG.info("Flink configuration: " + config + ".");
// Start Flink
- TestingCluster flink = new TestingCluster(config);
+ MiniClusterResource cluster = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ config,
+ numTaskManagers,
+ numSlotsPerTaskManager
+ ),
+ true);
+
+ LOG.info("Shutting down Flink cluster.");
+ cluster.before();
+ ClusterClient<?> client = cluster.getClusterClient();
try {
- LOG.info("Starting Flink cluster.");
- flink.start(true);
-
- // Retrieve the job manager
- LOG.info("Retrieving JobManager.");
- ActorGateway jobManager = Await.result(
- flink.leaderGateway().future(),
- deadline.timeLeft());
- LOG.info("JobManager: " + jobManager + ".");
-
final StatefulCounter statefulCounter = new StatefulCounter();
StatefulCounter.resetForTest(parallelism);
@@ -536,38 +335,34 @@ public class SavepointITCase extends TestLogger {
JobGraph originalJobGraph = env.getStreamGraph().getJobGraph();
- JobSubmissionResult submissionResult = flink.submitJobDetached(originalJobGraph);
+ client.setDetached(true);
+ JobSubmissionResult submissionResult = client.submitJob(originalJobGraph, SavepointITCase.class.getClassLoader());
JobID jobID = submissionResult.getJobID();
// wait for the Tasks to be ready
StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- Future<Object> savepointPathFuture = jobManager.ask(new TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
- savepointPath = ((TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
- Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
-
- ((ResponseSavepoint) Await.result(savepointFuture, deadline.timeLeft())).savepoint();
+ savepointPath = client.triggerSavepoint(jobID, null).get();
LOG.info("Retrieved savepoint: " + savepointPath + ".");
} finally {
// Shut down the Flink cluster (thereby canceling the job)
LOG.info("Shutting down Flink cluster.");
- flink.stop();
+ cluster.after();
}
// create a new TestingCluster to make sure we start with completely
// new resources
- flink = new TestingCluster(config);
+ cluster = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ config,
+ numTaskManagers,
+ numSlotsPerTaskManager
+ ),
+ true);
+ LOG.info("Restarting Flink cluster.");
+ cluster.before();
+ client = cluster.getClusterClient();
try {
- LOG.info("Restarting Flink cluster.");
- flink = new TestingCluster(config);
-
- flink.start(true);
-
- // Retrieve the job manager
- LOG.info("Retrieving JobManager.");
- ActorGateway jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
- LOG.info("JobManager: " + jobManager + ".");
-
// Reset static test helpers
StatefulCounter.resetForTest(parallelism);
@@ -598,14 +393,15 @@ public class SavepointITCase extends TestLogger {
"savepoint path " + savepointPath + " in detached mode.");
// Submit the job
- flink.submitJobDetached(modifiedJobGraph);
+ client.setDetached(true);
+ client.submitJob(modifiedJobGraph, SavepointITCase.class.getClassLoader());
// Await state is restored
StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
// Await some progress after restore
StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
} finally {
- flink.stop();
+ cluster.after();
}
}
@@ -787,7 +583,6 @@ public class SavepointITCase extends TestLogger {
Configuration config = new Configuration();
config.addAll(jobGraph.getJobConfiguration());
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2 * jobGraph.getMaximumParallelism());
final File checkpointDir = new File(tmpDir, "checkpoints");
final File savepointDir = new File(tmpDir, "savepoints");
@@ -800,31 +595,40 @@ public class SavepointITCase extends TestLogger {
config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
- TestingCluster cluster = new TestingCluster(config, false);
+ MiniClusterResource cluster = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ config,
+ 1,
+ 2 * jobGraph.getMaximumParallelism()
+ ),
+ true);
+ cluster.before();
+ ClusterClient<?> client = cluster.getClusterClient();
+
String savepointPath = null;
try {
- cluster.start();
-
- cluster.submitJobDetached(jobGraph);
+ client.setDetached(true);
+ client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
for (OneShotLatch latch : iterTestSnapshotWait) {
latch.await();
}
- savepointPath = cluster.triggerSavepoint(jobGraph.getJobID());
+ savepointPath = client.triggerSavepoint(jobGraph.getJobID(), null).get();
source.cancel();
jobGraph = streamGraph.getJobGraph();
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
- cluster.submitJobDetached(jobGraph);
+ client.setDetached(true);
+ client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
for (OneShotLatch latch : iterTestRestoreWait) {
latch.await();
}
source.cancel();
} finally {
if (null != savepointPath) {
- cluster.disposeSavepoint(savepointPath);
+ client.disposeSavepoint(savepointPath);
}
- cluster.stop();
+ cluster.after();
}
}
@@ -904,4 +708,30 @@ public class SavepointITCase extends TestLogger {
}
}
}
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static class MiniClusterResourceFactory {
+ private final int numTaskManagers;
+ private final int numSlotsPerTaskManager;
+ private final Configuration config;
+
+ private MiniClusterResourceFactory(int numTaskManagers, int numSlotsPerTaskManager, Configuration config) {
+ this.numTaskManagers = numTaskManagers;
+ this.numSlotsPerTaskManager = numSlotsPerTaskManager;
+ this.config = config;
+ }
+
+ MiniClusterResource get() {
+ return new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ config,
+ numTaskManagers,
+ numSlotsPerTaskManager
+ ),
+ true);
+ }
+ }
}