Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/04 12:22:19 UTC

[01/18] flink git commit: [hotfix][docs][table] Minor fixes

Repository: flink
Updated Branches:
  refs/heads/master db366cd3d -> 6ffe22db8


[hotfix][docs][table] Minor fixes

This closes #5795.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2426f788
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2426f788
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2426f788

Branch: refs/heads/master
Commit: 2426f78841509c576770a9c30ef88e5d76ba90f4
Parents: ac07761
Author: mayyamus <ch...@outlook.com>
Authored: Fri Mar 30 21:44:47 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:05 2018 +0200

----------------------------------------------------------------------
 docs/dev/table/common.md    | 4 ++--
 docs/dev/table/streaming.md | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2426f788/docs/dev/table/common.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index e1d3809..3e3965a 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -775,8 +775,8 @@ DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
 TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
   Types.STRING(),
   Types.INT());
-DataStream<Tuple2<String, Integer>> dsTuple = 
-  tableEnv.toAppendStream(table, tupleType);
+DataSet<Tuple2<String, Integer>> dsTuple = 
+  tableEnv.toDataSet(table, tupleType);
 {% endhighlight %}
 </div>
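
For reference, a minimal sketch of the corrected batch-side snippet from common.md; tableEnv and table are assumed to be an existing BatchTableEnvironment and Table (names follow the docs snippet):

    TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
      Types.STRING(),
      Types.INT());

    // a Table backed by a batch environment converts into a DataSet, not an append DataStream
    DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType);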
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2426f788/docs/dev/table/streaming.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/streaming.md b/docs/dev/table/streaming.md
index 310121e..dc0fdf8 100644
--- a/docs/dev/table/streaming.md
+++ b/docs/dev/table/streaming.md
@@ -362,7 +362,7 @@ In either case the event time timestamp field will hold the value of the `DataSt
 // Option 1:
 
 // extract timestamp and assign watermarks based on knowledge of the stream
-DataStream<Tuple3<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
+DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
 
 // declare an additional logical field as an event time attribute
 Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
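
For reference: the physical stream only carries the Username and Data fields, while UserActionTime.rowtime is appended as a logical attribute derived from the record timestamps, so Tuple2 (not Tuple3) is the correct element type. A minimal sketch of the corrected Option 1, where MyTimestampAssigner is a hypothetical stand-in for the assigner elided as "..." in the docs:

    // extract timestamp and assign watermarks based on knowledge of the stream
    DataStream<Tuple2<String, String>> stream =
        inputStream.assignTimestampsAndWatermarks(new MyTimestampAssigner()); // hypothetical assigner

    // declare an additional logical field as an event time attribute
    Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");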


[14/18] flink git commit: [FLINK-9104][docs] Update generator and regenerate REST API docs

Posted by ch...@apache.org.
[FLINK-9104][docs] Update generator and regenerate REST API docs

This closes #5797.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83fffdf3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83fffdf3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83fffdf3

Branch: refs/heads/master
Commit: 83fffdf3ab044ef770f9c97f8fa60b782a9e9f78
Parents: f1a8dd0
Author: Rong Rong <wa...@hotmail.com>
Authored: Sat Mar 31 12:29:59 2018 -0700
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 09:22:03 2018 +0200

----------------------------------------------------------------------
 docs/_includes/generated/rest_dispatcher.html   | 3054 ++++++++++++------
 .../flink/docs/rest/RestAPIDocGenerator.java    |   41 +
 2 files changed, 2071 insertions(+), 1024 deletions(-)
----------------------------------------------------------------------



[10/18] flink git commit: [FLINK-8966][tests] Upload user-jars in MiniCluster

Posted by ch...@apache.org.
[FLINK-8966][tests] Upload user-jars in MiniCluster


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3947a395
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3947a395
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3947a395

Branch: refs/heads/master
Commit: 3947a395ecfe37de97be346416493d933cd3fe5a
Parents: 2426f78
Author: zentol <ch...@apache.org>
Authored: Wed Mar 14 12:49:11 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:30 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  | 38 +++++++++++++++++++-
 1 file changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3947a395/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2e826eb..64d46c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -25,9 +25,12 @@ import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -86,8 +89,10 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -632,7 +637,10 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		// from the ResourceManager
 		jobGraph.setAllowQueuedScheduling(true);
 
-		final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = dispatcherGateway.submitJob(jobGraph, rpcTimeout);
+		final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph);
+
+		final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose(
+			(Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));
 
 		return acknowledgeCompletableFuture.thenApply(
 			(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
@@ -661,6 +669,34 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		}
 	}
 
+	private CompletableFuture<Void> uploadAndSetJarFiles(final DispatcherGateway currentDispatcherGateway, final JobGraph job) {
+		List<Path> userJars = job.getUserJars();
+		if (!userJars.isEmpty()) {
+			CompletableFuture<List<PermanentBlobKey>> jarUploadFuture = uploadJarFiles(currentDispatcherGateway, job.getJobID(), job.getUserJars());
+			return jarUploadFuture.thenAccept(blobKeys -> {
+					for (PermanentBlobKey blobKey : blobKeys) {
+						job.addBlob(blobKey);
+					}
+				});
+		} else {
+			LOG.debug("No jars to upload for job {}.", job.getJobID());
+			return CompletableFuture.completedFuture(null);
+		}
+	}
+
+	private CompletableFuture<List<PermanentBlobKey>> uploadJarFiles(final DispatcherGateway currentDispatcherGateway, final JobID jobId, final List<Path> jars) {
+		return currentDispatcherGateway.getBlobServerPort(rpcTimeout)
+			.thenApply(blobServerPort -> {
+				InetSocketAddress blobServerAddress = new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort);
+
+				try {
+					return BlobClient.uploadJarFiles(blobServerAddress, miniClusterConfiguration.getConfiguration(), jobId, jars);
+				} catch (IOException ioe) {
+					throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
+				}
+			});
+	}
+
 	// ------------------------------------------------------------------------
 	//  factories - can be overridden by subclasses to alter behavior
 	// ------------------------------------------------------------------------
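
For context, a hedged sketch of how the new upload path is exercised; createJobGraph() and the jar path are illustrative placeholders, while the remaining calls follow the MiniCluster and JobGraph APIs visible in this and the following commits:

    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setNumTaskManagers(1)
        .setNumSlotsPerTaskManager(1)
        .build();
    MiniCluster miniCluster = new MiniCluster(cfg);
    miniCluster.start();

    JobGraph jobGraph = createJobGraph(); // hypothetical helper producing the job to run
    jobGraph.addJar(new Path("file:///path/to/user-code.jar")); // illustrative user jar

    // submitJob() (and thus executeJobBlocking()) now uploads the attached user jars to the
    // Dispatcher's BlobServer via uploadAndSetJarFiles() before the actual submission
    miniCluster.executeJobBlocking(jobGraph);
    miniCluster.close();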


[05/18] flink git commit: [FLINK-8963][tests] Port BigUserProgramJobSubmitITCase to MiniClusterResource

Posted by ch...@apache.org.
[FLINK-8963][tests] Port BigUserProgramJobSubmitITCase to MiniClusterResource

This closes #5772.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/576ba1d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/576ba1d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/576ba1d7

Branch: refs/heads/master
Commit: 576ba1d79c0d6f4daf648bf51df6af0b94845e7d
Parents: 2d87453
Author: zentol <ch...@apache.org>
Authored: Tue Feb 20 18:02:51 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:05 2018 +0200

----------------------------------------------------------------------
 .../runtime/BigUserProgramJobSubmitITCase.java  | 84 +++++++++++++-------
 1 file changed, 56 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/576ba1d7/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
index a4d5958..b10dbec 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
@@ -18,21 +18,27 @@
 package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
-import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.apache.flink.testutils.category.New;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.experimental.categories.Category;
 
+import java.net.URI;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -44,37 +50,48 @@ import static org.junit.Assert.assertEquals;
  * Integration test that verifies that a user program with a big(ger) payload is successfully
  * submitted and run.
  */
-@Ignore("Fails on job submission payload being too large - [FLINK-7285]")
+@Category(New.class)
 public class BigUserProgramJobSubmitITCase extends TestLogger {
 
 	// ------------------------------------------------------------------------
 	//  The mini cluster that is shared across tests
 	// ------------------------------------------------------------------------
 
-	private static final int DEFAULT_PARALLELISM = 1;
+	private static final MiniCluster CLUSTER;
+	private static final RestClusterClient<StandaloneClusterId> CLIENT;
 
-	private static LocalFlinkMiniCluster cluster;
+	static {
+		try {
+			MiniClusterConfiguration clusterConfiguration = new MiniClusterConfiguration.Builder()
+				.setNumTaskManagers(1)
+				.setNumSlotsPerTaskManager(1)
+				.build();
+			CLUSTER = new MiniCluster(clusterConfiguration);
+			CLUSTER.start();
 
-	private static final Logger LOG = LoggerFactory.getLogger(BigUserProgramJobSubmitITCase.class);
+			URI restAddress = CLUSTER.getRestAddress();
+
+			final Configuration clientConfig = new Configuration();
+			clientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost());
+			clientConfig.setInteger(RestOptions.REST_PORT, restAddress.getPort());
+
+			CLIENT = new RestClusterClient<>(
+				clientConfig,
+				StandaloneClusterId.getInstance());
+
+		} catch (Exception e) {
+			throw new AssertionError("Could not setup cluster.", e);
+		}
+	}
 
 	// ------------------------------------------------------------------------
 	//  Cluster setup & teardown
 	// ------------------------------------------------------------------------
 
-	@BeforeClass
-	public static void setup() throws Exception {
-		// make sure we do not use a singleActorSystem for the tests
-		// (therefore, we cannot simply inherit from StreamingMultipleProgramsTestBase)
-		LOG.info("Starting FlinkMiniCluster");
-		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false, false, false);
-		TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
-	}
-
 	@AfterClass
 	public static void teardown() throws Exception {
-		LOG.info("Closing FlinkMiniCluster");
-		TestStreamEnvironment.unsetAsContext();
-		TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+		CLIENT.shutdown();
+		CLUSTER.close();
 	}
 
 	private final Random rnd = new Random();
@@ -85,15 +102,16 @@ public class BigUserProgramJobSubmitITCase extends TestLogger {
 	@Test
 	public void bigDataInMap() throws Exception {
 
-		final byte[] data = new byte[100 * 1024 * 1024]; // 100 MB
+		final byte[] data = new byte[16 * 1024 * 1024]; // 16 MB
 		rnd.nextBytes(data); // use random data so that Java does not optimise it away
 		data[1] = 0;
 		data[3] = 0;
 		data[5] = 0;
 
-		TestListResultSink<String> resultSink = new TestListResultSink<>();
+		CollectingSink resultSink = new CollectingSink();
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
 
 		DataStream<Integer> src = env.fromElements(1, 3, 5);
 
@@ -106,15 +124,25 @@ public class BigUserProgramJobSubmitITCase extends TestLogger {
 			}
 		}).addSink(resultSink);
 
-		env.execute();
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+		CLIENT.setDetached(false);
+		CLIENT.submitJob(jobGraph, BigUserProgramJobSubmitITCase.class.getClassLoader());
 
 		List<String> expected = Arrays.asList("x 1 0", "x 3 0", "x 5 0");
 
-		List<String> result = resultSink.getResult();
+		List<String> result = CollectingSink.result;
 
 		Collections.sort(expected);
 		Collections.sort(result);
 
 		assertEquals(expected, result);
 	}
+
+	private static class CollectingSink implements SinkFunction<String> {
+		private static final List<String> result = Collections.synchronizedList(new ArrayList<>(3));
+
+		public void invoke(String value, Context context) throws Exception {
+			result.add(value);
+		}
+	}
 }


[16/18] flink git commit: [FLINK-8697] Rename DummyFlinkKafkaConsumer in Kinesis tests This closes #5809.

Posted by ch...@apache.org.
[FLINK-8697] Rename DummyFlinkKafkaConsumer in Kinesis tests

This closes #5809.

This closes #5785.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/baa1ad3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/baa1ad3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/baa1ad3c

Branch: refs/heads/master
Commit: baa1ad3cc32e600d426c7f4aa075232369069822
Parents: df719e7
Author: Bowen Li <bo...@gmail.com>
Authored: Tue Apr 3 13:53:23 2018 -0700
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 09:27:31 2018 +0200

----------------------------------------------------------------------
 .../connectors/kinesis/internals/KinesisDataFetcherTest.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/baa1ad3c/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 7854d03..ccf39d0 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -113,7 +113,7 @@ public class KinesisDataFetcherTest extends TestLogger {
 
 		// FlinkKinesisConsumer is responsible for setting up the fetcher before it can be run;
 		// run the consumer until it reaches the point where the fetcher starts to run
-		final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(TestUtils.getStandardProperties(), fetcher, 1, 0);
+		final DummyFlinkKinesisConsumer<String> consumer = new DummyFlinkKinesisConsumer<>(TestUtils.getStandardProperties(), fetcher, 1, 0);
 
 		CheckedThread consumerThread = new CheckedThread() {
 			@Override
@@ -171,7 +171,7 @@ public class KinesisDataFetcherTest extends TestLogger {
 				subscribedStreamsToLastSeenShardIdsUnderTest,
 				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
 
-		final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(
+		final DummyFlinkKinesisConsumer<String> consumer = new DummyFlinkKinesisConsumer<>(
 				TestUtils.getStandardProperties(), fetcher, 1, 0);
 
 		CheckedThread consumerThread = new CheckedThread() {
@@ -609,7 +609,7 @@ public class KinesisDataFetcherTest extends TestLogger {
 		assertEquals(streamShardHandle, KinesisDataFetcher.convertToStreamShardHandle(kinesisStreamShard));
 	}
 
-	private static class DummyFlinkKafkaConsumer<T> extends FlinkKinesisConsumer<T> {
+	private static class DummyFlinkKinesisConsumer<T> extends FlinkKinesisConsumer<T> {
 		private static final long serialVersionUID = 1L;
 
 		private final KinesisDataFetcher<T> fetcher;
@@ -618,7 +618,7 @@ public class KinesisDataFetcherTest extends TestLogger {
 		private final int subtaskIndex;
 
 		@SuppressWarnings("unchecked")
-		DummyFlinkKafkaConsumer(
+		DummyFlinkKinesisConsumer(
 				Properties properties,
 				KinesisDataFetcher<T> fetcher,
 				int numParallelSubtasks,


[11/18] flink git commit: [FLINK-8966][tests] Port AvroExternalJarProgramITCase to flip6

Posted by ch...@apache.org.
[FLINK-8966][tests] Port AvroExternalJarProgramITCase to flip6

This closes #5766.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1a8dd06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1a8dd06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1a8dd06

Branch: refs/heads/master
Commit: f1a8dd0654030d6b4bd79732ef1d0f32c5f6820e
Parents: 3947a39
Author: zentol <ch...@apache.org>
Authored: Tue Apr 3 11:20:12 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:45 2018 +0200

----------------------------------------------------------------------
 .../avro/AvroExternalJarProgramITCase.java      | 75 +++++++---------
 .../LegacyAvroExternalJarProgramITCase.java     | 92 ++++++++++++++++++++
 2 files changed, 124 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1a8dd06/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
index 985471a..6766947 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
@@ -19,74 +19,63 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.testutils.category.New;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.io.File;
-import java.net.URL;
 import java.util.Collections;
 
 /**
  * IT case for the {@link AvroExternalJarProgram}.
  */
+@Category(New.class)
 public class AvroExternalJarProgramITCase extends TestLogger {
 
 	private static final String JAR_FILE = "maven-test-jar.jar";
 
 	private static final String TEST_DATA_FILE = "/testdata.avro";
 
-	@Test
-	public void testExternalProgram() {
-
-		LocalFlinkMiniCluster testMiniCluster = null;
+	private static final int PARALLELISM = 4;
 
-		try {
-			int parallelism = 4;
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
-			testMiniCluster = new LocalFlinkMiniCluster(config, false);
-			testMiniCluster.start();
+	private static final MiniCluster MINI_CLUSTER = new MiniCluster(
+		new MiniClusterConfiguration.Builder()
+			.setNumTaskManagers(1)
+			.setNumSlotsPerTaskManager(PARALLELISM)
+			.build());
 
-			String jarFile = JAR_FILE;
-			String testData = getClass().getResource(TEST_DATA_FILE).toString();
+	@BeforeClass
+	public static void setUp() throws Exception {
+		MINI_CLUSTER.start();
+	}
 
-			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+	@AfterClass
+	public static void tearDown() {
+		TestEnvironment.unsetAsContext();
+		MINI_CLUSTER.closeAsync();
+	}
 
-			TestEnvironment.setAsContext(
-				testMiniCluster,
-				parallelism,
-				Collections.singleton(new Path(jarFile)),
-				Collections.<URL>emptyList());
+	@Test
+	public void testExternalProgram() throws Exception {
+		TestEnvironment.setAsContext(
+			MINI_CLUSTER,
+			PARALLELISM,
+			Collections.singleton(new Path(JAR_FILE)),
+			Collections.emptyList());
 
-			config.setString(JobManagerOptions.ADDRESS, "localhost");
-			config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
+		String testData = getClass().getResource(TEST_DATA_FILE).toString();
 
-			program.invokeInteractiveModeForExecution();
-		}
-		catch (Throwable t) {
-			System.err.println(t.getMessage());
-			t.printStackTrace();
-			Assert.fail("Error during the packaged program execution: " + t.getMessage());
-		}
-		finally {
-			TestEnvironment.unsetAsContext();
+		PackagedProgram program = new PackagedProgram(new File(JAR_FILE), new String[]{testData});
 
-			if (testMiniCluster != null) {
-				try {
-					testMiniCluster.stop();
-				} catch (Throwable t) {
-					// ignore
-				}
-			}
-		}
+		program.invokeInteractiveModeForExecution();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f1a8dd06/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
new file mode 100644
index 0000000..1dd56a7
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+
+/**
+ * IT case for the {@link AvroExternalJarProgram}.
+ */
+public class LegacyAvroExternalJarProgramITCase extends TestLogger {
+
+	private static final String JAR_FILE = "maven-test-jar.jar";
+
+	private static final String TEST_DATA_FILE = "/testdata.avro";
+
+	@Test
+	public void testExternalProgram() {
+
+		LocalFlinkMiniCluster testMiniCluster = null;
+
+		try {
+			int parallelism = 4;
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+			testMiniCluster = new LocalFlinkMiniCluster(config, false);
+			testMiniCluster.start();
+
+			String jarFile = JAR_FILE;
+			String testData = getClass().getResource(TEST_DATA_FILE).toString();
+
+			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+
+			TestEnvironment.setAsContext(
+				testMiniCluster,
+				parallelism,
+				Collections.singleton(new Path(jarFile)),
+				Collections.<URL>emptyList());
+
+			config.setString(JobManagerOptions.ADDRESS, "localhost");
+			config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
+
+			program.invokeInteractiveModeForExecution();
+		}
+		catch (Throwable t) {
+			System.err.println(t.getMessage());
+			t.printStackTrace();
+			Assert.fail("Error during the packaged program execution: " + t.getMessage());
+		}
+		finally {
+			TestEnvironment.unsetAsContext();
+
+			if (testMiniCluster != null) {
+				try {
+					testMiniCluster.stop();
+				} catch (Throwable t) {
+					// ignore
+				}
+			}
+		}
+	}
+}


[17/18] flink git commit: [FLINK-8771][build] Upgrade scalastyle to 1.0.0

Posted by ch...@apache.org.
[FLINK-8771][build] Upgrade scalastyle to 1.0.0

This closes #5702.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fea6d5ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fea6d5ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fea6d5ec

Branch: refs/heads/master
Commit: fea6d5ec358b360ef40e116faec67f70bc96b3e0
Parents: baa1ad3
Author: Bowen Li <bo...@gmail.com>
Authored: Wed Mar 14 17:26:18 2018 -0700
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 09:29:47 2018 +0200

----------------------------------------------------------------------
 .../runtime/jobmanager/SlotSharingITCase.scala  | 32 +++++++++----------
 .../TaskManagerFailsWithSlotSharingITCase.scala | 33 ++++++++++----------
 .../taskmanager/TaskManagerFailsITCase.scala    | 32 +++++++++----------
 pom.xml                                         |  2 +-
 4 files changed, 49 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fea6d5ec/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index 4fffd68..1c26901 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -1,20 +1,20 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.flink.runtime.jobmanager
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fea6d5ec/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index 9775d33..e2702c7 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -1,26 +1,25 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.flink.runtime.jobmanager
 
 import akka.actor.{ActorSystem, Kill, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType

http://git-wip-us.apache.org/repos/asf/flink/blob/fea6d5ec/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 61cb8cc..a065e5b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -1,20 +1,20 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.flink.api.scala.runtime.taskmanager
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fea6d5ec/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index de0c259..baf4fda 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1429,7 +1429,7 @@ under the License.
 				<plugin>
 					<groupId>org.scalastyle</groupId>
 					<artifactId>scalastyle-maven-plugin</artifactId>
-					<version>0.8.0</version>
+					<version>1.0.0</version>
 					<executions>
 						<execution>
 							<goals>
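
Usage note: assuming the default goal names of scalastyle-maven-plugin, the upgraded check can still be invoked directly with "mvn scalastyle:check" in addition to the execution bound in the build.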


[15/18] flink git commit: [FLINK-8804][build] Bump flink-shaded-jackson version to 3.0

Posted by ch...@apache.org.
[FLINK-8804][build] Bump flink-shaded-jackson version to 3.0

This closes #5596.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df719e76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df719e76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df719e76

Branch: refs/heads/master
Commit: df719e76d3d5ca90e38c26fce0085c861acb935d
Parents: 83fffdf
Author: zentol <ch...@apache.org>
Authored: Wed Feb 28 11:42:21 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 09:23:03 2018 +0200

----------------------------------------------------------------------
 flink-dist/pom.xml                              |  7 ---
 flink-docs/pom.xml                              | 18 +-------
 .../flink/docs/rest/RestAPIDocGenerator.java    | 14 +++---
 flink-libraries/flink-sql-client/pom.xml        | 11 +----
 .../flink/table/client/config/ConfigUtil.java   | 10 ++--
 pom.xml                                         | 48 +++++++++++++++++++-
 6 files changed, 63 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/df719e76/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 44ce9df..24a514a 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -506,13 +506,6 @@ under the License.
 									<exclude>log4j:log4j</exclude>
 								</excludes>
 							</artifactSet>
-							<relocations>
-								<relocation>
-									<!-- relocate jackson services, which isn't done by flink-shaded-jackson -->
-									<pattern>com.fasterxml.jackson</pattern>
-									<shadedPattern>org.apache.flink.shaded.jackson2.com.fasterxml.jackson</shadedPattern>
-								</relocation>
-							</relocations>
 							<transformers>
 								<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
 									<resource>reference.conf</resource>

http://git-wip-us.apache.org/repos/asf/flink/blob/df719e76/flink-docs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index 4132066..33d1a2c 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -63,23 +63,9 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-shaded-netty</artifactId>
 		</dependency>
-
-		<dependency>
-			<!-- We use standard jackson since jackson-module-jsonSchema isn't part of flink-shaded-jackson -->
-			<groupId>com.fasterxml.jackson.core</groupId>
-			<artifactId>jackson-core</artifactId>
-			<version>${jackson.version}</version>
-		</dependency>
 		<dependency>
-			<!-- We use standard jackson since jackson-module-jsonSchema isn't part of flink-shaded-jackson -->
-			<groupId>com.fasterxml.jackson.core</groupId>
-			<artifactId>jackson-databind</artifactId>
-			<version>${jackson.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>com.fasterxml.jackson.module</groupId>
-			<artifactId>jackson-module-jsonSchema</artifactId>
-			<version>${jackson.version}</version>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-jackson-module-jsonSchema</artifactId>
 		</dependency>
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/df719e76/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 8ece7b1..2d5ec8f 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -41,15 +41,15 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.ConfigurationException;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.SerializableString;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.io.CharacterEscapes;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.io.SerializedString;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.module.jsonSchema.JsonSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.SerializableString;
-import com.fasterxml.jackson.core.io.CharacterEscapes;
-import com.fasterxml.jackson.core.io.SerializedString;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
-import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df719e76/flink-libraries/flink-sql-client/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/pom.xml b/flink-libraries/flink-sql-client/pom.xml
index 64ae1be..6bcfc13 100644
--- a/flink-libraries/flink-sql-client/pom.xml
+++ b/flink-libraries/flink-sql-client/pom.xml
@@ -99,15 +99,8 @@ under the License.
 
 		<!-- configuration -->
 		<dependency>
-			<groupId>com.fasterxml.jackson.core</groupId>
-			<artifactId>jackson-databind</artifactId>
-			<version>${jackson.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>com.fasterxml.jackson.dataformat</groupId>
-			<artifactId>jackson-dataformat-yaml</artifactId>
-			<version>${jackson.version}</version>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-jackson</artifactId>
 		</dependency>
 
 		<!-- test dependencies -->

http://git-wip-us.apache.org/repos/asf/flink/blob/df719e76/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
index 87201a6..337d803 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.table.client.config;
 
-import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.core.io.IOContext;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import com.fasterxml.jackson.dataformat.yaml.YAMLParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.io.IOContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLParser;
 
 import java.io.IOException;
 import java.io.InputStream;
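
For context, a hedged sketch of reading YAML through the relocated classes this commit switches to; the file path and the Map target type are illustrative, only the shaded package names are taken from the diff:

    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

    import java.io.File;
    import java.util.Map;

    // same Jackson API as before; only the package prefix moves to the shaded location
    ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
    Map<String, Object> config = yamlMapper.readValue(new File("conf/sql-client-defaults.yaml"), Map.class);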

http://git-wip-us.apache.org/repos/asf/flink/blob/df719e76/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a9d190e..de0c259 100644
--- a/pom.xml
+++ b/pom.xml
@@ -247,7 +247,53 @@ under the License.
 			<dependency>
 				<groupId>org.apache.flink</groupId>
 				<artifactId>flink-shaded-jackson</artifactId>
-				<version>${jackson.version}-${flink.shaded.version}</version>
+				<!-- We use a newer version since we didn't have to time to do a proper switch to 3.0 -->
+				<version>${jackson.version}-3.0</version>
+				<!-- Dependencies aren't properly hidden in 3.0 -->
+				<exclusions>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.core</groupId>
+						<artifactId>jackson-core</artifactId>
+					</exclusion>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.core</groupId>
+						<artifactId>jackson-annotations</artifactId>
+					</exclusion>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.core</groupId>
+						<artifactId>jackson-databind</artifactId>
+					</exclusion>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.dataformat</groupId>
+						<artifactId>jackson-dataformat-yaml</artifactId>
+					</exclusion>
+				</exclusions>
+			</dependency>
+
+			<dependency>
+				<groupId>org.apache.flink</groupId>
+				<artifactId>flink-shaded-jackson-module-jsonSchema</artifactId>
+				<!-- We use a newer version since we didn't have to time to do a proper switch to 3.0 -->
+				<version>${jackson.version}-3.0</version>
+				<!-- Dependencies aren't properly hidden in 3.0 -->
+				<exclusions>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.core</groupId>
+						<artifactId>jackson-core</artifactId>
+					</exclusion>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.core</groupId>
+						<artifactId>jackson-annotations</artifactId>
+					</exclusion>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.core</groupId>
+						<artifactId>jackson-databind</artifactId>
+					</exclusion>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.module</groupId>
+						<artifactId>jackson-module-jsonSchema</artifactId>
+					</exclusion>
+				</exclusions>
 			</dependency>
 
 			<dependency>


[04/18] flink git commit: [hotfix][tests] Hide output from config.sh

Posted by ch...@apache.org.
[hotfix][tests] Hide output from config.sh

This closes #5763.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/767d79fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/767d79fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/767d79fd

Branch: refs/heads/master
Commit: 767d79fd4936777035f62ccd0cb7c06a6e030a4a
Parents: 7c553ba
Author: zentol <ch...@apache.org>
Authored: Mon Mar 26 13:36:24 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:05 2018 +0200

----------------------------------------------------------------------
 flink-dist/src/test/bin/calcTMHeapSizeMB.sh | 2 +-
 flink-dist/src/test/bin/calcTMNetBufMem.sh  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/767d79fd/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/test/bin/calcTMHeapSizeMB.sh b/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
index 3956643..d5b7742 100755
--- a/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
+++ b/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
@@ -37,6 +37,6 @@ if [[ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]]; then
 fi
 
 FLINK_CONF_DIR=${bin}/../../main/resources
-. ${bin}/../../main/flink-bin/bin/config.sh
+. ${bin}/../../main/flink-bin/bin/config.sh > /dev/null
 
 calculateTaskManagerHeapSizeMB

http://git-wip-us.apache.org/repos/asf/flink/blob/767d79fd/flink-dist/src/test/bin/calcTMNetBufMem.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/test/bin/calcTMNetBufMem.sh b/flink-dist/src/test/bin/calcTMNetBufMem.sh
index 9948d9c..355a978 100755
--- a/flink-dist/src/test/bin/calcTMNetBufMem.sh
+++ b/flink-dist/src/test/bin/calcTMNetBufMem.sh
@@ -34,6 +34,6 @@ if [[ -z "${FLINK_TM_NET_BUF_MAX}" ]]; then
 fi
 
 FLINK_CONF_DIR=${bin}/../../main/resources
-. ${bin}/../../main/flink-bin/bin/config.sh
+. ${bin}/../../main/flink-bin/bin/config.sh > /dev/null
 
 calculateNetworkBufferMemory


[02/18] flink git commit: [FLINK-8704][tests] Port SlotCountExceedingParallelismTest

Posted by ch...@apache.org.
[FLINK-8704][tests] Port SlotCountExceedingParallelismTest

This closes #5694.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/916fc201
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/916fc201
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/916fc201

Branch: refs/heads/master
Commit: 916fc201c830c4fb7d38972c4368b7829196a194
Parents: 576ba1d
Author: zentol <ch...@apache.org>
Authored: Mon Mar 12 13:52:57 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:05 2018 +0200

----------------------------------------------------------------------
 ...LegacySlotCountExceedingParallelismTest.java | 212 +++++++++++++++++++
 .../SlotCountExceedingParallelismTest.java      |  32 ++-
 2 files changed, 235 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/916fc201/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacySlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacySlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacySlotCountExceedingParallelismTest.java
new file mode 100644
index 0000000..356d94a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacySlotCountExceedingParallelismTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.BitSet;
+
+public class LegacySlotCountExceedingParallelismTest extends TestLogger {
+
+	// Test configuration
+	private static final int NUMBER_OF_TMS = 2;
+	private static final int NUMBER_OF_SLOTS_PER_TM = 2;
+	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+
+	public static final String JOB_NAME = "SlotCountExceedingParallelismTest (no slot sharing, blocking results)";
+
+	private static TestingCluster flink;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		flink = TestingUtils.startTestingCluster(
+				NUMBER_OF_SLOTS_PER_TM,
+				NUMBER_OF_TMS,
+				TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (flink != null) {
+			flink.stop();
+		}
+	}
+
+	@Test
+	public void testNoSlotSharingAndBlockingResultSender() throws Exception {
+		// Sender with higher parallelism than available slots
+		JobGraph jobGraph = createTestJobGraph(JOB_NAME, PARALLELISM * 2, PARALLELISM);
+		submitJobGraphAndWait(jobGraph);
+	}
+
+	@Test
+	public void testNoSlotSharingAndBlockingResultReceiver() throws Exception {
+		// Receiver with higher parallelism than available slots
+		JobGraph jobGraph = createTestJobGraph(JOB_NAME, PARALLELISM, PARALLELISM * 2);
+		submitJobGraphAndWait(jobGraph);
+	}
+
+	@Test
+	public void testNoSlotSharingAndBlockingResultBoth() throws Exception {
+		// Both sender and receiver with higher parallelism than available slots
+		JobGraph jobGraph = createTestJobGraph(JOB_NAME, PARALLELISM * 2, PARALLELISM * 2);
+		submitJobGraphAndWait(jobGraph);
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	private void submitJobGraphAndWait(final JobGraph jobGraph) throws JobExecutionException {
+		flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+	}
+
+	private JobGraph createTestJobGraph(
+			String jobName,
+			int senderParallelism,
+			int receiverParallelism) {
+
+		// The sender and receiver invokable logic ensure that each subtask gets the expected data
+		final JobVertex sender = new JobVertex("Sender");
+		sender.setInvokableClass(RoundRobinSubtaskIndexSender.class);
+		sender.getConfiguration().setInteger(RoundRobinSubtaskIndexSender.CONFIG_KEY, receiverParallelism);
+		sender.setParallelism(senderParallelism);
+
+		final JobVertex receiver = new JobVertex("Receiver");
+		receiver.setInvokableClass(SubtaskIndexReceiver.class);
+		receiver.getConfiguration().setInteger(SubtaskIndexReceiver.CONFIG_KEY, senderParallelism);
+		receiver.setParallelism(receiverParallelism);
+
+		receiver.connectNewDataSetAsInput(
+				sender,
+				DistributionPattern.ALL_TO_ALL,
+				ResultPartitionType.BLOCKING);
+
+		final JobGraph jobGraph = new JobGraph(jobName, sender, receiver);
+
+		// We need to allow queued scheduling, because there are not enough slots available
+		// to run all tasks at once. We queue tasks and then let them finish/consume the blocking
+		// result one after the other.
+		jobGraph.setAllowQueuedScheduling(true);
+
+		return jobGraph;
+	}
+
+	/**
+	 * Sends the subtask index a configurable number of times in a round-robin fashion.
+	 */
+	public static class RoundRobinSubtaskIndexSender extends AbstractInvokable {
+
+		public static final String CONFIG_KEY = "number-of-times-to-send";
+
+		public RoundRobinSubtaskIndexSender(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			RecordWriter<IntValue> writer = new RecordWriter<>(getEnvironment().getWriter(0));
+			final int numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
+
+			final IntValue subtaskIndex = new IntValue(
+					getEnvironment().getTaskInfo().getIndexOfThisSubtask());
+
+			try {
+				for (int i = 0; i < numberOfTimesToSend; i++) {
+					writer.emit(subtaskIndex);
+				}
+				writer.flushAll();
+			}
+			finally {
+				writer.clearBuffers();
+			}
+		}
+	}
+
+	/**
+	 * Expects to receive the subtask index from a configurable number of sender tasks.
+	 */
+	public static class SubtaskIndexReceiver extends AbstractInvokable {
+
+		public static final String CONFIG_KEY = "number-of-indexes-to-receive";
+
+		public SubtaskIndexReceiver(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			RecordReader<IntValue> reader = new RecordReader<>(
+					getEnvironment().getInputGate(0),
+					IntValue.class,
+					getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+			try {
+				final int numberOfSubtaskIndexesToReceive = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
+				final BitSet receivedSubtaskIndexes = new BitSet(numberOfSubtaskIndexesToReceive);
+
+				IntValue record;
+
+				int numberOfReceivedSubtaskIndexes = 0;
+
+				while ((record = reader.next()) != null) {
+					// Check that we don't receive more than expected
+					numberOfReceivedSubtaskIndexes++;
+
+					if (numberOfReceivedSubtaskIndexes > numberOfSubtaskIndexesToReceive) {
+						throw new IllegalStateException("Received more records than expected.");
+					}
+
+					int subtaskIndex = record.getValue();
+
+					// Check that we only receive each subtask index once
+					if (receivedSubtaskIndexes.get(subtaskIndex)) {
+						throw new IllegalStateException("Received expected subtask index twice.");
+					}
+					else {
+						receivedSubtaskIndexes.set(subtaskIndex, true);
+					}
+				}
+
+				// Check that we have received all expected subtask indexes
+				if (receivedSubtaskIndexes.cardinality() != numberOfSubtaskIndexesToReceive) {
+					throw new IllegalStateException("Finished receive, but did not receive "
+							+ "all expected subtask indexes.");
+				}
+			}
+			finally {
+				reader.clearBuffers();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/916fc201/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index cba80aa..2f3f555 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.jobmanager;
 
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
@@ -27,17 +29,21 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.testutils.category.New;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.BitSet;
 
+@Category(New.class)
 public class SlotCountExceedingParallelismTest extends TestLogger {
 
 	// Test configuration
@@ -47,20 +53,28 @@ public class SlotCountExceedingParallelismTest extends TestLogger {
 
 	public static final String JOB_NAME = "SlotCountExceedingParallelismTest (no slot sharing, blocking results)";
 
-	private static TestingCluster flink;
+	private static MiniCluster flink;
 
 	@BeforeClass
 	public static void setUp() throws Exception {
-		flink = TestingUtils.startTestingCluster(
-				NUMBER_OF_SLOTS_PER_TM,
-				NUMBER_OF_TMS,
-				TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+		final Configuration config = new Configuration();
+		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(config)
+			.setNumTaskManagers(NUMBER_OF_TMS)
+			.setNumSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM)
+			.build();
+
+		flink = new MiniCluster(miniClusterConfiguration);
+
+		flink.start();
 	}
 
 	@AfterClass
 	public static void tearDown() throws Exception {
 		if (flink != null) {
-			flink.stop();
+			flink.close();
 		}
 	}
 
@@ -87,8 +101,8 @@ public class SlotCountExceedingParallelismTest extends TestLogger {
 
 	// ---------------------------------------------------------------------------------------------
 
-	private void submitJobGraphAndWait(final JobGraph jobGraph) throws JobExecutionException {
-		flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+	private void submitJobGraphAndWait(final JobGraph jobGraph) throws JobExecutionException, InterruptedException {
+		flink.executeJobBlocking(jobGraph);
 	}
 
 	private JobGraph createTestJobGraph(
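
As a point of reference, the MiniCluster lifecycle that these ports migrate to can be condensed into the following sketch. The builder, start/close and executeJobBlocking calls are the ones used in the diff above; the enclosing helper class and the concrete timeout value are illustrative only.

import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

// Hypothetical helper illustrating the MiniCluster-based pattern used in the port above.
public class MiniClusterSketch {

    public static void runBlocking(JobGraph jobGraph) throws Exception {
        final Configuration config = new Configuration();
        // Placeholder ask-timeout; the test uses TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT().
        config.setString(AkkaOptions.ASK_TIMEOUT, "10 s");

        final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
            .setConfiguration(config)
            .setNumTaskManagers(2)
            .setNumSlotsPerTaskManager(2)
            .build();

        final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
        miniCluster.start();
        try {
            // Blocks until the job finishes; replaces TestingCluster#submitJobAndWait.
            miniCluster.executeJobBlocking(jobGraph);
        } finally {
            miniCluster.close();
        }
    }
}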


[12/18] flink git commit: [FLINK-9104][docs] Update generator and regenerate REST API docs

Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/83fffdf3/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index be5f677..8ece7b1 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -20,6 +20,7 @@ package org.apache.flink.docs.rest;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
@@ -43,6 +44,9 @@ import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.SerializableString;
+import com.fasterxml.jackson.core.io.CharacterEscapes;
+import com.fasterxml.jackson.core.io.SerializedString;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
@@ -56,7 +60,9 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -99,6 +105,7 @@ public class RestAPIDocGenerator {
 
 	static {
 		mapper = new ObjectMapper();
+		mapper.getFactory().setCharacterEscapes(new HTMLCharacterEscapes());
 		schemaGen = new JsonSchemaGenerator(mapper);
 	}
 
@@ -259,6 +266,39 @@ public class RestAPIDocGenerator {
 	}
 
 	/**
+	 * Create character escapes for HTML when generating JSON request/response strings.
+	 *
+	 * <p>This is to avoid exceptions when generating JSON for fields whose schema contains generic types.
+	 */
+	private static class HTMLCharacterEscapes extends CharacterEscapes {
+		private final int[] asciiEscapes;
+		private final Map<Integer, SerializableString> escapeSequences;
+
+		public HTMLCharacterEscapes() {
+			int[] esc = CharacterEscapes.standardAsciiEscapesForJSON();
+			esc['<'] = CharacterEscapes.ESCAPE_CUSTOM;
+			esc['>'] = CharacterEscapes.ESCAPE_CUSTOM;
+			esc['&'] = CharacterEscapes.ESCAPE_CUSTOM;
+			Map<Integer, SerializableString> escMap = new HashMap<>();
+			escMap.put((int) '<', new SerializedString("&lt;"));
+			escMap.put((int) '>', new SerializedString("&gt;"));
+			escMap.put((int) '&', new SerializedString("&amp;"));
+			asciiEscapes = esc;
+			escapeSequences = escMap;
+		}
+
+		@Override
+		public int[] getEscapeCodesForAscii() {
+			return asciiEscapes;
+		}
+
+		@Override
+		public SerializableString getEscapeSequence(int i) {
+			return escapeSequences.getOrDefault(i, null);
+		}
+	}
+
+	/**
 	 * Utility class to extract the {@link MessageHeaders} that the {@link DispatcherRestEndpoint} supports.
 	 */
 	private static class DocumentingDispatcherRestEndpoint extends DispatcherRestEndpoint implements DocumentingRestEndpoint {
@@ -273,6 +313,7 @@ public class RestAPIDocGenerator {
 
 		static {
 			config = new Configuration();
+			config.setString(RestOptions.REST_ADDRESS, "localhost");
 			try {
 				restConfig = RestServerEndpointConfiguration.fromConfiguration(config);
 			} catch (ConfigurationException e) {
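
For illustration, a minimal stand-alone sketch of the effect of the custom Jackson escapes added above. The Jackson calls mirror the ones in RestAPIDocGenerator; the example class and the sample payload are hypothetical.

import com.fasterxml.jackson.core.SerializableString;
import com.fasterxml.jackson.core.io.CharacterEscapes;
import com.fasterxml.jackson.core.io.SerializedString;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical example: with these escapes installed, generic type names such as
// "Collection<String>" survive inside the <pre> blocks of the generated HTML.
public class HtmlEscapeExample {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        mapper.getFactory().setCharacterEscapes(new CharacterEscapes() {
            private final int[] asciiEscapes = CharacterEscapes.standardAsciiEscapesForJSON();
            private final Map<Integer, SerializableString> escapeSequences = new HashMap<>();

            {
                asciiEscapes['<'] = CharacterEscapes.ESCAPE_CUSTOM;
                asciiEscapes['>'] = CharacterEscapes.ESCAPE_CUSTOM;
                asciiEscapes['&'] = CharacterEscapes.ESCAPE_CUSTOM;
                escapeSequences.put((int) '<', new SerializedString("&lt;"));
                escapeSequences.put((int) '>', new SerializedString("&gt;"));
                escapeSequences.put((int) '&', new SerializedString("&amp;"));
            }

            @Override
            public int[] getEscapeCodesForAscii() {
                return asciiEscapes;
            }

            @Override
            public SerializableString getEscapeSequence(int ch) {
                return escapeSequences.get(ch);
            }
        });

        // Prints {"type":"Collection&lt;String&gt;"} instead of {"type":"Collection<String>"}.
        System.out.println(mapper.writeValueAsString(
            Collections.singletonMap("type", "Collection<String>")));
    }
}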


[08/18] flink git commit: [FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource

Posted by ch...@apache.org.
[FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource

This closes #5664.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20bda911
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20bda911
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20bda911

Branch: refs/heads/master
Commit: 20bda911067dc2e2503e4447e498e2ff8731dada
Parents: 916fc20
Author: zentol <ch...@apache.org>
Authored: Mon Feb 26 15:36:37 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:05 2018 +0200

----------------------------------------------------------------------
 .../test/cancelling/CancelingTestBase.java      | 133 ++++++-------------
 .../test/cancelling/JoinCancelingITCase.java    |   9 +-
 .../test/cancelling/MapCancelingITCase.java     |   7 +-
 3 files changed, 45 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/20bda911/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 03ca649..cac16f0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.test.cancelling;
 
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -28,150 +29,100 @@ import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.ClassRule;
 
 import java.util.concurrent.TimeUnit;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-
 /**
  * Base class for testing job cancellation.
  */
 public abstract class CancelingTestBase extends TestLogger {
 
-	private static final Logger LOG = LoggerFactory.getLogger(CancelingTestBase.class);
-
 	private static final int MINIMUM_HEAP_SIZE_MB = 192;
 
-	/**
-	 * Defines the number of seconds after which an issued cancel request is expected to have taken effect (i.e. the job
-	 * is canceled), starting from the point in time when the cancel request is issued.
-	 */
-	private static final int DEFAULT_CANCEL_FINISHED_INTERVAL = 10 * 1000;
-
-	private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
+	protected static final int PARALLELISM = 4;
 
 	// --------------------------------------------------------------------------------------------
 
-	protected LocalFlinkMiniCluster executor;
-
-	protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
+	@ClassRule
+	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			getConfiguration(),
+			2,
+			4),
+		true);
 
 	// --------------------------------------------------------------------------------------------
 
-	private void verifyJvmOptions() {
+	private static void verifyJvmOptions() {
 		final long heap = Runtime.getRuntime().maxMemory() >> 20;
 		Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
 				+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
 	}
 
-	@Before
-	public void startCluster() throws Exception {
+	private static Configuration getConfiguration() {
 		verifyJvmOptions();
 		Configuration config = new Configuration();
 		config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
 		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
 		config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
 		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
 
-		this.executor = new LocalFlinkMiniCluster(config, false);
-		this.executor.start();
-	}
-
-	@After
-	public void stopCluster() throws Exception {
-		if (this.executor != null) {
-			this.executor.stop();
-			this.executor = null;
-			FileSystem.closeAll();
-			System.gc();
-		}
+		return config;
 	}
 
 	// --------------------------------------------------------------------------------------------
 
-	public void runAndCancelJob(Plan plan, int msecsTillCanceling) throws Exception {
-		runAndCancelJob(plan, msecsTillCanceling, DEFAULT_CANCEL_FINISHED_INTERVAL);
-	}
-
-	public void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
-		try {
-			// submit job
-			final JobGraph jobGraph = getJobGraph(plan);
-
-			executor.submitJobDetached(jobGraph);
+	protected void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
+		// submit job
+		final JobGraph jobGraph = getJobGraph(plan);
 
-			// Wait for the job to make some progress and then cancel
-			JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
-					executor.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-					TestingUtils.TESTING_DURATION());
+		ClusterClient<?> client = CLUSTER.getClusterClient();
+		client.setDetached(true);
 
-			Thread.sleep(msecsTillCanceling);
+		JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, CancelingTestBase.class.getClassLoader());
 
-			FiniteDuration timeout = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS);
+		Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
 
-			ActorGateway jobManager = executor.getLeaderGateway(TestingUtils.TESTING_DURATION());
+		JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+		while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
+			Thread.sleep(50);
+			jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+		}
+		if (jobStatus != JobStatus.RUNNING) {
+			Assert.fail("Job not in state RUNNING.");
+		}
 
-			Future<Object> ask = jobManager.ask(new CancelJob(jobGraph.getJobID()), timeout);
+		Thread.sleep(msecsTillCanceling);
 
-			Object result = Await.result(ask, timeout);
+		client.cancel(jobSubmissionResult.getJobID());
 
-			if (result instanceof CancellationSuccess) {
-				// all good
-			} else if (result instanceof CancellationFailure) {
-				// Failure
-				CancellationFailure failure = (CancellationFailure) result;
-				throw new Exception("Failed to cancel job with ID " + failure.jobID() + ".",
-						failure.cause());
-			} else {
-				throw new Exception("Unexpected response to cancel request: " + result);
-			}
+		Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
 
-			// Wait for the job to be cancelled
-			JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.CANCELED,
-					executor.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-					TestingUtils.TESTING_DURATION());
+		JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(cancelDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+		while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) {
+			Thread.sleep(50);
+			jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(cancelDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 		}
-		catch (Exception e) {
-			LOG.error("Exception found in runAndCancelJob.", e);
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
+		if (jobStatusAfterCancel != JobStatus.CANCELED) {
+			Assert.fail("Failed to cancel job with ID " + jobSubmissionResult.getJobID() + '.');
 		}
 	}
 
-	private JobGraph getJobGraph(final Plan plan) throws Exception {
-		final Optimizer pc = new Optimizer(new DataStatistics(), this.executor.configuration());
+	private JobGraph getJobGraph(final Plan plan) {
+		final Optimizer pc = new Optimizer(new DataStatistics(), getConfiguration());
 		final OptimizedPlan op = pc.compile(plan);
 		final JobGraphGenerator jgg = new JobGraphGenerator();
 		return jgg.compileJobGraph(op);
 	}
-
-	public void setTaskManagerNumSlots(int taskManagerNumSlots) {
-		this.taskManagerNumSlots = taskManagerNumSlots;
-	}
-
-	public int getTaskManagerNumSlots() {
-		return this.taskManagerNumSlots;
-	}
 }
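
The status-polling logic above can be read as one reusable pattern; the following hypothetical helper uses only the calls that appear in the diff (ClusterClient#getJobStatus, scala.concurrent.duration.Deadline).

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobStatus;

import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

// Hypothetical helper: poll the cluster client until the job reaches the expected
// status or the deadline expires; returns whether the status was reached in time.
public final class JobStatusPolling {

    private JobStatusPolling() {
    }

    public static boolean waitForJobStatus(
            ClusterClient<?> client,
            JobID jobId,
            JobStatus expected,
            FiniteDuration timeout) throws Exception {

        final Deadline deadline = timeout.fromNow();
        JobStatus status = client.getJobStatus(jobId)
            .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
        while (status != expected && deadline.hasTimeLeft()) {
            Thread.sleep(50);
            status = client.getJobStatus(jobId)
                .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
        }
        return status == expected;
    }
}

With such a helper, runAndCancelJob reduces to a wait for RUNNING, the cancel call, and a wait for CANCELED.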

http://git-wip-us.apache.org/repos/asf/flink/blob/20bda911/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
index 5e21129..66919e7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
@@ -34,15 +34,10 @@ import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
  * Test job cancellation from within a JoinFunction.
  */
 public class JoinCancelingITCase extends CancelingTestBase {
-	private static final int parallelism = 4;
-
-	public JoinCancelingITCase() {
-		setTaskManagerNumSlots(parallelism);
-	}
 
 	// --------------- Test Sort Matches that are canceled while still reading / sorting -----------------
 	private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow) throws Exception {
-		executeTask(joiner, slow, parallelism);
+		executeTask(joiner, slow, PARALLELISM);
 	}
 
 	private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow, int parallelism) throws Exception {
@@ -90,7 +85,7 @@ public class JoinCancelingITCase extends CancelingTestBase {
 				.with(joiner)
 				.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
 
-		env.setParallelism(parallelism);
+		env.setParallelism(PARALLELISM);
 
 		runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/20bda911/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
index 3a7039f..13edea4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
@@ -31,11 +31,6 @@ import org.junit.Test;
  * Test job cancellation from within a MapFunction.
  */
 public class MapCancelingITCase extends CancelingTestBase {
-	private static final int parallelism = 4;
-
-	public MapCancelingITCase() {
-		setTaskManagerNumSlots(parallelism);
-	}
 
 	@Test
 	public void testMapCancelling() throws Exception {
@@ -65,7 +60,7 @@ public class MapCancelingITCase extends CancelingTestBase {
 				.map(mapper)
 				.output(new DiscardingOutputFormat<Integer>());
 
-		env.setParallelism(parallelism);
+		env.setParallelism(PARALLELISM);
 
 		runAndCancelJob(env.createProgramPlan(), 5 * 1000, 10 * 1000);
 	}


[07/18] flink git commit: [hotfix][tests] Properly disable JoinCancelingITCase

Posted by ch...@apache.org.
[hotfix][tests] Properly disable JoinCancelingITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0a6ff76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0a6ff76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0a6ff76

Branch: refs/heads/master
Commit: f0a6ff76b732eb9fac6f204525080d04dcd8d8ea
Parents: 20bda91
Author: zentol <ch...@apache.org>
Authored: Wed Feb 28 13:43:42 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:05 2018 +0200

----------------------------------------------------------------------
 .../test/cancelling/JoinCancelingITCase.java      | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0a6ff76/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
index 66919e7..7288ba5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
@@ -30,9 +30,13 @@ import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
 import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
 import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
 
+import org.junit.Ignore;
+import org.junit.Test;
+
 /**
  * Test job cancellation from within a JoinFunction.
  */
+@Ignore("Takes too long.")
 public class JoinCancelingITCase extends CancelingTestBase {
 
 	// --------------- Test Sort Matches that are canceled while still reading / sorting -----------------
@@ -56,17 +60,17 @@ public class JoinCancelingITCase extends CancelingTestBase {
 		runAndCancelJob(env.createProgramPlan(), 5 * 1000, 10 * 1000);
 	}
 
-//	@Test
+	@Test
 	public void testCancelSortMatchWhileReadingSlowInputs() throws Exception {
 		executeTask(new SimpleMatcher<Integer>(), true);
 	}
 
-//	@Test
+	@Test
 	public void testCancelSortMatchWhileReadingFastInputs() throws Exception {
 		executeTask(new SimpleMatcher<Integer>(), false);
 	}
 
-//	@Test
+	@Test
 	public void testCancelSortMatchPriorToFirstRecordReading() throws Exception {
 		executeTask(new StuckInOpenMatcher<Integer>(), false);
 	}
@@ -90,26 +94,26 @@ public class JoinCancelingITCase extends CancelingTestBase {
 		runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
 	}
 
-//	@Test
+	@Test
 	public void testCancelSortMatchWhileDoingHeavySorting() throws Exception {
 		executeTaskWithGenerator(new SimpleMatcher<Integer>(), 50000, 100, 30 * 1000, 30 * 1000);
 	}
 
 	// --------------- Test Sort Matches that are canceled while in the Matching Phase -----------------
 
-//	@Test
+	@Test
 	public void testCancelSortMatchWhileJoining() throws Exception {
 		executeTaskWithGenerator(new DelayingMatcher<Integer>(), 500, 3, 10 * 1000, 20 * 1000);
 	}
 
-//	@Test
+	@Test
 	public void testCancelSortMatchWithLongCancellingResponse() throws Exception {
 		executeTaskWithGenerator(new LongCancelTimeMatcher<Integer>(), 500, 3, 10 * 1000, 10 * 1000);
 	}
 
 	// -------------------------------------- Test System corner cases ---------------------------------
 
-//	@Test
+	@Test
 	public void testCancelSortMatchWithHighparallelism() throws Exception {
 		executeTask(new SimpleMatcher<Integer>(), false, 64);
 	}
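
For context, the class-level @Ignore used above skips every test in the class while keeping the @Test annotations compiled and reported as skipped, rather than hiding them behind commented-out annotations. A minimal hypothetical example:

import org.junit.Ignore;
import org.junit.Test;

// Hypothetical example: all tests in this class are reported as skipped,
// but remain discoverable and compile-checked.
@Ignore("Takes too long.")
public class SlowSuiteExample {

    @Test
    public void slowTest() throws Exception {
        // would run for a long time
    }
}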


[09/18] flink git commit: [FLINK-9042][tests] Port ResumeCheckpointManuallyITCase to flip6

Posted by ch...@apache.org.
[FLINK-9042][tests] Port ResumeCheckpointManuallyITCase to flip6

This closes #5736.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c553ba4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c553ba4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c553ba4

Branch: refs/heads/master
Commit: 7c553ba45b44145ea09e4d9ccb0bdf64df7ee076
Parents: db366cd
Author: zentol <ch...@apache.org>
Authored: Wed Mar 21 13:31:56 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:05 2018 +0200

----------------------------------------------------------------------
 .../ResumeCheckpointManuallyITCase.java         | 146 +++++++++++++------
 1 file changed, 104 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c553ba4/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 537f864..add4243 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -18,25 +18,26 @@
 
 package org.apache.flink.test.checkpointing;
 
-import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.test.state.ManualWindowSpeedITCase;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.test.TestingServer;
@@ -44,9 +45,17 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertNotNull;
 
 /**
  * IT case for resuming from checkpoints manually via their external pointer, rather than automatic
@@ -240,14 +249,10 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
 
 		final Configuration config = new Configuration();
 
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER);
-
 		final File savepointDir = temporaryFolder.newFolder();
 
 		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
-		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
 
 		if (localRecovery) {
 			config.setString(
@@ -263,56 +268,113 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
 			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
 		}
 
-		TestingCluster cluster = new TestingCluster(config);
-		cluster.start();
+		MiniClusterResource cluster = new MiniClusterResource(
+			new MiniClusterResource.MiniClusterResourceConfiguration(
+				config,
+				NUM_TASK_MANAGERS,
+				SLOTS_PER_TASK_MANAGER),
+			true);
+
+		cluster.before();
 
-		String externalCheckpoint = null;
+		ClusterClient<?> client = cluster.getClusterClient();
+		client.setDetached(true);
 
 		try {
+			// main test sequence:  start job -> eCP -> restore job -> eCP -> restore job
+			String firstExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, null, client);
+			assertNotNull(firstExternalCheckpoint);
+
+			String secondExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, firstExternalCheckpoint, client);
+			assertNotNull(secondExternalCheckpoint);
 
-			// main test sequence:  start job -> eCP -> restore job -> eCP -> restore job -> eCP
-			for (int i = 0; i < 3; ++i) {
-				final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			String thirdExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, secondExternalCheckpoint, client);
+			assertNotNull(thirdExternalCheckpoint);
+		} finally {
+			cluster.after();
+		}
+	}
 
-				env.setStateBackend(backend);
-				env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-				env.setParallelism(PARALLELISM);
+	private static String runJobAndGetExternalizedCheckpoint(StateBackend backend, File checkpointDir, @Nullable String externalCheckpoint, ClusterClient<?> client) throws Exception {
+		JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint);
+		NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
 
-				// initialize count down latch
-				NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
+		client.submitJob(initialJobGraph, ResumeCheckpointManuallyITCase.class.getClassLoader());
 
-				env.addSource(new NotifyingInfiniteTupleSource(10_000))
-					.keyBy(0)
-					.timeWindow(Time.seconds(3))
-					.reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1))
-					.filter(value -> value.f0.startsWith("Tuple 0"));
+		// wait until all sources have been started
+		NotifyingInfiniteTupleSource.countDownLatch.await();
 
-				StreamGraph streamGraph = env.getStreamGraph();
-				streamGraph.setJobName("Test");
+		waitUntilExternalizedCheckpointCreated(checkpointDir, initialJobGraph.getJobID());
+		client.cancel(initialJobGraph.getJobID());
+		waitUntilCanceled(initialJobGraph.getJobID(), client);
 
-				JobGraph jobGraph = streamGraph.getJobGraph();
+		return getExternalizedCheckpointCheckpointPath(checkpointDir, initialJobGraph.getJobID());
+	}
+
+	private static String getExternalizedCheckpointCheckpointPath(File checkpointDir, JobID jobId) throws IOException {
+		Optional<Path> checkpoint = findExternalizedCheckpoint(checkpointDir, jobId);
+		if (!checkpoint.isPresent()) {
+			throw new AssertionError("No complete checkpoint could be found.");
+		} else {
+			return checkpoint.get().toString();
+		}
+	}
 
-				// recover from previous iteration?
-				if (externalCheckpoint != null) {
-					jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
+	private static void waitUntilExternalizedCheckpointCreated(File checkpointDir, JobID jobId) throws InterruptedException, IOException {
+		while (true) {
+			Thread.sleep(50);
+			Optional<Path> externalizedCheckpoint = findExternalizedCheckpoint(checkpointDir, jobId);
+			if (externalizedCheckpoint.isPresent()) {
+				break;
+			}
+		}
+	}
+
+	private static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId) throws IOException {
+		return Files.list(checkpointDir.toPath().resolve(jobId.toString()))
+			.filter(path -> path.getFileName().toString().startsWith("chk-"))
+			.filter(path -> {
+				try {
+					return Files.list(path).anyMatch(child -> child.getFileName().toString().contains("meta"));
+				} catch (IOException ignored) {
+					return false;
 				}
+			})
+			.findAny();
+	}
 
-				config.addAll(jobGraph.getJobConfiguration());
-				JobSubmissionResult submissionResult = cluster.submitJobDetached(jobGraph);
+	private static void waitUntilCanceled(JobID jobId, ClusterClient<?> client) throws ExecutionException, InterruptedException {
+		while (client.getJobStatus(jobId).get() != JobStatus.CANCELLING) {
+			Thread.sleep(50);
+		}
+	}
 
-				// wait until all sources have been started
-				NotifyingInfiniteTupleSource.countDownLatch.await();
+	private static JobGraph getJobGraph(StateBackend backend, @Nullable String externalCheckpoint) {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-				externalCheckpoint = cluster.requestCheckpoint(
-						submissionResult.getJobID(),
-						CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);
+		env.enableCheckpointing(500);
+		env.setStateBackend(backend);
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.setParallelism(PARALLELISM);
+		env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
-				cluster.cancelJob(submissionResult.getJobID());
-			}
-		} finally {
-			cluster.stop();
-			cluster.awaitTermination();
+		env.addSource(new NotifyingInfiniteTupleSource(10_000))
+			.keyBy(0)
+			.timeWindow(Time.seconds(3))
+			.reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1))
+			.filter(value -> value.f0.startsWith("Tuple 0"));
+
+		StreamGraph streamGraph = env.getStreamGraph();
+		streamGraph.setJobName("Test");
+
+		JobGraph jobGraph = streamGraph.getJobGraph();
+
+		// recover from previous iteration?
+		if (externalCheckpoint != null) {
+			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
 		}
+
+		return jobGraph;
 	}
 
 	/**
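
The manual-resume flow exercised by this test boils down to locating a completed externalized checkpoint on disk and pointing the next job at it. A condensed, hypothetical sketch (the directory filtering and the restore call are taken from the diff; the wrapper class is illustrative):

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

// Hypothetical sketch of resuming a job from an externalized checkpoint.
// The test above treats a chk-* directory under <checkpointDir>/<jobId>/ as
// complete once it contains a metadata file.
public final class ManualResumeSketch {

    private ManualResumeSketch() {
    }

    public static void resumeFrom(File checkpointDir, JobID previousJob, JobGraph nextJob) throws IOException {
        Optional<Path> checkpoint = Files.list(checkpointDir.toPath().resolve(previousJob.toString()))
            .filter(path -> path.getFileName().toString().startsWith("chk-"))
            .filter(path -> {
                try {
                    return Files.list(path).anyMatch(child -> child.getFileName().toString().contains("meta"));
                } catch (IOException ignored) {
                    return false;
                }
            })
            .findAny();

        // Point the next job at the checkpoint; submission itself is omitted here.
        checkpoint.ifPresent(path ->
            nextJob.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(path.toString())));
    }
}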


[18/18] flink git commit: +ScheduleOrUpdateConsumersTest

Posted by ch...@apache.org.
+ScheduleOrUpdateConsumersTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ffe22db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ffe22db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ffe22db

Branch: refs/heads/master
Commit: 6ffe22db86f0846c7d3f1e7d7ff77eaa61483701
Parents: fea6d5e
Author: zentol <ch...@apache.org>
Authored: Wed Apr 4 11:43:55 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 11:43:55 2018 +0200

----------------------------------------------------------------------
 .../jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java      | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6ffe22db/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index c743a63..e16c3ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.testutils.category.New;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.TestLogger;
 
@@ -46,7 +46,7 @@ import java.util.List;
 
 import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY;
 
-@Category(Flip6.class)
+@Category(New.class)
 public class ScheduleOrUpdateConsumersTest extends TestLogger {
 
 	private static final int NUMBER_OF_TMS = 2;
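
For context, the category annotation above is standard JUnit 4: tests are tagged with a marker interface and can then be selected as a group. A minimal hypothetical sketch; Flink's real marker interface is org.apache.flink.testutils.category.New, and the nested classes below are illustrative only.

import org.junit.Test;
import org.junit.experimental.categories.Categories;
import org.junit.experimental.categories.Categories.IncludeCategory;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Suite.SuiteClasses;

public class CategorySketch {

    /** Hypothetical marker interface playing the role of the New category. */
    public interface NewCodePath {
    }

    /** A test tagged with the category. */
    @Category(NewCodePath.class)
    public static class TaggedTest {
        @Test
        public void runsOnNewCodePath() {
        }
    }

    /** A suite that runs only tests carrying the category. */
    @RunWith(Categories.class)
    @IncludeCategory(NewCodePath.class)
    @SuiteClasses(TaggedTest.class)
    public static class NewTestsSuite {
    }
}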


[13/18] flink git commit: [FLINK-9104][docs] Update generator and regenerate REST API docs

Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/83fffdf3/docs/_includes/generated/rest_dispatcher.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_dispatcher.html
index 25bdff1..e02c232 100644
--- a/docs/_includes/generated/rest_dispatcher.html
+++ b/docs/_includes/generated/rest_dispatcher.html
@@ -12,8 +12,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1881098330">Request</button>
-        <div id="-1881098330" class="collapse">
+        <button data-toggle="collapse" data-target="#1124793457">Request</button>
+        <div id="1124793457" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -23,18 +23,18 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-574888474">Response</button>
-        <div id="-574888474" class="collapse">
+        <button data-toggle="collapse" data-target="#-1863963983">Response</button>
+        <div id="-1863963983" class="collapse">
           <pre>
             <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:BlobServerPortResponseBody",
-  "properties" : {
-    "port" : {
-      "type" : "integer"
-    }
-  }
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:BlobServerPortResponseBody",
+  "properties" : {
+    "port" : {
+      "type" : "integer"
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -56,8 +56,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1357269034">Request</button>
-        <div id="-1357269034" class="collapse">
+        <button data-toggle="collapse" data-target="#1648622753">Request</button>
+        <div id="1648622753" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -67,30 +67,30 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-642531372">Response</button>
-        <div id="-642531372" class="collapse">
+        <button data-toggle="collapse" data-target="#-1931606881">Response</button>
+        <div id="-1931606881" class="collapse">
           <pre>
             <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:DashboardConfiguration",
-  "properties" : {
-    "refreshInterval" : {
-      "type" : "integer"
-    },
-    "timeZoneName" : {
-      "type" : "string"
-    },
-    "timeZoneOffset" : {
-      "type" : "integer"
-    },
-    "flinkVersion" : {
-      "type" : "string"
-    },
-    "flinkRevision" : {
-      "type" : "string"
-    }
-  }
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:DashboardConfiguration",
+  "properties" : {
+    "refreshInterval" : {
+      "type" : "integer"
+    },
+    "timeZoneName" : {
+      "type" : "string"
+    },
+    "timeZoneOffset" : {
+      "type" : "integer"
+    },
+    "flinkVersion" : {
+      "type" : "string"
+    },
+    "flinkRevision" : {
+      "type" : "string"
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -112,8 +112,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1353094089">Request</button>
-        <div id="-1353094089" class="collapse">
+        <button data-toggle="collapse" data-target="#1652797698">Request</button>
+        <div id="1652797698" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -123,24 +123,24 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1672532771">Response</button>
-        <div id="-1672532771" class="collapse">
+        <button data-toggle="collapse" data-target="#1333359016">Response</button>
+        <div id="1333359016" class="collapse">
           <pre>
             <code>
-{
-  "type" : "array",
-  "items" : {
-    "type" : "object",
-    "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ClusterConfigurationInfoEntry",
-    "properties" : {
-      "key" : {
-        "type" : "string"
-      },
-      "value" : {
-        "type" : "string"
-      }
-    }
-  }
+{
+  "type" : "array",
+  "items" : {
+    "type" : "object",
+    "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ClusterConfigurationInfoEntry",
+    "properties" : {
+      "key" : {
+        "type" : "string"
+      },
+      "value" : {
+        "type" : "string"
+      }
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -172,8 +172,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1667487464">Request</button>
-        <div id="-1667487464" class="collapse">
+        <button data-toggle="collapse" data-target="#1338404323">Request</button>
+        <div id="1338404323" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -183,30 +183,30 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1356759103">Response</button>
-        <div id="-1356759103" class="collapse">
+        <button data-toggle="collapse" data-target="#1649132684">Response</button>
+        <div id="1649132684" class="collapse">
           <pre>
             <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:MetricCollectionResponseBody",
-  "properties" : {
-    "metrics" : {
-      "type" : "array",
-      "items" : {
-        "type" : "object",
-        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:Metric",
-        "properties" : {
-          "id" : {
-            "type" : "string"
-          },
-          "value" : {
-            "type" : "string"
-          }
-        }
-      }
-    }
-  }
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:MetricCollectionResponseBody",
+  "properties" : {
+    "metrics" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:Metric",
+        "properties" : {
+          "id" : {
+            "type" : "string"
+          },
+          "value" : {
+            "type" : "string"
+          }
+        }
+      }
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -228,8 +228,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#1281196202">Request</button>
-        <div id="1281196202" class="collapse">
+        <button data-toggle="collapse" data-target="#-7879307">Request</button>
+        <div id="-7879307" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -239,46 +239,46 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#1630160880">Response</button>
-        <div id="1630160880" class="collapse">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobIdsWithStatusOverview",
-  "properties" : {
-    "jobsWithStatus" : {
-      "type" : "array",
-      "items" : {
-        "type" : "object",
-        "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobIdsWithStatusOverview:JobIdWithStatus",
-        "properties" : {
-          "jobId" : {
-            "type" : "object",
-            "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
-            "properties" : {
-              "upperPart" : {
-                "type" : "integer"
-              },
-              "lowerPart" : {
-                "type" : "integer"
-              },
-              "bytes" : {
-                "type" : "array",
-                "items" : {
-                  "type" : "integer"
-                }
-              }
-            }
-          },
-          "jobStatus" : {
-            "type" : "string",
-            "enum" : [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
-          }
-        }
-      }
-    }
-  }
+        <button data-toggle="collapse" data-target="#341085371">Response</button>
+        <div id="341085371" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobIdsWithStatusOverview",
+  "properties" : {
+    "jobsWithStatus" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobIdsWithStatusOverview:JobIdWithStatus",
+        "properties" : {
+          "jobId" : {
+            "type" : "object",
+            "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
+            "properties" : {
+              "upperPart" : {
+                "type" : "integer"
+              },
+              "lowerPart" : {
+                "type" : "integer"
+              },
+              "bytes" : {
+                "type" : "array",
+                "items" : {
+                  "type" : "integer"
+                }
+              }
+            }
+          },
+          "jobStatus" : {
+            "type" : "string",
+            "enum" : [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDING", "SUSPENDED", "RECONCILING" ]
+          }
+        }
+      }
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -300,21 +300,21 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#1710504926">Request</button>
-        <div id="1710504926" class="collapse">
+        <button data-toggle="collapse" data-target="#-1188986258">Request</button>
+        <div id="-1188986258" class="collapse">
           <pre>
             <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitRequestBody",
-  "properties" : {
-    "serializedJobGraph" : {
-      "type" : "array",
-      "items" : {
-        "type" : "integer"
-      }
-    }
-  }
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitRequestBody",
+  "properties" : {
+    "serializedJobGraph" : {
+      "type" : "array",
+      "items" : {
+        "type" : "integer"
+      }
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -322,18 +322,18 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-476159276">Response</button>
-        <div id="-476159276" class="collapse">
+        <button data-toggle="collapse" data-target="#919316836">Response</button>
+        <div id="919316836" class="collapse">
           <pre>
             <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitResponseBody",
-  "properties" : {
-    "jobUrl" : {
-      "type" : "string"
-    }
-  }
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitResponseBody",
+  "properties" : {
+    "jobUrl" : {
+      "type" : "string"
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -355,8 +355,114 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-297252120">Request</button>
-        <div id="-297252120" class="collapse">
+        <button data-toggle="collapse" data-target="#-1586327629">Request</button>
+        <div id="-1586327629" class="collapse">
+          <pre>
+            <code>
+{}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#-1663567735">Response</button>
+        <div id="-1663567735" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:MultipleJobsDetails",
+  "properties" : {
+    "jobs" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobDetails",
+        "properties" : {
+          "jobId" : {
+            "type" : "object",
+            "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
+            "properties" : {
+              "upperPart" : {
+                "type" : "integer"
+              },
+              "lowerPart" : {
+                "type" : "integer"
+              },
+              "bytes" : {
+                "type" : "array",
+                "items" : {
+                  "type" : "integer"
+                }
+              }
+            }
+          },
+          "jobName" : {
+            "type" : "string"
+          },
+          "startTime" : {
+            "type" : "integer"
+          },
+          "endTime" : {
+            "type" : "integer"
+          },
+          "duration" : {
+            "type" : "integer"
+          },
+          "status" : {
+            "type" : "string",
+            "enum" : [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDING", "SUSPENDED", "RECONCILING" ]
+          },
+          "lastUpdateTime" : {
+            "type" : "integer"
+          },
+          "tasksPerState" : {
+            "type" : "array",
+            "items" : {
+              "type" : "integer"
+            }
+          },
+          "numTasks" : {
+            "type" : "integer"
+          }
+        }
+      }
+    }
+  }
+}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+  </tbody>
+</table>
+<table class="table table-bordered">
+  <tbody>
+    <tr>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid</strong></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">description</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - description</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#-2071441832">Request</button>
+        <div id="-2071441832" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -366,70 +472,152 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-374492226">Response</button>
-        <div id="-374492226" class="collapse">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:MultipleJobsDetails",
-  "properties" : {
-    "jobs" : {
-      "type" : "array",
-      "items" : {
-        "type" : "object",
-        "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobDetails",
-        "properties" : {
-          "jobId" : {
-            "type" : "object",
-            "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
-            "properties" : {
-              "upperPart" : {
-                "type" : "integer"
-              },
-              "lowerPart" : {
-                "type" : "integer"
-              },
-              "bytes" : {
-                "type" : "array",
-                "items" : {
-                  "type" : "integer"
-                }
-              }
-            }
-          },
-          "jobName" : {
-            "type" : "string"
-          },
-          "startTime" : {
-            "type" : "integer"
-          },
-          "endTime" : {
-            "type" : "integer"
-          },
-          "duration" : {
-            "type" : "integer"
-          },
-          "status" : {
-            "type" : "string",
-            "enum" : [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
-          },
-          "lastUpdateTime" : {
-            "type" : "integer"
-          },
-          "tasksPerState" : {
-            "type" : "array",
-            "items" : {
-              "type" : "integer"
-            }
-          },
-          "numTasks" : {
-            "type" : "integer"
-          }
-        }
-      }
-    }
-  }
+        <button data-toggle="collapse" data-target="#998294776">Response</button>
+        <div id="998294776" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobDetailsInfo",
+  "properties" : {
+    "jobId" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
+      "properties" : {
+        "upperPart" : {
+          "type" : "integer"
+        },
+        "lowerPart" : {
+          "type" : "integer"
+        },
+        "bytes" : {
+          "type" : "array",
+          "items" : {
+            "type" : "integer"
+          }
+        }
+      }
+    },
+    "name" : {
+      "type" : "string"
+    },
+    "jobStatus" : {
+      "type" : "string",
+      "enum" : [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDING", "SUSPENDED", "RECONCILING" ]
+    },
+    "startTime" : {
+      "type" : "integer"
+    },
+    "endTime" : {
+      "type" : "integer"
+    },
+    "duration" : {
+      "type" : "integer"
+    },
+    "now" : {
+      "type" : "integer"
+    },
+    "timestamps" : {
+      "type" : "object",
+      "additionalProperties" : {
+        "type" : "integer"
+      }
+    },
+    "jobVertexInfos" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobDetailsInfo:JobVertexDetailsInfo",
+        "properties" : {
+          "jobVertexID" : {
+            "type" : "object",
+            "id" : "urn:jsonschema:org:apache:flink:runtime:jobgraph:JobVertexID",
+            "properties" : {
+              "upperPart" : {
+                "type" : "integer"
+              },
+              "lowerPart" : {
+                "type" : "integer"
+              },
+              "bytes" : {
+                "type" : "array",
+                "items" : {
+                  "type" : "integer"
+                }
+              }
+            }
+          },
+          "name" : {
+            "type" : "string"
+          },
+          "parallelism" : {
+            "type" : "integer"
+          },
+          "executionState" : {
+            "type" : "string",
+            "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
+          },
+          "startTime" : {
+            "type" : "integer"
+          },
+          "endTime" : {
+            "type" : "integer"
+          },
+          "duration" : {
+            "type" : "integer"
+          },
+          "tasksPerState" : {
+            "type" : "object",
+            "additionalProperties" : {
+              "type" : "integer"
+            }
+          },
+          "jobVertexMetrics" : {
+            "type" : "object",
+            "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
+            "properties" : {
+              "bytesRead" : {
+                "type" : "integer"
+              },
+              "bytesReadComplete" : {
+                "type" : "boolean"
+              },
+              "bytesWritten" : {
+                "type" : "integer"
+              },
+              "bytesWrittenComplete" : {
+                "type" : "boolean"
+              },
+              "recordsRead" : {
+                "type" : "integer"
+              },
+              "recordsReadComplete" : {
+                "type" : "boolean"
+              },
+              "recordsWritten" : {
+                "type" : "integer"
+              },
+              "recordsWrittenComplete" : {
+                "type" : "boolean"
+              }
+            }
+          }
+        }
+      }
+    },
+    "jobVerticesPerState" : {
+      "type" : "object",
+      "additionalProperties" : {
+        "type" : "integer"
+      }
+    },
+    "jsonPlan" : {
+      "type" : "string"
+    },
+    "stoppable" : {
+      "type" : "boolean"
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -471,8 +659,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#1776493329">Request</button>
-        <div id="1776493329" class="collapse">
+        <button data-toggle="collapse" data-target="#-1977276770">Request</button>
+        <div id="-1977276770" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -482,8 +670,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#1961476153">Response</button>
-        <div id="1961476153" class="collapse">
+        <button data-toggle="collapse" data-target="#-1792293946">Response</button>
+        <div id="-1792293946" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -496,7 +684,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/accumulators</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -516,9 +704,19 @@
       </td>
     </tr>
     <tr>
+      <td colspan="2">Query parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>includeSerializedValue</code> (optional): description</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-782366323">Request</button>
-        <div id="-782366323" class="collapse">
+        <button data-toggle="collapse" data-target="#-2064509468">Request</button>
+        <div id="-2064509468" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -528,12 +726,54 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-2007597011">Response</button>
-        <div id="-2007597011" class="collapse">
+        <button data-toggle="collapse" data-target="#-1744345868">Response</button>
+        <div id="-1744345868" class="collapse">
           <pre>
             <code>
-{
-  "type" : "any"
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobAccumulatorsInfo",
+  "properties" : {
+    "jobAccumulators" : {
+      "type" : "array",
+      "items" : {
+        "type" : "any"
+      }
+    },
+    "userAccumulators" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobAccumulatorsInfo:UserTaskAccumulator",
+        "properties" : {
+          "name" : {
+            "type" : "string"
+          },
+          "type" : {
+            "type" : "string"
+          },
+          "value" : {
+            "type" : "string"
+          }
+        }
+      }
+    },
+    "serializedUserAccumulators" : {
+      "type" : "object",
+      "additionalProperties" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:util:SerializedValue&lt;org:apache:flink:util:OptionalFailure&lt;java:lang:Object&gt;&gt;",
+        "properties" : {
+          "byteArray" : {
+            "type" : "array",
+            "items" : {
+              "type" : "integer"
+            }
+          }
+        }
+      }
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -544,7 +784,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid/accumulators</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -565,8 +805,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-775433959">Request</button>
-        <div id="-775433959" class="collapse">
+        <button data-toggle="collapse" data-target="#-1417778508">Request</button>
+        <div id="-1417778508" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -576,12 +816,267 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-455270359">Response</button>
-        <div id="-455270359" class="collapse">
+        <button data-toggle="collapse" data-target="#-850872826">Response</button>
+        <div id="-850872826" class="collapse">
           <pre>
             <code>
-{
-  "type" : "any"
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics",
+  "properties" : {
+    "counts" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:Counts",
+      "properties" : {
+        "numberRestoredCheckpoints" : {
+          "type" : "integer"
+        },
+        "totalNumberCheckpoints" : {
+          "type" : "integer"
+        },
+        "numberInProgressCheckpoints" : {
+          "type" : "integer"
+        },
+        "numberCompletedCheckpoints" : {
+          "type" : "integer"
+        },
+        "numberFailedCheckpoints" : {
+          "type" : "integer"
+        }
+      }
+    },
+    "summary" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:Summary",
+      "properties" : {
+        "stateSize" : {
+          "type" : "object",
+          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics",
+          "properties" : {
+            "minimum" : {
+              "type" : "integer"
+            },
+            "maximum" : {
+              "type" : "integer"
+            },
+            "average" : {
+              "type" : "integer"
+            }
+          }
+        },
+        "duration" : {
+          "type" : "object",
+          "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
+        },
+        "alignmentBuffered" : {
+          "type" : "object",
+          "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
+        }
+      }
+    },
+    "latestCheckpoints" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:LatestCheckpoints",
+      "properties" : {
+        "completedCheckpointStatistics" : {
+          "type" : "object",
+          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics:CompletedCheckpointStatistics",
+          "properties" : {
+            "id" : {
+              "type" : "integer"
+            },
+            "status" : {
+              "type" : "string",
+              "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
+            },
+            "savepoint" : {
+              "type" : "boolean"
+            },
+            "triggerTimestamp" : {
+              "type" : "integer"
+            },
+            "latestAckTimestamp" : {
+              "type" : "integer"
+            },
+            "stateSize" : {
+              "type" : "integer"
+            },
+            "duration" : {
+              "type" : "integer"
+            },
+            "alignmentBuffered" : {
+              "type" : "integer"
+            },
+            "numSubtasks" : {
+              "type" : "integer"
+            },
+            "numAckSubtasks" : {
+              "type" : "integer"
+            },
+            "checkpointStatisticsPerTask" : {
+              "type" : "object",
+              "additionalProperties" : {
+                "type" : "object",
+                "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics",
+                "properties" : {
+                  "checkpointId" : {
+                    "type" : "integer"
+                  },
+                  "checkpointStatus" : {
+                    "type" : "string",
+                    "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
+                  },
+                  "latestAckTimestamp" : {
+                    "type" : "integer"
+                  },
+                  "stateSize" : {
+                    "type" : "integer"
+                  },
+                  "duration" : {
+                    "type" : "integer"
+                  },
+                  "alignmentBuffered" : {
+                    "type" : "integer"
+                  },
+                  "numSubtasks" : {
+                    "type" : "integer"
+                  },
+                  "numAckSubtasks" : {
+                    "type" : "integer"
+                  }
+                }
+              }
+            },
+            "externalPath" : {
+              "type" : "string"
+            },
+            "discarded" : {
+              "type" : "boolean"
+            }
+          }
+        },
+        "savepointStatistics" : {
+          "type" : "object",
+          "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics:CompletedCheckpointStatistics"
+        },
+        "failedCheckpointStatistics" : {
+          "type" : "object",
+          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics:FailedCheckpointStatistics",
+          "properties" : {
+            "id" : {
+              "type" : "integer"
+            },
+            "status" : {
+              "type" : "string",
+              "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
+            },
+            "savepoint" : {
+              "type" : "boolean"
+            },
+            "triggerTimestamp" : {
+              "type" : "integer"
+            },
+            "latestAckTimestamp" : {
+              "type" : "integer"
+            },
+            "stateSize" : {
+              "type" : "integer"
+            },
+            "duration" : {
+              "type" : "integer"
+            },
+            "alignmentBuffered" : {
+              "type" : "integer"
+            },
+            "numSubtasks" : {
+              "type" : "integer"
+            },
+            "numAckSubtasks" : {
+              "type" : "integer"
+            },
+            "checkpointStatisticsPerTask" : {
+              "type" : "object",
+              "additionalProperties" : {
+                "type" : "object",
+                "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics"
+              }
+            },
+            "failureTimestamp" : {
+              "type" : "integer"
+            },
+            "failureMessage" : {
+              "type" : "string"
+            }
+          }
+        },
+        "restoredCheckpointStatistics" : {
+          "type" : "object",
+          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:RestoredCheckpointStatistics",
+          "properties" : {
+            "id" : {
+              "type" : "integer"
+            },
+            "restoreTimestamp" : {
+              "type" : "integer"
+            },
+            "savepoint" : {
+              "type" : "boolean"
+            },
+            "externalPath" : {
+              "type" : "string"
+            }
+          }
+        }
+      }
+    },
+    "history" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics",
+        "properties" : {
+          "id" : {
+            "type" : "integer"
+          },
+          "status" : {
+            "type" : "string",
+            "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
+          },
+          "savepoint" : {
+            "type" : "boolean"
+          },
+          "triggerTimestamp" : {
+            "type" : "integer"
+          },
+          "latestAckTimestamp" : {
+            "type" : "integer"
+          },
+          "stateSize" : {
+            "type" : "integer"
+          },
+          "duration" : {
+            "type" : "integer"
+          },
+          "alignmentBuffered" : {
+            "type" : "integer"
+          },
+          "numSubtasks" : {
+            "type" : "integer"
+          },
+          "numAckSubtasks" : {
+            "type" : "integer"
+          },
+          "checkpointStatisticsPerTask" : {
+            "type" : "object",
+            "additionalProperties" : {
+              "type" : "object",
+              "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics"
+            }
+          }
+        }
+      }
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -592,7 +1087,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints/config</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -613,8 +1108,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-128702999">Request</button>
-        <div id="-128702999" class="collapse">
+        <button data-toggle="collapse" data-target="#-1872439843">Request</button>
+        <div id="-1872439843" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -624,267 +1119,12 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#438202683">Response</button>
-        <div id="438202683" class="collapse">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics",
-  "properties" : {
-    "counts" : {
-      "type" : "object",
-      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:Counts",
-      "properties" : {
-        "numberRestoredCheckpoints" : {
-          "type" : "integer"
-        },
-        "totalNumberCheckpoints" : {
-          "type" : "integer"
-        },
-        "numberInProgressCheckpoints" : {
-          "type" : "integer"
-        },
-        "numberCompletedCheckpoints" : {
-          "type" : "integer"
-        },
-        "numberFailedCheckpoints" : {
-          "type" : "integer"
-        }
-      }
-    },
-    "summary" : {
-      "type" : "object",
-      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:Summary",
-      "properties" : {
-        "stateSize" : {
-          "type" : "object",
-          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics",
-          "properties" : {
-            "minimum" : {
-              "type" : "integer"
-            },
-            "maximum" : {
-              "type" : "integer"
-            },
-            "average" : {
-              "type" : "integer"
-            }
-          }
-        },
-        "duration" : {
-          "type" : "object",
-          "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
-        },
-        "alignmentBuffered" : {
-          "type" : "object",
-          "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
-        }
-      }
-    },
-    "latestCheckpoints" : {
-      "type" : "object",
-      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:LatestCheckpoints",
-      "properties" : {
-        "completedCheckpointStatistics" : {
-          "type" : "object",
-          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics:CompletedCheckpointStatistics",
-          "properties" : {
-            "id" : {
-              "type" : "integer"
-            },
-            "status" : {
-              "type" : "string",
-              "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
-            },
-            "savepoint" : {
-              "type" : "boolean"
-            },
-            "triggerTimestamp" : {
-              "type" : "integer"
-            },
-            "latestAckTimestamp" : {
-              "type" : "integer"
-            },
-            "stateSize" : {
-              "type" : "integer"
-            },
-            "duration" : {
-              "type" : "integer"
-            },
-            "alignmentBuffered" : {
-              "type" : "integer"
-            },
-            "numSubtasks" : {
-              "type" : "integer"
-            },
-            "numAckSubtasks" : {
-              "type" : "integer"
-            },
-            "checkpointStatisticsPerTask" : {
-              "type" : "object",
-              "additionalProperties" : {
-                "type" : "object",
-                "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics",
-                "properties" : {
-                  "checkpointId" : {
-                    "type" : "integer"
-                  },
-                  "checkpointStatus" : {
-                    "type" : "string",
-                    "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
-                  },
-                  "latestAckTimestamp" : {
-                    "type" : "integer"
-                  },
-                  "stateSize" : {
-                    "type" : "integer"
-                  },
-                  "duration" : {
-                    "type" : "integer"
-                  },
-                  "alignmentBuffered" : {
-                    "type" : "integer"
-                  },
-                  "numSubtasks" : {
-                    "type" : "integer"
-                  },
-                  "numAckSubtasks" : {
-                    "type" : "integer"
-                  }
-                }
-              }
-            },
-            "externalPath" : {
-              "type" : "string"
-            },
-            "discarded" : {
-              "type" : "boolean"
-            }
-          }
-        },
-        "savepointStatistics" : {
-          "type" : "object",
-          "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics:CompletedCheckpointStatistics"
-        },
-        "failedCheckpointStatistics" : {
-          "type" : "object",
-          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics:FailedCheckpointStatistics",
-          "properties" : {
-            "id" : {
-              "type" : "integer"
-            },
-            "status" : {
-              "type" : "string",
-              "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
-            },
-            "savepoint" : {
-              "type" : "boolean"
-            },
-            "triggerTimestamp" : {
-              "type" : "integer"
-            },
-            "latestAckTimestamp" : {
-              "type" : "integer"
-            },
-            "stateSize" : {
-              "type" : "integer"
-            },
-            "duration" : {
-              "type" : "integer"
-            },
-            "alignmentBuffered" : {
-              "type" : "integer"
-            },
-            "numSubtasks" : {
-              "type" : "integer"
-            },
-            "numAckSubtasks" : {
-              "type" : "integer"
-            },
-            "checkpointStatisticsPerTask" : {
-              "type" : "object",
-              "additionalProperties" : {
-                "type" : "object",
-                "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics"
-              }
-            },
-            "failureTimestamp" : {
-              "type" : "integer"
-            },
-            "failureMessage" : {
-              "type" : "string"
-            }
-          }
-        },
-        "restoredCheckpointStatistics" : {
-          "type" : "object",
-          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:RestoredCheckpointStatistics",
-          "properties" : {
-            "id" : {
-              "type" : "integer"
-            },
-            "restoreTimestamp" : {
-              "type" : "integer"
-            },
-            "savepoint" : {
-              "type" : "boolean"
-            },
-            "externalPath" : {
-              "type" : "string"
-            }
-          }
-        }
-      }
-    },
-    "history" : {
-      "type" : "array",
-      "items" : {
-        "type" : "object",
-        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics",
-        "properties" : {
-          "id" : {
-            "type" : "integer"
-          },
-          "status" : {
-            "type" : "string",
-            "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
-          },
-          "savepoint" : {
-            "type" : "boolean"
-          },
-          "triggerTimestamp" : {
-            "type" : "integer"
-          },
-          "latestAckTimestamp" : {
-            "type" : "integer"
-          },
-          "stateSize" : {
-            "type" : "integer"
-          },
-          "duration" : {
-            "type" : "integer"
-          },
-          "alignmentBuffered" : {
-            "type" : "integer"
-          },
-          "numSubtasks" : {
-            "type" : "integer"
-          },
-          "numAckSubtasks" : {
-            "type" : "integer"
-          },
-          "checkpointStatisticsPerTask" : {
-            "type" : "object",
-            "additionalProperties" : {
-              "type" : "object",
-              "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics"
-            }
-          }
-        }
-      }
-    }
-  }
+        <button data-toggle="collapse" data-target="#1325385076">Response</button>
+        <div id="1325385076" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "any"
 }            </code>
           </pre>
          </div>
@@ -895,7 +1135,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints/config</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -911,13 +1151,15 @@
       <td colspan="2">
         <ul>
 <li><code>jobid</code> - description</li>
+<li><code>checkpointid</code> - description</li>
+<li><code>vertexid</code> - description</li>
         </ul>
       </td>
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-583364334">Request</button>
-        <div id="-583364334" class="collapse">
+        <button data-toggle="collapse" data-target="#723836914">Request</button>
+        <div id="723836914" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -927,12 +1169,108 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1680506711">Response</button>
-        <div id="-1680506711" class="collapse">
+        <button data-toggle="collapse" data-target="#664379972">Response</button>
+        <div id="664379972" class="collapse">
           <pre>
             <code>
-{
-  "type" : "any"
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails",
+  "properties" : {
+    "checkpointId" : {
+      "type" : "integer"
+    },
+    "checkpointStatus" : {
+      "type" : "string",
+      "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
+    },
+    "latestAckTimestamp" : {
+      "type" : "integer"
+    },
+    "stateSize" : {
+      "type" : "integer"
+    },
+    "duration" : {
+      "type" : "integer"
+    },
+    "alignmentBuffered" : {
+      "type" : "integer"
+    },
+    "numSubtasks" : {
+      "type" : "integer"
+    },
+    "numAckSubtasks" : {
+      "type" : "integer"
+    },
+    "summary" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails:Summary",
+      "properties" : {
+        "stateSize" : {
+          "type" : "object",
+          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics",
+          "properties" : {
+            "minimum" : {
+              "type" : "integer"
+            },
+            "maximum" : {
+              "type" : "integer"
+            },
+            "average" : {
+              "type" : "integer"
+            }
+          }
+        },
+        "duration" : {
+          "type" : "object",
+          "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
+        },
+        "checkpointDuration" : {
+          "type" : "object",
+          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails:CheckpointDuration",
+          "properties" : {
+            "synchronousDuration" : {
+              "type" : "object",
+              "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
+            },
+            "asynchronousDuration" : {
+              "type" : "object",
+              "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
+            }
+          }
+        },
+        "checkpointAlignment" : {
+          "type" : "object",
+          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails:CheckpointAlignment",
+          "properties" : {
+            "bufferedData" : {
+              "type" : "object",
+              "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
+            },
+            "duration" : {
+              "type" : "object",
+              "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
+            }
+          }
+        }
+      }
+    },
+    "subtaskCheckpointStatistics" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:SubtaskCheckpointStatistics",
+        "properties" : {
+          "index" : {
+            "type" : "integer"
+          },
+          "checkpointStatus" : {
+            "type" : "string"
+          }
+        }
+      }
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -943,7 +1281,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints/:checkpointid</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -960,14 +1298,13 @@
         <ul>
 <li><code>jobid</code> - description</li>
 <li><code>checkpointid</code> - description</li>
-<li><code>vertexid</code> - description</li>
         </ul>
       </td>
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#2012912423">Request</button>
-        <div id="2012912423" class="collapse">
+        <button data-toggle="collapse" data-target="#-450903614">Request</button>
+        <div id="-450903614" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -977,108 +1314,80 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#1953455481">Response</button>
-        <div id="1953455481" class="collapse">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails",
-  "properties" : {
-    "checkpointId" : {
-      "type" : "integer"
-    },
-    "checkpointStatus" : {
-      "type" : "string",
-      "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
-    },
-    "latestAckTimestamp" : {
-      "type" : "integer"
-    },
-    "stateSize" : {
-      "type" : "integer"
-    },
-    "duration" : {
-      "type" : "integer"
-    },
-    "alignmentBuffered" : {
-      "type" : "integer"
-    },
-    "numSubtasks" : {
-      "type" : "integer"
-    },
-    "numAckSubtasks" : {
-      "type" : "integer"
-    },
-    "summary" : {
-      "type" : "object",
-      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails:Summary",
-      "properties" : {
-        "stateSize" : {
-          "type" : "object",
-          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics",
-          "properties" : {
-            "minimum" : {
-              "type" : "integer"
-            },
-            "maximum" : {
-              "type" : "integer"
-            },
-            "average" : {
-              "type" : "integer"
-            }
-          }
-        },
-        "duration" : {
-          "type" : "object",
-          "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
-        },
-        "checkpointDuration" : {
-          "type" : "object",
-          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails:CheckpointDuration",
-          "properties" : {
-            "synchronousDuration" : {
-              "type" : "object",
-              "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
-            },
-            "asynchronousDuration" : {
-              "type" : "object",
-              "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
-            }
-          }
-        },
-        "checkpointAlignment" : {
-          "type" : "object",
-          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails:CheckpointAlignment",
-          "properties" : {
-            "bufferedData" : {
-              "type" : "object",
-              "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
-            },
-            "duration" : {
-              "type" : "object",
-              "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
-            }
-          }
-        }
-      }
-    },
-    "subtaskCheckpointStatistics" : {
-      "type" : "array",
-      "items" : {
-        "type" : "object",
-        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:SubtaskCheckpointStatistics",
-        "properties" : {
-          "index" : {
-            "type" : "integer"
-          },
-          "checkpointStatus" : {
-            "type" : "string"
-          }
-        }
-      }
-    }
-  }
+        <button data-toggle="collapse" data-target="#1821144940">Response</button>
+        <div id="1821144940" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics",
+  "properties" : {
+    "id" : {
+      "type" : "integer"
+    },
+    "status" : {
+      "type" : "string",
+      "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
+    },
+    "savepoint" : {
+      "type" : "boolean"
+    },
+    "triggerTimestamp" : {
+      "type" : "integer"
+    },
+    "latestAckTimestamp" : {
+      "type" : "integer"
+    },
+    "stateSize" : {
+      "type" : "integer"
+    },
+    "duration" : {
+      "type" : "integer"
+    },
+    "alignmentBuffered" : {
+      "type" : "integer"
+    },
+    "numSubtasks" : {
+      "type" : "integer"
+    },
+    "numAckSubtasks" : {
+      "type" : "integer"
+    },
+    "checkpointStatisticsPerTask" : {
+      "type" : "object",
+      "additionalProperties" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics",
+        "properties" : {
+          "checkpointId" : {
+            "type" : "integer"
+          },
+          "checkpointStatus" : {
+            "type" : "string",
+            "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
+          },
+          "latestAckTimestamp" : {
+            "type" : "integer"
+          },
+          "stateSize" : {
+            "type" : "integer"
+          },
+          "duration" : {
+            "type" : "integer"
+          },
+          "alignmentBuffered" : {
+            "type" : "integer"
+          },
+          "numSubtasks" : {
+            "type" : "integer"
+          },
+          "numAckSubtasks" : {
+            "type" : "integer"
+          }
+        }
+      }
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -1089,7 +1398,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints/:checkpointid</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/config</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1105,14 +1414,13 @@
       <td colspan="2">
         <ul>
 <li><code>jobid</code> - description</li>
-<li><code>checkpointid</code> - description</li>
         </ul>
       </td>
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#838171895">Request</button>
-        <div id="838171895" class="collapse">
+        <button data-toggle="collapse" data-target="#1648775609">Request</button>
+        <div id="1648775609" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -1122,80 +1430,39 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1184746847">Response</button>
-        <div id="-1184746847" class="collapse">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics",
-  "properties" : {
-    "id" : {
-      "type" : "integer"
-    },
-    "status" : {
-      "type" : "string",
-      "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
-    },
-    "savepoint" : {
-      "type" : "boolean"
-    },
-    "triggerTimestamp" : {
-      "type" : "integer"
-    },
-    "latestAckTimestamp" : {
-      "type" : "integer"
-    },
-    "stateSize" : {
-      "type" : "integer"
-    },
-    "duration" : {
-      "type" : "integer"
-    },
-    "alignmentBuffered" : {
-      "type" : "integer"
-    },
-    "numSubtasks" : {
-      "type" : "integer"
-    },
-    "numAckSubtasks" : {
-      "type" : "integer"
-    },
-    "checkpointStatisticsPerTask" : {
-      "type" : "object",
-      "additionalProperties" : {
-        "type" : "object",
-        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics",
-        "properties" : {
-          "checkpointId" : {
-            "type" : "integer"
-          },
-          "checkpointStatus" : {
-            "type" : "string",
-            "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
-          },
-          "latestAckTimestamp" : {
-            "type" : "integer"
-          },
-          "stateSize" : {
-            "type" : "integer"
-          },
-          "duration" : {
-            "type" : "integer"
-          },
-          "alignmentBuffered" : {
-            "type" : "integer"
-          },
-          "numSubtasks" : {
-            "type" : "integer"
-          },
-          "numAckSubtasks" : {
-            "type" : "integer"
-          }
-        }
-      }
-    }
-  }
+        <button data-toggle="collapse" data-target="#-1215495906">Response</button>
+        <div id="-1215495906" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobConfigInfo",
+  "properties" : {
+    "jobId" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
+      "properties" : {
+        "upperPart" : {
+          "type" : "integer"
+        },
+        "lowerPart" : {
+          "type" : "integer"
+        },
+        "bytes" : {
+          "type" : "array",
+          "items" : {
+            "type" : "integer"
+          }
+        }
+      }
+    },
+    "jobName" : {
+      "type" : "string"
+    },
+    "executionConfigInfo" : {
+      "type" : "any"
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -1206,7 +1473,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid/config</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/exceptions</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1227,8 +1494,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1357116178">Request</button>
-        <div id="-1357116178" class="collapse">
+        <button data-toggle="collapse" data-target="#-494359301">Request</button>
+        <div id="-494359301" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -1238,39 +1505,12 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#73579603">Response</button>
-        <div id="73579603" class="collapse">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobConfigInfo",
-  "properties" : {
-    "jobId" : {
-      "type" : "object",
-      "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
-      "properties" : {
-        "upperPart" : {
-          "type" : "integer"
-        },
-        "lowerPart" : {
-          "type" : "integer"
-        },
-        "bytes" : {
-          "type" : "array",
-          "items" : {
-            "type" : "integer"
-          }
-        }
-      }
-    },
-    "jobName" : {
-      "type" : "string"
-    },
-    "executionConfigInfo" : {
-      "type" : "any"
-    }
-  }
+        <button data-toggle="collapse" data-target="#-1465234270">Response</button>
+        <div id="-1465234270" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "any"
 }            </code>
           </pre>
          </div>
@@ -1281,7 +1521,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid/exceptions</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/execution-result</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1302,8 +1542,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#794716208">Request</button>
-        <div id="794716208" class="collapse">
+        <button data-toggle="collapse" data-target="#-1141690839">Request</button>
+        <div id="-1141690839" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -1313,12 +1553,79 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-176158761">Response</button>
-        <div id="-176158761" class="collapse">
+        <button data-toggle="collapse" data-target="#-1847991759">Response</button>
+        <div id="-1847991759" class="collapse">
           <pre>
             <code>
-{
-  "type" : "any"
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobExecutionResultResponseBody",
+  "properties" : {
+    "status" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
+      "properties" : {
+        "id" : {
+          "type" : "string",
+          "enum" : [ "IN_PROGRESS", "COMPLETED" ]
+        }
+      }
+    },
+    "jobExecutionResult" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:jobmaster:JobResult",
+      "properties" : {
+        "jobId" : {
+          "type" : "object",
+          "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
+          "properties" : {
+            "upperPart" : {
+              "type" : "integer"
+            },
+            "lowerPart" : {
+              "type" : "integer"
+            },
+            "bytes" : {
+              "type" : "array",
+              "items" : {
+                "type" : "integer"
+              }
+            }
+          }
+        },
+        "accumulatorResults" : {
+          "type" : "object",
+          "additionalProperties" : {
+            "type" : "object",
+            "id" : "urn:jsonschema:org:apache:flink:util:SerializedValue&lt;org:apache:flink:util:OptionalFailure&lt;java:lang:Object&gt;&gt;",
+            "properties" : {
+              "byteArray" : {
+                "type" : "array",
+                "items" : {
+                  "type" : "integer"
+                }
+              }
+            }
+          }
+        },
+        "netRuntime" : {
+          "type" : "integer"
+        },
+        "serializedThrowable" : {
+          "type" : "object",
+          "id" : "urn:jsonschema:java:util:Optional&lt;org:apache:flink:util:SerializedThrowable&gt;",
+          "properties" : {
+            "present" : {
+              "type" : "boolean"
+            }
+          }
+        },
+        "success" : {
+          "type" : "boolean"
+        }
+      }
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -1360,8 +1667,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1792172223">Request</button>
-        <div id="-1792172223" class="collapse">
+        <button data-toggle="collapse" data-target="#1213719564">Request</button>
+        <div id="1213719564" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -1371,30 +1678,30 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1481443862">Response</button>
-        <div id="-1481443862" class="collapse">
+        <button data-toggle="collapse" data-target="#1524447925">Response</button>
+        <div id="1524447925" class="collapse">
           <pre>
             <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:MetricCollectionResponseBody",
-  "properties" : {
-    "metrics" : {
-      "type" : "array",
-      "items" : {
-        "type" : "object",
-        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:Metric",
-        "properties" : {
-          "id" : {
-            "type" : "string"
-          },
-          "value" : {
-            "type" : "string"
-          }
-        }
-      }
-    }
-  }
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:MetricCollectionResponseBody",
+  "properties" : {
+    "metrics" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:Metric",
+        "properties" : {
+          "id" : {
+            "type" : "string"
+          },
+          "value" : {
+            "type" : "string"
+          }
+        }
+      }
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -1426,8 +1733,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-895161707">Request</button>
-        <div id="-895161707" class="collapse">
+        <button data-toggle="collapse" data-target="#2110730080">Request</button>
+        <div id="2110730080" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -1437,18 +1744,18 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#597567777">Response</button>
-        <div id="597567777" class="collapse">
+        <button data-toggle="collapse" data-target="#-691507732">Response</button>
+        <div id="-691507732" class="collapse">
           <pre>
             <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo",
-  "properties" : {
-    "jsonPlan" : {
-      "type" : "string"
-    }
-  }
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo",
+  "properties" : {
+    "jsonPlan" : {
+      "type" : "string"
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -1459,10 +1766,10 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/accumulators</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/rescaling</strong></td>
     </tr>
     <tr>
-      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left" style="width: 20%">Verb: <code>PATCH</code></td>
       <td class="text-left">Response code: <code>200 OK</code></td>
     </tr>
     <tr>
@@ -1475,14 +1782,23 @@
       <td colspan="2">
         <ul>
 <li><code>jobid</code> - description</li>
-<li><code>vertexid</code> - description</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">Query parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>parallelism</code> (mandatory): description</li>
         </ul>
       </td>
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-2003025577">Request</button>
-        <div id="-2003025577" class="collapse">
+        <button data-toggle="collapse" data-target="#1601881923">Request</button>
+        <div id="1601881923" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -1492,12 +1808,33 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#736007659">Response</button>
-        <div id="736007659" class="collapse">
+        <button data-toggle="collapse" data-target="#971862112">Response</button>
+        <div id="971862112" class="collapse">
           <pre>
             <code>
-{
-  "type" : "any"
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse",
+  "properties" : {
+    "triggerId" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:TriggerId",
+      "properties" : {
+        "upperPart" : {
+          "type" : "integer"
+        },
+        "lowerPart" : {
+          "type" : "integer"
+        },
+        "bytes" : {
+          "type" : "array",
+          "items" : {
+            "type" : "integer"
+          }
+        }
+      }
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -1508,7 +1845,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/metrics</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/rescaling/:triggerid</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1524,57 +1861,109 @@
       <td colspan="2">
         <ul>
 <li><code>jobid</code> - description</li>
-<li><code>vertexid</code> - description</li>
+<li><code>triggerid</code> - description</li>
         </ul>
       </td>
     </tr>
     <tr>
-      <td colspan="2">Query parameters</td>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#1396498187">Request</button>
+        <div id="1396498187" class="collapse">
+          <pre>
+            <code>
+{}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#-436584949">Response</button>
+        <div id="-436584949" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "any"
+}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+  </tbody>
+</table>
+<table class="table table-bordered">
+  <tbody>
+    <tr>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/savepoints</strong></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>POST</code></td>
+      <td class="text-left">Response code: <code>202 Accepted</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">description</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
     </tr>
     <tr>
       <td colspan="2">
         <ul>
-<li><code>get</code> (optional): description</li>
+<li><code>jobid</code> - description</li>
         </ul>
       </td>
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#197623875">Request</button>
-        <div id="197623875" class="collapse">
+        <button data-toggle="collapse" data-target="#256459122">Request</button>
+        <div id="256459122" class="collapse">
           <pre>
             <code>
-{}            </code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:savepoints:SavepointTriggerRequestBody",
+  "properties" : {
+    "targetDirectory" : {
+      "type" : "string"
+    },
+    "cancelJob" : {
+      "type" : "boolean"
+    }
+  }
+}            </code>
           </pre>
          </div>
       </td>
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#508352236">Response</button>
-        <div id="508352236" class="collapse">
+        <button data-toggle="collapse" data-target="#-591687676">Response</button>
+        <div id="-591687676" class="collapse">
           <pre>
             <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:MetricCollectionResponseBody",
-  "properties" : {
-    "metrics" : {
-      "type" : "array",
-      "items" : {
-        "type" : "object",
-        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:Metric",
-        "properties" : {
-          "id" : {
-            "type" : "string"
-          },
-          "value" : {
-            "type" : "string"
-          }
-        }
-      }
-    }
-  }
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse",
+  "properties" : {
+    "triggerId" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:TriggerId",
+      "properties" : {
+        "upperPart" : {
+          "type" : "integer"
+        },
+        "lowerPart" : {
+          "type" : "integer"
+        },
+        "bytes" : {
+          "type" : "array",
+          "items" : {
+            "type" : "integer"
+          }
+        }
+      }
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -1585,7 +1974,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/metrics</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/savepoints/:triggerid</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1601,25 +1990,63 @@
       <td colspan="2">
         <ul>
 <li><code>jobid</code> - description</li>
-<li><code>vertexid</code> - description</li>
-<li><code>subtaskindex</code> - description</li>
+<li><code>triggerid</code> - description</li>
         </ul>
       </td>
     </tr>
     <tr>
-      <td colspan="2">Query parameters</td>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#-679507407">Request</button>
+        <div id="-679507407" class="collapse">
+          <pre>
+            <code>
+{}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#1782376753">Response</button>
+        <div id="1782376753" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "any"
+}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+  </tbody>
+</table>
+<table class="table table-bordered">
+  <tbody>
+    <tr>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid</strong></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">description</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
     </tr>
     <tr>
       <td colspan="2">
         <ul>
-<li><code>get</code> (optional): description</li>
+<li><code>jobid</code> - description</li>
+<li><code>vertexid</code> - description</li>
         </ul>
       </td>
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-570514556">Request</button>
-        <div id="-570514556" class="collapse">
+        <button data-toggle="collapse" data-target="#155996506">Request</button>
+        <div id="155996506" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -1629,30 +2056,12 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-259786195">Response</button>
-        <div id="-259786195" class="collapse">
+        <button data-toggle="collapse" data-target="#-1181366619">Response</button>
+        <div id="-1181366619" class="collapse">
           <pre>
             <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:MetricCollectionResponseBody",
-  "properties" : {
-    "metrics" : {
-      "type" : "array",
-      "items" : {
-        "type" : "object",
-        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:Metric",
-        "properties" : {
-          "id" : {
-            "type" : "string"
-          },
-          "value" : {
-            "type" : "string"
-          }
-        }
-      }
-    }
-  }
+{
+  "type" : "any"
 }            </code>
           </pre>
          </div>
@@ -1663,7 +2072,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasktimes</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/accumulators</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1685,8 +2094,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1181229237">Request</button>
-        <div id="-1181229237" class="collapse">
+        <button data-toggle="collapse" data-target="#1002866210">Request</button>
+        <div id="1002866210" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -1696,12 +2105,12 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#2091701353">Response</button>
-        <div id="2091701353" class="collapse">
+        <button data-toggle="collapse" data-target="#-553067850">Response</button>
+        <div id="-553067850" class="collapse">
           <pre>
             <code>
-{
-  "type" : "any"
+{
+  "type" : "any"
 }            </code>
           </pre>
          </div>
@@ -1712,7 +2121,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/overview</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/backpressure</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1722,9 +2131,20 @@
       <td colspan="2">description</td>
     </tr>
     <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - description</li>
+<li><code>vertexid</code> - description</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1581342131">Request</button>
-        <div id="-1581342131" class="collapse">
+        <button data-toggle="collapse" data-target="#-587292255">Request</button>
+        <div id="-587292255" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -1734,42 +2154,45 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1331669657">Response</button>
-        <div id="-1331669657" class="collapse">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:legacy:messages:ClusterOverviewWithVersion",
-  "properties" : {
-    "numJobsRunningOrPending" : {
-      "type" : "integer"
-    },
-    "numJobsFinished" : {
-      "type" : "integer"
-    },
-    "numJobsCancelled" : {
-      "type" : "integer"
-    },
-    "numJobsFailed" : {
-      "type" : "integer"
-    },
-    "numTaskManagersConnected" : {
-      "type" : "integer"
-    },
-    "numSlotsTotal" : {
-      "type" : "integer"
-    },
-    "numSlotsAvailable" : {
-      "type" : "integer"
-    },
-    "version" : {
-      "type" : "string"
-    },
-    "commitId" : {
-      "type" : "string"
-    }
-  }
+        <button data-toggle="collapse" data-target="#1388088916">Response</button>
+        <div id="1388088916" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexBackPressureInfo",
+  "properties" : {
+    "status" : {
+      "type" : "string",
+      "enum" : [ "DEPRECATED", "OK" ]
+    },
+    "backpressureLevel" : {
+      "type" : "string",
+      "enum" : [ "OK", "LOW", "HIGH" ]
+    },
+    "endTimestamp" : {
+      "type" : "integer"
+    },
+    "subtasks" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexBackPressureInfo:SubtaskBackPressureInfo",
+        "properties" : {
+          "subtask" : {
+            "type" : "integer"
+          },
+          "backpressureLevel" : {
+            "type" : "string",
+            "enum" : [ "OK", "LOW", "HIGH" ]
+          },
+          "ratio" : {
+            "type" : "number"
+          }
+        }
+      }
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -1780,7 +2203,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/taskmanagers</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/metrics</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1790,9 +2213,30 @@
       <td colspan="2">description</td>
     </tr>
     <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - description</li>
+<li><code>vertexid</code> - description</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">Query parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>get</code> (optional): description</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1212462017">Request</button>
-        <div id="-1212462017" class="collapse">
+        <button data-toggle="collapse" data-target="#-1091451634">Request</button>
+        <div id="-1091451634" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -1802,70 +2246,30 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#1694259450">Response</button>
-        <div id="1694259450" class="collapse">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagersInfo",
-  "properties" : {
-    "taskManagerInfos" : {
-      "type" : "array",
-      "items" : {
-        "type" : "object",
-        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerInfo",
-        "properties" : {
-          "resourceId" : {
-            "type" : "object",
-            "id" : "urn:jsonschema:org:apache:flink:runtime:clusterframework:types:ResourceID",
-            "properties" : {
-              "resourceIdString" : {
-                "type" : "string"
-              },
-              "resourceID" : {
-                "type" : "object",
-                "$ref" : "urn:jsonschema:org:apache:flink:runtime:clusterframework:types:ResourceID"
-              }
-            }
-          },
-          "address" : {
-            "type" : "string"
-          },
-          "dataPort" : {
-            "type" : "integer"
-          },
-          "lastHeartbeat" : {
-            "type" : "integer"
-          },
-          "numberSlots" : {
-            "type" : "integer"
-          },
-          "numberAvailableSlots" : {
-            "type" : "integer"
-          },
-          "hardwareDescription" : {
-            "type" : "object",
-            "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
-            "properties" : {
-              "numberOfCPUCores" : {
-                "type" : "integer"
-              },
-              "sizeOfPhysicalMemory" : {
-                "type" : "integer"
-              },
-              "sizeOfJvmHeap" : {
-                "type" : "integer"
-              },
-              "sizeOfManagedMemory" : {
-                "type" : "integer"
-              }
-            }
-          }
-        }
-      }
-    }
-  }
+        <button data-toggle="collapse" data-target="#-780723273">Response</button>
+        <div id="-780723273" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:MetricCollectionResponseBody",
+  "properties" : {
+    "metrics" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:Metric",
+        "properties" : {
+          "id" : {
+            "type" : "string"
+          },
+          "value" : {
+            "type" : "string"
+          }
+        }
+      }
+    }
+  }
 }            </code>
           </pre>
          </div>
@@ -1876,7 +2280,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/taskmanagers/:taskmanagerid</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1891,14 +2295,16 @@
     <tr>
       <td colspan="2">
         <ul>
-<li><code>taskmanagerid</code> - description</li>
+<li><code>jobid</code> - description</li>
+<li><code>vertexid</code> - description</li>
+<li><code>subtaskindex</code> - description</li>
         </ul>
       </td>
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1209460381">Request</button>
-        <div id="-1209460381" class="collapse">
+        <button data-toggle="collapse" data-target="#-1504229989">Request</button>
+        <div id="-1504229989" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -1908,61 +2314,661 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1726867379">Response</button>
-        <div id="-1726867379" class="collapse">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
-  "properties" : {
-    "resourceId" : {
-      "type" : "object",
-      "id" : "urn:jsonschema:org:apache:flink:runtime:clusterframework:types:ResourceID",
-      "properties" : {
-        "resourceIdString" : {
-          "type" : "string"
-        },
-        "resourceID" : {
-          "type" : "object",
-          "$ref" : "urn:jsonschema:org:apache:flink:runtime:clusterframework:types:ResourceID"
-        }
-      }
-    },
-    "address" : {
-      "type" : "string"
-    },
-    "dataPort" : {
-      "type" : "integer"
-    },
-    "lastHeartbeat" : {
-      "type" : "integer"
-    },
-    "numberSlots" : {
-      "type" : "integer"
-    },
-    "numberAvailableSlots" : {
-      "type" : "integer"
-    },
-    "hardwareDescription" : {
-      "type" : "object",
-      "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
-      "properties" : {
-        "numberOfCPUCores" : {
-          "type" : "integer"
-        },
-        "sizeOfPhysicalMemory" : {
-          "type" : "integer"
-        },
-        "sizeOfJvmHeap" : {
-          "type" : "integer"
-        },
-        "sizeOfManagedMemory" : {
-          "type" : "integer"
-        }
-      }
-    }
-  }
+        <button data-toggle="collapse" data-target="#772003294">Response</button>
+        <div id="772003294" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo",
+  "properties" : {
+    "subtaskIndex" : {
+      "type" : "integer"
+    },
+    "status" : {
+      "type" : "string",
+      "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
+    },
+    "attempt" : {
+      "type" : "integer"
+    },
+    "host" : {
+      "type" : "string"
+    },
+    "startTime" : {
+      "type" : "integer"
+    },
+    "endTime" : {
+      "type" : "integer"
+    },
+    "duration" : {
+      "type" : "integer"
+    },
+    "ioMetricsInfo" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
+      "properties" : {
+        "bytesRead" : {
+          "type" : "integer"
+        },
+        "bytesReadComplete" : {
+          "type" : "boolean"
+        },
+        "bytesWritten" : {
+          "type" : "integer"
+        },
+        "bytesWrittenComplete" : {
+          "type" : "boolean"
+        },
+        "recordsRead" : {
+          "type" : "integer"
+        },
+        "recordsReadComplet

<TRUNCATED>
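
As a quick, hand-written illustration of the savepoint endpoints documented in the regenerated tables above (this sketch is not part of the generated docs): the snippet below triggers a savepoint with POST /jobs/:jobid/savepoints and prints the raw trigger response. The host/port, job id and target directory are assumptions chosen for the example; the endpoint paths and the request fields targetDirectory/cancelJob come from the tables above. In practice the returned trigger id is then polled via GET /jobs/:jobid/savepoints/:triggerid until the savepoint completes.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SavepointRestSketch {

	public static void main(String[] args) throws Exception {
		// Assumed values for illustration only.
		String baseUrl = "http://localhost:8081";
		String jobId = "7684be6004e4e955c2a558a9bc463f65";

		// Trigger a savepoint: POST /jobs/:jobid/savepoints
		// The body fields match the request schema shown above (targetDirectory, cancelJob).
		HttpURLConnection post = (HttpURLConnection)
			new URL(baseUrl + "/jobs/" + jobId + "/savepoints").openConnection();
		post.setRequestMethod("POST");
		post.setRequestProperty("Content-Type", "application/json");
		post.setDoOutput(true);
		byte[] body = "{\"targetDirectory\": \"hdfs:///flink/savepoints\", \"cancelJob\": false}"
			.getBytes(StandardCharsets.UTF_8);
		try (OutputStream out = post.getOutputStream()) {
			out.write(body);
		}

		// Expect "202 Accepted"; the raw response carries the trigger id, which can then be
		// polled via GET /jobs/:jobid/savepoints/:triggerid until the savepoint completes.
		System.out.println("Response code: " + post.getResponseCode());
		try (BufferedReader reader = new BufferedReader(
				new InputStreamReader(post.getInputStream(), StandardCharsets.UTF_8))) {
			String line;
			while ((line = reader.readLine()) != null) {
				System.out.println(line);
			}
		}
	}
}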

[03/18] flink git commit: [FLINK-9069] Add checkstyle rule to detect multiple consecutive semicolons

Posted by ch...@apache.org.
[FLINK-9069] Add checkstyle rule to detect multiple consecutive semicolons

This closes #5769.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d874532
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d874532
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d874532

Branch: refs/heads/master
Commit: 2d87453260b07e7f605e54cc4b76619ca7945e13
Parents: 767d79f
Author: jparkie <pa...@gmail.com>
Authored: Sun Mar 25 18:09:25 2018 -0700
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:05 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/configuration/HistoryServerOptions.java  | 2 +-
 .../java/org/apache/calcite/avatica/util/DateTimeUtils.java   | 4 ++--
 .../runtime/io/network/netty/PartitionRequestQueueTest.java   | 2 +-
 .../main/java/org/apache/flink/yarn/YarnClusterClient.java    | 2 +-
 tools/maven/checkstyle.xml                                    | 7 +++++++
 5 files changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d874532/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
index a16fd7f..13cdc1e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
@@ -34,7 +34,7 @@ public class HistoryServerOptions {
 	public static final ConfigOption<Long> HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL =
 		key("historyserver.archive.fs.refresh-interval")
 			.defaultValue(10000L)
-			.withDescription("Interval in milliseconds for refreshing the archived job directories.");;
+			.withDescription("Interval in milliseconds for refreshing the archived job directories.");
 
 	/**
 	 * Comma-separated list of directories which the HistoryServer polls for new archives.

http://git-wip-us.apache.org/repos/asf/flink/blob/2d874532/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
index d1a87a7..fe09d18 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
@@ -427,7 +427,7 @@ public class DateTimeUtils {
 	}
 
 	public static int digitCount(int v) {
-		for (int n = 1;; n++) {
+		for (int n = 1; true; n++) {
 			v /= 10;
 			if (v == 0) {
 				return n;
@@ -960,7 +960,7 @@ public class DateTimeUtils {
 		// Start with an estimate.
 		// Since no month has more than 31 days, the estimate is <= the true value.
 		int m = (date0 - date1) / 31;
-		for (;;) {
+		while (true) {
 			int date2 = addMonths(date1, m);
 			if (date2 >= date0) {
 				return m;

http://git-wip-us.apache.org/repos/asf/flink/blob/2d874532/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index f614c18..2deaa9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -165,7 +165,7 @@ public class PartitionRequestQueueTest {
 		private final AtomicInteger buffersInBacklog;
 
 		private DefaultBufferResultSubpartitionView(int buffersInBacklog) {
-			this.buffersInBacklog = new AtomicInteger(buffersInBacklog);;
+			this.buffersInBacklog = new AtomicInteger(buffersInBacklog);
 		}
 
 		@Nullable

http://git-wip-us.apache.org/repos/asf/flink/blob/2d874532/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 29ece26..2ac9664 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -244,7 +244,7 @@ public class YarnClusterClient extends ClusterClient<ApplicationId> {
 	public void waitForClusterToBeReady() {
 		logAndSysout("Waiting until all TaskManagers have connected");
 
-		for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus) {
+		for (GetClusterStatusResponse currentStatus, lastStatus = null; true; lastStatus = currentStatus) {
 			currentStatus = getClusterStatus();
 			if (currentStatus != null && !currentStatus.equals(lastStatus)) {
 				logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/"

http://git-wip-us.apache.org/repos/asf/flink/blob/2d874532/tools/maven/checkstyle.xml
----------------------------------------------------------------------
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
index a0168b0..a0e7dd7 100644
--- a/tools/maven/checkstyle.xml
+++ b/tools/maven/checkstyle.xml
@@ -462,6 +462,13 @@ This file is based on the checkstyle file of Apache Beam.
     <!-- Detects empty statements (standalone ";" semicolon). -->
     <module name="EmptyStatement"/>
 
+	<!-- Detect multiple consecutive semicolons (e.g. ";;"). -->
+	<module name="RegexpSinglelineJava">
+	  <property name="format" value=";{2,}"/>
+	  <property name="message" value="Use one semicolon"/>
+	  <property name="ignoreComments" value="true"/>
+	</module>
+
     <!--
 
     MODIFIERS CHECKS
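
To make the effect of the new rule concrete: the RegexpSinglelineJava module above flags any run of two or more semicolons on a line, which is exactly the pattern cleaned up in HistoryServerOptions and PartitionRequestQueueTest in this commit. A minimal, hypothetical snippet (class and method names are made up for the example) that the check would now reject:

public class SemicolonExample {

	public int incrementTwice(int value) {
		value++;;          // flagged: two consecutive semicolons ("Use one semicolon")
		value++;           // fine: a single semicolon
		return value;
	}
}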


[06/18] flink git commit: [FLINK-8704][tests] Port ScheduleOrUpdateConsumersTest

Posted by ch...@apache.org.
[FLINK-8704][tests] Port ScheduleOrUpdateConsumersTest

This closes #5697.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac077615
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac077615
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac077615

Branch: refs/heads/master
Commit: ac077615d244661c99232a6b3a4b88afd9186e11
Parents: f0a6ff7
Author: zentol <ch...@apache.org>
Authored: Mon Mar 12 13:21:01 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:05 2018 +0200

----------------------------------------------------------------------
 .../LegacyScheduleOrUpdateConsumersTest.java    | 168 +++++++++++++++++++
 .../ScheduleOrUpdateConsumersTest.java          |  34 +++-
 2 files changed, 193 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac077615/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java
new file mode 100644
index 0000000..846901a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY;
+
+public class LegacyScheduleOrUpdateConsumersTest extends TestLogger {
+
+	private static final int NUMBER_OF_TMS = 2;
+	private static final int NUMBER_OF_SLOTS_PER_TM = 2;
+	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+
+	private static TestingCluster flink;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		flink = TestingUtils.startTestingCluster(
+				NUMBER_OF_SLOTS_PER_TM,
+				NUMBER_OF_TMS,
+				TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		flink.stop();
+	}
+
+	/**
+	 * Tests notifications of multiple receivers when a task produces both a pipelined and blocking
+	 * result.
+	 *
+	 * <pre>
+	 *                             +----------+
+	 *            +-- pipelined -> | Receiver |
+	 * +--------+ |                +----------+
+	 * | Sender |-|
+	 * +--------+ |                +----------+
+	 *            +-- blocking --> | Receiver |
+	 *                             +----------+
+	 * </pre>
+	 *
+	 * The pipelined receiver gets deployed after the first buffer is available and the blocking
+	 * one after all subtasks are finished.
+	 */
+	@Test
+	public void testMixedPipelinedAndBlockingResults() throws Exception {
+		final JobVertex sender = new JobVertex("Sender");
+		sender.setInvokableClass(BinaryRoundRobinSubtaskIndexSender.class);
+		sender.getConfiguration().setInteger(BinaryRoundRobinSubtaskIndexSender.CONFIG_KEY, PARALLELISM);
+		sender.setParallelism(PARALLELISM);
+
+		final JobVertex pipelinedReceiver = new JobVertex("Pipelined Receiver");
+		pipelinedReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
+		pipelinedReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
+		pipelinedReceiver.setParallelism(PARALLELISM);
+
+		pipelinedReceiver.connectNewDataSetAsInput(
+				sender,
+				DistributionPattern.ALL_TO_ALL,
+				ResultPartitionType.PIPELINED);
+
+		final JobVertex blockingReceiver = new JobVertex("Blocking Receiver");
+		blockingReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
+		blockingReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
+		blockingReceiver.setParallelism(PARALLELISM);
+
+		blockingReceiver.connectNewDataSetAsInput(sender,
+				DistributionPattern.ALL_TO_ALL,
+				ResultPartitionType.BLOCKING);
+
+		SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
+				sender.getID(), pipelinedReceiver.getID(), blockingReceiver.getID());
+
+		sender.setSlotSharingGroup(slotSharingGroup);
+		pipelinedReceiver.setSlotSharingGroup(slotSharingGroup);
+		blockingReceiver.setSlotSharingGroup(slotSharingGroup);
+
+		final JobGraph jobGraph = new JobGraph(
+				"Mixed pipelined and blocking result",
+				sender,
+				pipelinedReceiver,
+				blockingReceiver);
+
+		flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	public static class BinaryRoundRobinSubtaskIndexSender extends AbstractInvokable {
+
+		public static final String CONFIG_KEY = "number-of-times-to-send";
+
+		public BinaryRoundRobinSubtaskIndexSender(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			List<RecordWriter<IntValue>> writers = Lists.newArrayListWithCapacity(2);
+
+			// The order of intermediate result creation in the job graph specifies which produced
+			// result partition is pipelined/blocking.
+			final RecordWriter<IntValue> pipelinedWriter =
+					new RecordWriter<>(getEnvironment().getWriter(0));
+
+			final RecordWriter<IntValue> blockingWriter =
+					new RecordWriter<>(getEnvironment().getWriter(1));
+
+			writers.add(pipelinedWriter);
+			writers.add(blockingWriter);
+
+			final int numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
+
+			final IntValue subtaskIndex = new IntValue(
+					getEnvironment().getTaskInfo().getIndexOfThisSubtask());
+
+			// Produce the first intermediate result and then the second in a serial fashion.
+			for (RecordWriter<IntValue> writer : writers) {
+				try {
+					for (int i = 0; i < numberOfTimesToSend; i++) {
+						writer.emit(subtaskIndex);
+					}
+					writer.flushAll();
+				}
+				finally {
+					writer.clearBuffers();
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ac077615/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index f55dfe4..c743a63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -18,16 +18,20 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.TestLogger;
 
@@ -36,30 +40,42 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.List;
 
 import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY;
 
+@Category(Flip6.class)
 public class ScheduleOrUpdateConsumersTest extends TestLogger {
 
 	private static final int NUMBER_OF_TMS = 2;
 	private static final int NUMBER_OF_SLOTS_PER_TM = 2;
 	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
 
-	private static TestingCluster flink;
+	private static MiniCluster flink;
 
 	@BeforeClass
 	public static void setUp() throws Exception {
-		flink = TestingUtils.startTestingCluster(
-				NUMBER_OF_SLOTS_PER_TM,
-				NUMBER_OF_TMS,
-				TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+		final Configuration config = new Configuration();
+		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(config)
+			.setNumTaskManagers(NUMBER_OF_TMS)
+			.setNumSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM)
+			.build();
+
+		flink = new MiniCluster(miniClusterConfiguration);
+
+		flink.start();
 	}
 
 	@AfterClass
 	public static void tearDown() throws Exception {
-		flink.stop();
+		if (flink != null) {
+			flink.close();
+		}
 	}
 
 	/**
@@ -118,7 +134,7 @@ public class ScheduleOrUpdateConsumersTest extends TestLogger {
 				pipelinedReceiver,
 				blockingReceiver);
 
-		flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+		flink.executeJobBlocking(jobGraph);
 	}
 
 	// ---------------------------------------------------------------------------------------------