You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/04 13:30:59 UTC
[01/19] flink git commit: [hotfix][tests] Properly disable
JoinCancelingITCase
Repository: flink
Updated Branches:
refs/heads/release-1.5 515069e14 -> c7d0f4768
[hotfix][tests] Properly disable JoinCancelingITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c98e364
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c98e364
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c98e364
Branch: refs/heads/release-1.5
Commit: 1c98e3641a6c191e10e0fa3636eb92e984e8ddb8
Parents: 20d7af7
Author: zentol <ch...@apache.org>
Authored: Wed Feb 28 13:43:42 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:18 2018 +0200
----------------------------------------------------------------------
.../test/cancelling/JoinCancelingITCase.java | 18 +++++++++++-------
1 file changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1c98e364/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
index 66919e7..7288ba5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
@@ -30,9 +30,13 @@ import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
+import org.junit.Ignore;
+import org.junit.Test;
+
/**
* Test job cancellation from within a JoinFunction.
*/
+@Ignore("Takes too long.")
public class JoinCancelingITCase extends CancelingTestBase {
// --------------- Test Sort Matches that are canceled while still reading / sorting -----------------
@@ -56,17 +60,17 @@ public class JoinCancelingITCase extends CancelingTestBase {
runAndCancelJob(env.createProgramPlan(), 5 * 1000, 10 * 1000);
}
-// @Test
+ @Test
public void testCancelSortMatchWhileReadingSlowInputs() throws Exception {
executeTask(new SimpleMatcher<Integer>(), true);
}
-// @Test
+ @Test
public void testCancelSortMatchWhileReadingFastInputs() throws Exception {
executeTask(new SimpleMatcher<Integer>(), false);
}
-// @Test
+ @Test
public void testCancelSortMatchPriorToFirstRecordReading() throws Exception {
executeTask(new StuckInOpenMatcher<Integer>(), false);
}
@@ -90,26 +94,26 @@ public class JoinCancelingITCase extends CancelingTestBase {
runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
}
-// @Test
+ @Test
public void testCancelSortMatchWhileDoingHeavySorting() throws Exception {
executeTaskWithGenerator(new SimpleMatcher<Integer>(), 50000, 100, 30 * 1000, 30 * 1000);
}
// --------------- Test Sort Matches that are canceled while in the Matching Phase -----------------
-// @Test
+ @Test
public void testCancelSortMatchWhileJoining() throws Exception {
executeTaskWithGenerator(new DelayingMatcher<Integer>(), 500, 3, 10 * 1000, 20 * 1000);
}
-// @Test
+ @Test
public void testCancelSortMatchWithLongCancellingResponse() throws Exception {
executeTaskWithGenerator(new LongCancelTimeMatcher<Integer>(), 500, 3, 10 * 1000, 10 * 1000);
}
// -------------------------------------- Test System corner cases ---------------------------------
-// @Test
+ @Test
public void testCancelSortMatchWithHighparallelism() throws Exception {
executeTask(new SimpleMatcher<Integer>(), false, 64);
}
[07/19] flink git commit: [FLINK-9042][tests] Port
ResumeCheckpointManuallyITCase to flip6
Posted by ch...@apache.org.
[FLINK-9042][tests] Port ResumeCheckpointManuallyITCase to flip6
This closes #5736.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cad474f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cad474f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cad474f7
Branch: refs/heads/release-1.5
Commit: cad474f76a40fb24100322b23fd3623325df4a98
Parents: 515069e
Author: zentol <ch...@apache.org>
Authored: Wed Mar 21 13:31:56 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:18 2018 +0200
----------------------------------------------------------------------
.../ResumeCheckpointManuallyITCase.java | 146 +++++++++++++------
1 file changed, 104 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cad474f7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 537f864..add4243 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -18,25 +18,26 @@
package org.apache.flink.test.checkpointing;
-import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.state.ManualWindowSpeedITCase;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;
import org.apache.curator.test.TestingServer;
@@ -44,9 +45,17 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertNotNull;
/**
* IT case for resuming from checkpoints manually via their external pointer, rather than automatic
@@ -240,14 +249,10 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
final Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER);
-
final File savepointDir = temporaryFolder.newFolder();
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
- config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
if (localRecovery) {
config.setString(
@@ -263,56 +268,113 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
}
- TestingCluster cluster = new TestingCluster(config);
- cluster.start();
+ MiniClusterResource cluster = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ config,
+ NUM_TASK_MANAGERS,
+ SLOTS_PER_TASK_MANAGER),
+ true);
+
+ cluster.before();
- String externalCheckpoint = null;
+ ClusterClient<?> client = cluster.getClusterClient();
+ client.setDetached(true);
try {
+ // main test sequence: start job -> eCP -> restore job -> eCP -> restore job
+ String firstExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, null, client);
+ assertNotNull(firstExternalCheckpoint);
+
+ String secondExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, firstExternalCheckpoint, client);
+ assertNotNull(secondExternalCheckpoint);
- // main test sequence: start job -> eCP -> restore job -> eCP -> restore job -> eCP
- for (int i = 0; i < 3; ++i) {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ String thirdExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, secondExternalCheckpoint, client);
+ assertNotNull(thirdExternalCheckpoint);
+ } finally {
+ cluster.after();
+ }
+ }
- env.setStateBackend(backend);
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
- env.setParallelism(PARALLELISM);
+ private static String runJobAndGetExternalizedCheckpoint(StateBackend backend, File checkpointDir, @Nullable String externalCheckpoint, ClusterClient<?> client) throws Exception {
+ JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint);
+ NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
- // initialize count down latch
- NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
+ client.submitJob(initialJobGraph, ResumeCheckpointManuallyITCase.class.getClassLoader());
- env.addSource(new NotifyingInfiniteTupleSource(10_000))
- .keyBy(0)
- .timeWindow(Time.seconds(3))
- .reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1))
- .filter(value -> value.f0.startsWith("Tuple 0"));
+ // wait until all sources have been started
+ NotifyingInfiniteTupleSource.countDownLatch.await();
- StreamGraph streamGraph = env.getStreamGraph();
- streamGraph.setJobName("Test");
+ waitUntilExternalizedCheckpointCreated(checkpointDir, initialJobGraph.getJobID());
+ client.cancel(initialJobGraph.getJobID());
+ waitUntilCanceled(initialJobGraph.getJobID(), client);
- JobGraph jobGraph = streamGraph.getJobGraph();
+ return getExternalizedCheckpointCheckpointPath(checkpointDir, initialJobGraph.getJobID());
+ }
+
+ private static String getExternalizedCheckpointCheckpointPath(File checkpointDir, JobID jobId) throws IOException {
+ Optional<Path> checkpoint = findExternalizedCheckpoint(checkpointDir, jobId);
+ if (!checkpoint.isPresent()) {
+ throw new AssertionError("No complete checkpoint could be found.");
+ } else {
+ return checkpoint.get().toString();
+ }
+ }
- // recover from previous iteration?
- if (externalCheckpoint != null) {
- jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
+ private static void waitUntilExternalizedCheckpointCreated(File checkpointDir, JobID jobId) throws InterruptedException, IOException {
+ while (true) {
+ Thread.sleep(50);
+ Optional<Path> externalizedCheckpoint = findExternalizedCheckpoint(checkpointDir, jobId);
+ if (externalizedCheckpoint.isPresent()) {
+ break;
+ }
+ }
+ }
+
+ private static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId) throws IOException {
+ return Files.list(checkpointDir.toPath().resolve(jobId.toString()))
+ .filter(path -> path.getFileName().toString().startsWith("chk-"))
+ .filter(path -> {
+ try {
+ return Files.list(path).anyMatch(child -> child.getFileName().toString().contains("meta"));
+ } catch (IOException ignored) {
+ return false;
}
+ })
+ .findAny();
+ }
- config.addAll(jobGraph.getJobConfiguration());
- JobSubmissionResult submissionResult = cluster.submitJobDetached(jobGraph);
+ private static void waitUntilCanceled(JobID jobId, ClusterClient<?> client) throws ExecutionException, InterruptedException {
+ while (client.getJobStatus(jobId).get() != JobStatus.CANCELLING) {
+ Thread.sleep(50);
+ }
+ }
- // wait until all sources have been started
- NotifyingInfiniteTupleSource.countDownLatch.await();
+ private static JobGraph getJobGraph(StateBackend backend, @Nullable String externalCheckpoint) {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- externalCheckpoint = cluster.requestCheckpoint(
- submissionResult.getJobID(),
- CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);
+ env.enableCheckpointing(500);
+ env.setStateBackend(backend);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.setParallelism(PARALLELISM);
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- cluster.cancelJob(submissionResult.getJobID());
- }
- } finally {
- cluster.stop();
- cluster.awaitTermination();
+ env.addSource(new NotifyingInfiniteTupleSource(10_000))
+ .keyBy(0)
+ .timeWindow(Time.seconds(3))
+ .reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1))
+ .filter(value -> value.f0.startsWith("Tuple 0"));
+
+ StreamGraph streamGraph = env.getStreamGraph();
+ streamGraph.setJobName("Test");
+
+ JobGraph jobGraph = streamGraph.getJobGraph();
+
+ // recover from previous iteration?
+ if (externalCheckpoint != null) {
+ jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
}
+
+ return jobGraph;
}
/**
[16/19] flink git commit: [FLINK-9104][docs] Update generator and
regenerate REST API docs
Posted by ch...@apache.org.
[FLINK-9104][docs] Update generator and regenerate REST API docs
This closes #5797.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f8172675
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f8172675
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f8172675
Branch: refs/heads/release-1.5
Commit: f817267534d8e092eefbeba044632dbf2ecd61ca
Parents: cf03f9e
Author: Rong Rong <wa...@hotmail.com>
Authored: Sat Mar 31 12:29:59 2018 -0700
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 14:26:51 2018 +0200
----------------------------------------------------------------------
docs/_includes/generated/rest_dispatcher.html | 3054 ++++++++++++------
.../flink/docs/rest/RestAPIDocGenerator.java | 41 +
2 files changed, 2071 insertions(+), 1024 deletions(-)
----------------------------------------------------------------------
[18/19] flink git commit: [FLINK-8697] Rename DummyFlinkKafkaConsumer
in Kinesis tests This closes #5809.
Posted by ch...@apache.org.
[FLINK-8697] Rename DummyFlinkKafkaConsumer in Kinesis tests This closes #5809.
This closes #5785.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/897dc515
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/897dc515
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/897dc515
Branch: refs/heads/release-1.5
Commit: 897dc515a5b9efbe8df9c8415162145bdea83002
Parents: d7b1257
Author: Bowen Li <bo...@gmail.com>
Authored: Tue Apr 3 13:53:23 2018 -0700
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 14:26:54 2018 +0200
----------------------------------------------------------------------
.../connectors/kinesis/internals/KinesisDataFetcherTest.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/897dc515/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 7854d03..ccf39d0 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -113,7 +113,7 @@ public class KinesisDataFetcherTest extends TestLogger {
// FlinkKinesisConsumer is responsible for setting up the fetcher before it can be run;
// run the consumer until it reaches the point where the fetcher starts to run
- final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(TestUtils.getStandardProperties(), fetcher, 1, 0);
+ final DummyFlinkKinesisConsumer<String> consumer = new DummyFlinkKinesisConsumer<>(TestUtils.getStandardProperties(), fetcher, 1, 0);
CheckedThread consumerThread = new CheckedThread() {
@Override
@@ -171,7 +171,7 @@ public class KinesisDataFetcherTest extends TestLogger {
subscribedStreamsToLastSeenShardIdsUnderTest,
FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
- final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(
+ final DummyFlinkKinesisConsumer<String> consumer = new DummyFlinkKinesisConsumer<>(
TestUtils.getStandardProperties(), fetcher, 1, 0);
CheckedThread consumerThread = new CheckedThread() {
@@ -609,7 +609,7 @@ public class KinesisDataFetcherTest extends TestLogger {
assertEquals(streamShardHandle, KinesisDataFetcher.convertToStreamShardHandle(kinesisStreamShard));
}
- private static class DummyFlinkKafkaConsumer<T> extends FlinkKinesisConsumer<T> {
+ private static class DummyFlinkKinesisConsumer<T> extends FlinkKinesisConsumer<T> {
private static final long serialVersionUID = 1L;
private final KinesisDataFetcher<T> fetcher;
@@ -618,7 +618,7 @@ public class KinesisDataFetcherTest extends TestLogger {
private final int subtaskIndex;
@SuppressWarnings("unchecked")
- DummyFlinkKafkaConsumer(
+ DummyFlinkKinesisConsumer(
Properties properties,
KinesisDataFetcher<T> fetcher,
int numParallelSubtasks,
[06/19] flink git commit: [hotfix][tests] Hide output from config.sh
Posted by ch...@apache.org.
[hotfix][tests] Hide output from config.sh
This closes #5763.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1eb2b3a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1eb2b3a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1eb2b3a5
Branch: refs/heads/release-1.5
Commit: 1eb2b3a58c3359bf9d891826b2a9117d36448221
Parents: cad474f
Author: zentol <ch...@apache.org>
Authored: Mon Mar 26 13:36:24 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:18 2018 +0200
----------------------------------------------------------------------
flink-dist/src/test/bin/calcTMHeapSizeMB.sh | 2 +-
flink-dist/src/test/bin/calcTMNetBufMem.sh | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1eb2b3a5/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/test/bin/calcTMHeapSizeMB.sh b/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
index 3956643..d5b7742 100755
--- a/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
+++ b/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
@@ -37,6 +37,6 @@ if [[ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]]; then
fi
FLINK_CONF_DIR=${bin}/../../main/resources
-. ${bin}/../../main/flink-bin/bin/config.sh
+. ${bin}/../../main/flink-bin/bin/config.sh > /dev/null
calculateTaskManagerHeapSizeMB
http://git-wip-us.apache.org/repos/asf/flink/blob/1eb2b3a5/flink-dist/src/test/bin/calcTMNetBufMem.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/test/bin/calcTMNetBufMem.sh b/flink-dist/src/test/bin/calcTMNetBufMem.sh
index 9948d9c..355a978 100755
--- a/flink-dist/src/test/bin/calcTMNetBufMem.sh
+++ b/flink-dist/src/test/bin/calcTMNetBufMem.sh
@@ -34,6 +34,6 @@ if [[ -z "${FLINK_TM_NET_BUF_MAX}" ]]; then
fi
FLINK_CONF_DIR=${bin}/../../main/resources
-. ${bin}/../../main/flink-bin/bin/config.sh
+. ${bin}/../../main/flink-bin/bin/config.sh > /dev/null
calculateNetworkBufferMemory
[19/19] flink git commit: [FLINK-8771][build] Upgrade scalastyle to
1.0.0
Posted by ch...@apache.org.
[FLINK-8771][build] Upgrade scalastyle to 1.0.0
This closes #5702.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7d0f476
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7d0f476
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7d0f476
Branch: refs/heads/release-1.5
Commit: c7d0f4768070007dd4f4e0d6c48950552e852b6c
Parents: 897dc51
Author: Bowen Li <bo...@gmail.com>
Authored: Wed Mar 14 17:26:18 2018 -0700
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 14:26:56 2018 +0200
----------------------------------------------------------------------
.../runtime/jobmanager/SlotSharingITCase.scala | 32 +++++++++----------
.../TaskManagerFailsWithSlotSharingITCase.scala | 33 ++++++++++----------
.../taskmanager/TaskManagerFailsITCase.scala | 32 +++++++++----------
pom.xml | 2 +-
4 files changed, 49 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c7d0f476/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index 4fffd68..1c26901 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -1,20 +1,20 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.runtime.jobmanager
http://git-wip-us.apache.org/repos/asf/flink/blob/c7d0f476/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index 9775d33..e2702c7 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -1,26 +1,25 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.runtime.jobmanager
import akka.actor.{ActorSystem, Kill, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.runtime.akka.ListeningBehaviour
import org.apache.flink.runtime.client.JobExecutionException
import org.apache.flink.runtime.io.network.partition.ResultPartitionType
http://git-wip-us.apache.org/repos/asf/flink/blob/c7d0f476/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 61cb8cc..a065e5b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -1,20 +1,20 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.api.scala.runtime.taskmanager
http://git-wip-us.apache.org/repos/asf/flink/blob/c7d0f476/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7918345..49a0a56 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1429,7 +1429,7 @@ under the License.
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
- <version>0.8.0</version>
+ <version>1.0.0</version>
<executions>
<execution>
<goals>
[10/19] flink git commit: [FLINK-8704][tests] Port
PartialConsumePipelinedResultTest
Posted by ch...@apache.org.
[FLINK-8704][tests] Port PartialConsumePipelinedResultTest
This closes #5695.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc1244eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc1244eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc1244eb
Branch: refs/heads/release-1.5
Commit: fc1244eb9e8197bb0437aaa210f0ecaaa42af98c
Parents: 40b9a63
Author: zentol <ch...@apache.org>
Authored: Mon Mar 12 13:09:44 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 14:26:46 2018 +0200
----------------------------------------------------------------------
...LegacyPartialConsumePipelinedResultTest.java | 150 +++++++++++++++++++
.../PartialConsumePipelinedResultTest.java | 33 ++--
2 files changed, 171 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fc1244eb/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
new file mode 100644
index 0000000..aecab75
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class LegacyPartialConsumePipelinedResultTest extends TestLogger {
+
+ // Test configuration
+ private final static int NUMBER_OF_TMS = 1;
+ private final static int NUMBER_OF_SLOTS_PER_TM = 1;
+ private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+
+ private final static int NUMBER_OF_NETWORK_BUFFERS = 128;
+
+ private static TestingCluster flink;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ final Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
+ config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+ config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
+
+ flink = new TestingCluster(config, true);
+
+ flink.start();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ flink.stop();
+ }
+
+ /**
+ * Tests a fix for FLINK-1930.
+ *
+ * <p> When consuming a pipelined result only partially, it is possible that local channels
+ * release the buffer pool, which is associated with the result partition, too early. If the
+ * producer is still producing data when this happens, it runs into an IllegalStateException,
+ * because of the destroyed buffer pool.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-1930">FLINK-1930</a>
+ */
+ @Test
+ public void testPartialConsumePipelinedResultReceiver() throws Exception {
+ final JobVertex sender = new JobVertex("Sender");
+ sender.setInvokableClass(SlowBufferSender.class);
+ sender.setParallelism(PARALLELISM);
+
+ final JobVertex receiver = new JobVertex("Receiver");
+ receiver.setInvokableClass(SingleBufferReceiver.class);
+ receiver.setParallelism(PARALLELISM);
+
+ // The partition needs to be pipelined, otherwise the original issue does not occur, because
+ // the sender and receiver are not online at the same time.
+ receiver.connectNewDataSetAsInput(
+ sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+ final JobGraph jobGraph = new JobGraph("Partial Consume of Pipelined Result", sender, receiver);
+
+ final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
+ sender.getID(), receiver.getID());
+
+ sender.setSlotSharingGroup(slotSharingGroup);
+ receiver.setSlotSharingGroup(slotSharingGroup);
+
+ flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ /**
+ * Sends a fixed number of buffers and sleeps in-between sends.
+ */
+ public static class SlowBufferSender extends AbstractInvokable {
+
+ public SlowBufferSender(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ final ResultPartitionWriter writer = getEnvironment().getWriter(0);
+
+ for (int i = 0; i < 8; i++) {
+ final BufferBuilder bufferBuilder = writer.getBufferProvider().requestBufferBuilderBlocking();
+ writer.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
+ Thread.sleep(50);
+ bufferBuilder.finish();
+ }
+ }
+ }
+
+ /**
+ * Reads a single buffer and recycles it.
+ */
+ public static class SingleBufferReceiver extends AbstractInvokable {
+
+ public SingleBufferReceiver(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ InputGate gate = getEnvironment().getInputGate(0);
+ Buffer buffer = gate.getNextBufferOrEvent().orElseThrow(IllegalStateException::new).getBuffer();
+ if (buffer != null) {
+ buffer.recycleBuffer();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc1244eb/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index ced1a33..7169796 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.execution.Environment;
@@ -32,41 +31,51 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.testutils.category.New;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+@Category(New.class)
public class PartialConsumePipelinedResultTest extends TestLogger {
// Test configuration
- private final static int NUMBER_OF_TMS = 1;
- private final static int NUMBER_OF_SLOTS_PER_TM = 1;
- private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+ private static final int NUMBER_OF_TMS = 1;
+ private static final int NUMBER_OF_SLOTS_PER_TM = 1;
+ private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
- private final static int NUMBER_OF_NETWORK_BUFFERS = 128;
+ private static final int NUMBER_OF_NETWORK_BUFFERS = 128;
- private static TestingCluster flink;
+ private static MiniCluster flink;
@BeforeClass
public static void setUp() throws Exception {
final Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
- flink = new TestingCluster(config, true);
+ final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumTaskManagers(NUMBER_OF_TMS)
+ .setNumSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM)
+ .build();
+
+ flink = new MiniCluster(miniClusterConfiguration);
flink.start();
}
@AfterClass
public static void tearDown() throws Exception {
- flink.stop();
+ if (flink != null) {
+ flink.close();
+ }
}
/**
@@ -102,7 +111,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
sender.setSlotSharingGroup(slotSharingGroup);
receiver.setSlotSharingGroup(slotSharingGroup);
- flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+ flink.executeJobBlocking(jobGraph);
}
// ---------------------------------------------------------------------------------------------
[08/19] flink git commit: [FLINK-8704][tests] Port
ScheduleOrUpdateConsumersTest
Posted by ch...@apache.org.
[FLINK-8704][tests] Port ScheduleOrUpdateConsumersTest
This closes #5697.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/15e44496
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/15e44496
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/15e44496
Branch: refs/heads/release-1.5
Commit: 15e44496e8af0f07e107e38bb8c2d1c3bb399511
Parents: 1c98e36
Author: zentol <ch...@apache.org>
Authored: Mon Mar 12 13:21:01 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 14:26:45 2018 +0200
----------------------------------------------------------------------
.../LegacyScheduleOrUpdateConsumersTest.java | 168 +++++++++++++++++++
.../ScheduleOrUpdateConsumersTest.java | 34 +++-
2 files changed, 193 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/15e44496/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java
new file mode 100644
index 0000000..846901a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY;
+
+public class LegacyScheduleOrUpdateConsumersTest extends TestLogger {
+
+ private static final int NUMBER_OF_TMS = 2;
+ private static final int NUMBER_OF_SLOTS_PER_TM = 2;
+ private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+
+ private static TestingCluster flink;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ flink = TestingUtils.startTestingCluster(
+ NUMBER_OF_SLOTS_PER_TM,
+ NUMBER_OF_TMS,
+ TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ flink.stop();
+ }
+
+ /**
+ * Tests notifications of multiple receivers when a task produces both a pipelined and blocking
+ * result.
+ *
+ * <pre>
+ * +----------+
+ * +-- pipelined -> | Receiver |
+ * +--------+ | +----------+
+ * | Sender |-|
+ * +--------+ | +----------+
+ * +-- blocking --> | Receiver |
+ * +----------+
+ * </pre>
+ *
+ * The pipelined receiver gets deployed after the first buffer is available and the blocking
+ * one after all subtasks are finished.
+ */
+ @Test
+ public void testMixedPipelinedAndBlockingResults() throws Exception {
+ final JobVertex sender = new JobVertex("Sender");
+ sender.setInvokableClass(BinaryRoundRobinSubtaskIndexSender.class);
+ sender.getConfiguration().setInteger(BinaryRoundRobinSubtaskIndexSender.CONFIG_KEY, PARALLELISM);
+ sender.setParallelism(PARALLELISM);
+
+ final JobVertex pipelinedReceiver = new JobVertex("Pipelined Receiver");
+ pipelinedReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
+ pipelinedReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
+ pipelinedReceiver.setParallelism(PARALLELISM);
+
+ pipelinedReceiver.connectNewDataSetAsInput(
+ sender,
+ DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.PIPELINED);
+
+ final JobVertex blockingReceiver = new JobVertex("Blocking Receiver");
+ blockingReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
+ blockingReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
+ blockingReceiver.setParallelism(PARALLELISM);
+
+ blockingReceiver.connectNewDataSetAsInput(sender,
+ DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.BLOCKING);
+
+ SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
+ sender.getID(), pipelinedReceiver.getID(), blockingReceiver.getID());
+
+ sender.setSlotSharingGroup(slotSharingGroup);
+ pipelinedReceiver.setSlotSharingGroup(slotSharingGroup);
+ blockingReceiver.setSlotSharingGroup(slotSharingGroup);
+
+ final JobGraph jobGraph = new JobGraph(
+ "Mixed pipelined and blocking result",
+ sender,
+ pipelinedReceiver,
+ blockingReceiver);
+
+ flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ public static class BinaryRoundRobinSubtaskIndexSender extends AbstractInvokable {
+
+ public static final String CONFIG_KEY = "number-of-times-to-send";
+
+ public BinaryRoundRobinSubtaskIndexSender(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ List<RecordWriter<IntValue>> writers = Lists.newArrayListWithCapacity(2);
+
+ // The order of intermediate result creation in the job graph specifies which produced
+ // result partition is pipelined/blocking.
+ final RecordWriter<IntValue> pipelinedWriter =
+ new RecordWriter<>(getEnvironment().getWriter(0));
+
+ final RecordWriter<IntValue> blockingWriter =
+ new RecordWriter<>(getEnvironment().getWriter(1));
+
+ writers.add(pipelinedWriter);
+ writers.add(blockingWriter);
+
+ final int numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
+
+ final IntValue subtaskIndex = new IntValue(
+ getEnvironment().getTaskInfo().getIndexOfThisSubtask());
+
+ // Produce the first intermediate result and then the second in a serial fashion.
+ for (RecordWriter<IntValue> writer : writers) {
+ try {
+ for (int i = 0; i < numberOfTimesToSend; i++) {
+ writer.emit(subtaskIndex);
+ }
+ writer.flushAll();
+ }
+ finally {
+ writer.clearBuffers();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/15e44496/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index f55dfe4..e16c3ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -18,16 +18,20 @@
package org.apache.flink.runtime.jobmanager.scheduler;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.testutils.category.New;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;
@@ -36,30 +40,42 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.util.List;
import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY;
+@Category(New.class)
public class ScheduleOrUpdateConsumersTest extends TestLogger {
private static final int NUMBER_OF_TMS = 2;
private static final int NUMBER_OF_SLOTS_PER_TM = 2;
private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
- private static TestingCluster flink;
+ private static MiniCluster flink;
@BeforeClass
public static void setUp() throws Exception {
- flink = TestingUtils.startTestingCluster(
- NUMBER_OF_SLOTS_PER_TM,
- NUMBER_OF_TMS,
- TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+ final Configuration config = new Configuration();
+ config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+
+ final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumTaskManagers(NUMBER_OF_TMS)
+ .setNumSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM)
+ .build();
+
+ flink = new MiniCluster(miniClusterConfiguration);
+
+ flink.start();
}
@AfterClass
public static void tearDown() throws Exception {
- flink.stop();
+ if (flink != null) {
+ flink.close();
+ }
}
/**
@@ -118,7 +134,7 @@ public class ScheduleOrUpdateConsumersTest extends TestLogger {
pipelinedReceiver,
blockingReceiver);
- flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+ flink.executeJobBlocking(jobGraph);
}
// ---------------------------------------------------------------------------------------------
[13/19] flink git commit: [FLINK-8966][tests] Port
AvroExternalJarProgramITCase to flip6
Posted by ch...@apache.org.
[FLINK-8966][tests] Port AvroExternalJarProgramITCase to flip6
This closes #5766.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf03f9e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf03f9e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf03f9e9
Branch: refs/heads/release-1.5
Commit: cf03f9e90353b7658cb822987d4c327662d6bfe5
Parents: 7131d61
Author: zentol <ch...@apache.org>
Authored: Tue Apr 3 11:20:12 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 14:26:50 2018 +0200
----------------------------------------------------------------------
.../avro/AvroExternalJarProgramITCase.java | 75 +++++++---------
.../LegacyAvroExternalJarProgramITCase.java | 92 ++++++++++++++++++++
2 files changed, 124 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cf03f9e9/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
index 985471a..6766947 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
@@ -19,74 +19,63 @@
package org.apache.flink.formats.avro;
import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.testutils.category.New;
import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.File;
-import java.net.URL;
import java.util.Collections;
/**
* IT case for the {@link AvroExternalJarProgram}.
*/
+@Category(New.class)
public class AvroExternalJarProgramITCase extends TestLogger {
private static final String JAR_FILE = "maven-test-jar.jar";
private static final String TEST_DATA_FILE = "/testdata.avro";
- @Test
- public void testExternalProgram() {
-
- LocalFlinkMiniCluster testMiniCluster = null;
+ private static final int PARALLELISM = 4;
- try {
- int parallelism = 4;
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
- testMiniCluster = new LocalFlinkMiniCluster(config, false);
- testMiniCluster.start();
+ private static final MiniCluster MINI_CLUSTER = new MiniCluster(
+ new MiniClusterConfiguration.Builder()
+ .setNumTaskManagers(1)
+ .setNumSlotsPerTaskManager(PARALLELISM)
+ .build());
- String jarFile = JAR_FILE;
- String testData = getClass().getResource(TEST_DATA_FILE).toString();
+ @BeforeClass
+ public static void setUp() throws Exception {
+ MINI_CLUSTER.start();
+ }
- PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+ @AfterClass
+ public static void tearDown() {
+ TestEnvironment.unsetAsContext();
+ MINI_CLUSTER.closeAsync();
+ }
- TestEnvironment.setAsContext(
- testMiniCluster,
- parallelism,
- Collections.singleton(new Path(jarFile)),
- Collections.<URL>emptyList());
+ @Test
+ public void testExternalProgram() throws Exception {
+ TestEnvironment.setAsContext(
+ MINI_CLUSTER,
+ PARALLELISM,
+ Collections.singleton(new Path(JAR_FILE)),
+ Collections.emptyList());
- config.setString(JobManagerOptions.ADDRESS, "localhost");
- config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
+ String testData = getClass().getResource(TEST_DATA_FILE).toString();
- program.invokeInteractiveModeForExecution();
- }
- catch (Throwable t) {
- System.err.println(t.getMessage());
- t.printStackTrace();
- Assert.fail("Error during the packaged program execution: " + t.getMessage());
- }
- finally {
- TestEnvironment.unsetAsContext();
+ PackagedProgram program = new PackagedProgram(new File(JAR_FILE), new String[]{testData});
- if (testMiniCluster != null) {
- try {
- testMiniCluster.stop();
- } catch (Throwable t) {
- // ignore
- }
- }
- }
+ program.invokeInteractiveModeForExecution();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf03f9e9/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
new file mode 100644
index 0000000..1dd56a7
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+
+/**
+ * IT case for the {@link AvroExternalJarProgram}.
+ */
+public class LegacyAvroExternalJarProgramITCase extends TestLogger {
+
+ private static final String JAR_FILE = "maven-test-jar.jar";
+
+ private static final String TEST_DATA_FILE = "/testdata.avro";
+
+ @Test
+ public void testExternalProgram() {
+
+ LocalFlinkMiniCluster testMiniCluster = null;
+
+ try {
+ int parallelism = 4;
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+ testMiniCluster = new LocalFlinkMiniCluster(config, false);
+ testMiniCluster.start();
+
+ String jarFile = JAR_FILE;
+ String testData = getClass().getResource(TEST_DATA_FILE).toString();
+
+ PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+
+ TestEnvironment.setAsContext(
+ testMiniCluster,
+ parallelism,
+ Collections.singleton(new Path(jarFile)),
+ Collections.<URL>emptyList());
+
+ config.setString(JobManagerOptions.ADDRESS, "localhost");
+ config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
+
+ program.invokeInteractiveModeForExecution();
+ }
+ catch (Throwable t) {
+ System.err.println(t.getMessage());
+ t.printStackTrace();
+ Assert.fail("Error during the packaged program execution: " + t.getMessage());
+ }
+ finally {
+ TestEnvironment.unsetAsContext();
+
+ if (testMiniCluster != null) {
+ try {
+ testMiniCluster.stop();
+ } catch (Throwable t) {
+ // ignore
+ }
+ }
+ }
+ }
+}
[14/19] flink git commit: [FLINK-9104][docs] Update generator and
regenerate REST API docs
Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/f8172675/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index be5f677..8ece7b1 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -20,6 +20,7 @@ package org.apache.flink.docs.rest;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
@@ -43,6 +44,9 @@ import org.apache.flink.util.ConfigurationException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.SerializableString;
+import com.fasterxml.jackson.core.io.CharacterEscapes;
+import com.fasterxml.jackson.core.io.SerializedString;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
@@ -56,7 +60,9 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -99,6 +105,7 @@ public class RestAPIDocGenerator {
static {
mapper = new ObjectMapper();
+ mapper.getFactory().setCharacterEscapes(new HTMLCharacterEscapes());
schemaGen = new JsonSchemaGenerator(mapper);
}
@@ -259,6 +266,39 @@ public class RestAPIDocGenerator {
}
/**
+ * Create character escapes for HTML when generating JSON request/response string.
+ *
+ * <p>This is to avoid exception when generating JSON with Field schema contains generic types.
+ */
+ private static class HTMLCharacterEscapes extends CharacterEscapes {
+ private final int[] asciiEscapes;
+ private final Map<Integer, SerializableString> escapeSequences;
+
+ public HTMLCharacterEscapes() {
+ int[] esc = CharacterEscapes.standardAsciiEscapesForJSON();
+ esc['<'] = CharacterEscapes.ESCAPE_CUSTOM;
+ esc['>'] = CharacterEscapes.ESCAPE_CUSTOM;
+ esc['&'] = CharacterEscapes.ESCAPE_CUSTOM;
+ Map<Integer, SerializableString> escMap = new HashMap<>();
+ escMap.put((int) '<', new SerializedString("&lt;"));
+ escMap.put((int) '>', new SerializedString("&gt;"));
+ escMap.put((int) '&', new SerializedString("&amp;"));
+ asciiEscapes = esc;
+ escapeSequences = escMap;
+ }
+
+ @Override
+ public int[] getEscapeCodesForAscii() {
+ return asciiEscapes;
+ }
+
+ @Override
+ public SerializableString getEscapeSequence(int i) {
+ return escapeSequences.getOrDefault(i, null);
+ }
+ }
+
+ /**
* Utility class to extract the {@link MessageHeaders} that the {@link DispatcherRestEndpoint} supports.
*/
private static class DocumentingDispatcherRestEndpoint extends DispatcherRestEndpoint implements DocumentingRestEndpoint {
@@ -273,6 +313,7 @@ public class RestAPIDocGenerator {
static {
config = new Configuration();
+ config.setString(RestOptions.REST_ADDRESS, "localhost");
try {
restConfig = RestServerEndpointConfiguration.fromConfiguration(config);
} catch (ConfigurationException e) {
[12/19] flink git commit: [FLINK-8966][tests] Upload user-jars in
MiniCluster
Posted by ch...@apache.org.
[FLINK-8966][tests] Upload user-jars in MiniCluster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7131d610
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7131d610
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7131d610
Branch: refs/heads/release-1.5
Commit: 7131d610928b0c9329254fbaa30b1252035c69b1
Parents: 491aceb
Author: zentol <ch...@apache.org>
Authored: Wed Mar 14 12:49:11 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 14:26:48 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/minicluster/MiniCluster.java | 38 +++++++++++++++++++-
1 file changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7131d610/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2e826eb..64d46c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -25,9 +25,12 @@ import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -86,8 +89,10 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@@ -632,7 +637,10 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
// from the ResourceManager
jobGraph.setAllowQueuedScheduling(true);
- final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = dispatcherGateway.submitJob(jobGraph, rpcTimeout);
+ final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph);
+
+ final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose(
+ (Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));
return acknowledgeCompletableFuture.thenApply(
(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
@@ -661,6 +669,34 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
}
}
+ private CompletableFuture<Void> uploadAndSetJarFiles(final DispatcherGateway currentDispatcherGateway, final JobGraph job) {
+ List<Path> userJars = job.getUserJars();
+ if (!userJars.isEmpty()) {
+ CompletableFuture<List<PermanentBlobKey>> jarUploadFuture = uploadJarFiles(currentDispatcherGateway, job.getJobID(), job.getUserJars());
+ return jarUploadFuture.thenAccept(blobKeys -> {
+ for (PermanentBlobKey blobKey : blobKeys) {
+ job.addBlob(blobKey);
+ }
+ });
+ } else {
+ LOG.debug("No jars to upload for job {}.", job.getJobID());
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ private CompletableFuture<List<PermanentBlobKey>> uploadJarFiles(final DispatcherGateway currentDispatcherGateway, final JobID jobId, final List<Path> jars) {
+ return currentDispatcherGateway.getBlobServerPort(rpcTimeout)
+ .thenApply(blobServerPort -> {
+ InetSocketAddress blobServerAddress = new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort);
+
+ try {
+ return BlobClient.uploadJarFiles(blobServerAddress, miniClusterConfiguration.getConfiguration(), jobId, jars);
+ } catch (IOException ioe) {
+ throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
+ }
+ });
+ }
+
// ------------------------------------------------------------------------
// factories - can be overridden by subclasses to alter behavior
// ------------------------------------------------------------------------
[15/19] flink git commit: [FLINK-9104][docs] Update generator and
regenerate REST API docs
Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/f8172675/docs/_includes/generated/rest_dispatcher.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_dispatcher.html
index 25bdff1..e02c232 100644
--- a/docs/_includes/generated/rest_dispatcher.html
+++ b/docs/_includes/generated/rest_dispatcher.html
@@ -12,8 +12,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1881098330">Request</button>
- <div id="-1881098330" class="collapse">
+ <button data-toggle="collapse" data-target="#1124793457">Request</button>
+ <div id="1124793457" class="collapse">
<pre>
<code>
{} </code>
@@ -23,18 +23,18 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-574888474">Response</button>
- <div id="-574888474" class="collapse">
+ <button data-toggle="collapse" data-target="#-1863963983">Response</button>
+ <div id="-1863963983" class="collapse">
<pre>
<code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:BlobServerPortResponseBody",
- "properties" : {
- "port" : {
- "type" : "integer"
- }
- }
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:BlobServerPortResponseBody",
+ "properties" : {
+ "port" : {
+ "type" : "integer"
+ }
+ }
} </code>
</pre>
</div>
@@ -56,8 +56,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1357269034">Request</button>
- <div id="-1357269034" class="collapse">
+ <button data-toggle="collapse" data-target="#1648622753">Request</button>
+ <div id="1648622753" class="collapse">
<pre>
<code>
{} </code>
@@ -67,30 +67,30 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-642531372">Response</button>
- <div id="-642531372" class="collapse">
+ <button data-toggle="collapse" data-target="#-1931606881">Response</button>
+ <div id="-1931606881" class="collapse">
<pre>
<code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:DashboardConfiguration",
- "properties" : {
- "refreshInterval" : {
- "type" : "integer"
- },
- "timeZoneName" : {
- "type" : "string"
- },
- "timeZoneOffset" : {
- "type" : "integer"
- },
- "flinkVersion" : {
- "type" : "string"
- },
- "flinkRevision" : {
- "type" : "string"
- }
- }
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:DashboardConfiguration",
+ "properties" : {
+ "refreshInterval" : {
+ "type" : "integer"
+ },
+ "timeZoneName" : {
+ "type" : "string"
+ },
+ "timeZoneOffset" : {
+ "type" : "integer"
+ },
+ "flinkVersion" : {
+ "type" : "string"
+ },
+ "flinkRevision" : {
+ "type" : "string"
+ }
+ }
} </code>
</pre>
</div>
@@ -112,8 +112,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1353094089">Request</button>
- <div id="-1353094089" class="collapse">
+ <button data-toggle="collapse" data-target="#1652797698">Request</button>
+ <div id="1652797698" class="collapse">
<pre>
<code>
{} </code>
@@ -123,24 +123,24 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1672532771">Response</button>
- <div id="-1672532771" class="collapse">
+ <button data-toggle="collapse" data-target="#1333359016">Response</button>
+ <div id="1333359016" class="collapse">
<pre>
<code>
-{
- "type" : "array",
- "items" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ClusterConfigurationInfoEntry",
- "properties" : {
- "key" : {
- "type" : "string"
- },
- "value" : {
- "type" : "string"
- }
- }
- }
+{
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ClusterConfigurationInfoEntry",
+ "properties" : {
+ "key" : {
+ "type" : "string"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
} </code>
</pre>
</div>
@@ -172,8 +172,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1667487464">Request</button>
- <div id="-1667487464" class="collapse">
+ <button data-toggle="collapse" data-target="#1338404323">Request</button>
+ <div id="1338404323" class="collapse">
<pre>
<code>
{} </code>
@@ -183,30 +183,30 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1356759103">Response</button>
- <div id="-1356759103" class="collapse">
+ <button data-toggle="collapse" data-target="#1649132684">Response</button>
+ <div id="1649132684" class="collapse">
<pre>
<code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:MetricCollectionResponseBody",
- "properties" : {
- "metrics" : {
- "type" : "array",
- "items" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:Metric",
- "properties" : {
- "id" : {
- "type" : "string"
- },
- "value" : {
- "type" : "string"
- }
- }
- }
- }
- }
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:MetricCollectionResponseBody",
+ "properties" : {
+ "metrics" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:Metric",
+ "properties" : {
+ "id" : {
+ "type" : "string"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
} </code>
</pre>
</div>
@@ -228,8 +228,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#1281196202">Request</button>
- <div id="1281196202" class="collapse">
+ <button data-toggle="collapse" data-target="#-7879307">Request</button>
+ <div id="-7879307" class="collapse">
<pre>
<code>
{} </code>
@@ -239,46 +239,46 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#1630160880">Response</button>
- <div id="1630160880" class="collapse">
- <pre>
- <code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobIdsWithStatusOverview",
- "properties" : {
- "jobsWithStatus" : {
- "type" : "array",
- "items" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobIdsWithStatusOverview:JobIdWithStatus",
- "properties" : {
- "jobId" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
- "properties" : {
- "upperPart" : {
- "type" : "integer"
- },
- "lowerPart" : {
- "type" : "integer"
- },
- "bytes" : {
- "type" : "array",
- "items" : {
- "type" : "integer"
- }
- }
- }
- },
- "jobStatus" : {
- "type" : "string",
- "enum" : [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
- }
- }
- }
- }
- }
+ <button data-toggle="collapse" data-target="#341085371">Response</button>
+ <div id="341085371" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobIdsWithStatusOverview",
+ "properties" : {
+ "jobsWithStatus" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobIdsWithStatusOverview:JobIdWithStatus",
+ "properties" : {
+ "jobId" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
+ "properties" : {
+ "upperPart" : {
+ "type" : "integer"
+ },
+ "lowerPart" : {
+ "type" : "integer"
+ },
+ "bytes" : {
+ "type" : "array",
+ "items" : {
+ "type" : "integer"
+ }
+ }
+ }
+ },
+ "jobStatus" : {
+ "type" : "string",
+ "enum" : [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDING", "SUSPENDED", "RECONCILING" ]
+ }
+ }
+ }
+ }
+ }
} </code>
</pre>
</div>
@@ -300,21 +300,21 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#1710504926">Request</button>
- <div id="1710504926" class="collapse">
+ <button data-toggle="collapse" data-target="#-1188986258">Request</button>
+ <div id="-1188986258" class="collapse">
<pre>
<code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitRequestBody",
- "properties" : {
- "serializedJobGraph" : {
- "type" : "array",
- "items" : {
- "type" : "integer"
- }
- }
- }
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitRequestBody",
+ "properties" : {
+ "serializedJobGraph" : {
+ "type" : "array",
+ "items" : {
+ "type" : "integer"
+ }
+ }
+ }
} </code>
</pre>
</div>
@@ -322,18 +322,18 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-476159276">Response</button>
- <div id="-476159276" class="collapse">
+ <button data-toggle="collapse" data-target="#919316836">Response</button>
+ <div id="919316836" class="collapse">
<pre>
<code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitResponseBody",
- "properties" : {
- "jobUrl" : {
- "type" : "string"
- }
- }
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitResponseBody",
+ "properties" : {
+ "jobUrl" : {
+ "type" : "string"
+ }
+ }
} </code>
</pre>
</div>
@@ -355,8 +355,114 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-297252120">Request</button>
- <div id="-297252120" class="collapse">
+ <button data-toggle="collapse" data-target="#-1586327629">Request</button>
+ <div id="-1586327629" class="collapse">
+ <pre>
+ <code>
+{} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#-1663567735">Response</button>
+ <div id="-1663567735" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:MultipleJobsDetails",
+ "properties" : {
+ "jobs" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobDetails",
+ "properties" : {
+ "jobId" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
+ "properties" : {
+ "upperPart" : {
+ "type" : "integer"
+ },
+ "lowerPart" : {
+ "type" : "integer"
+ },
+ "bytes" : {
+ "type" : "array",
+ "items" : {
+ "type" : "integer"
+ }
+ }
+ }
+ },
+ "jobName" : {
+ "type" : "string"
+ },
+ "startTime" : {
+ "type" : "integer"
+ },
+ "endTime" : {
+ "type" : "integer"
+ },
+ "duration" : {
+ "type" : "integer"
+ },
+ "status" : {
+ "type" : "string",
+ "enum" : [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDING", "SUSPENDED", "RECONCILING" ]
+ },
+ "lastUpdateTime" : {
+ "type" : "integer"
+ },
+ "tasksPerState" : {
+ "type" : "array",
+ "items" : {
+ "type" : "integer"
+ }
+ },
+ "numTasks" : {
+ "type" : "integer"
+ }
+ }
+ }
+ }
+ }
+} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ </tbody>
+</table>
+<table class="table table-bordered">
+ <tbody>
+ <tr>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid</strong></td>
+ </tr>
+ <tr>
+ <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+ <td class="text-left">Response code: <code>200 OK</code></td>
+ </tr>
+ <tr>
+ <td colspan="2">description</td>
+ </tr>
+ <tr>
+ <td colspan="2">Path parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>jobid</code> - description</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#-2071441832">Request</button>
+ <div id="-2071441832" class="collapse">
<pre>
<code>
{} </code>
@@ -366,70 +472,152 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-374492226">Response</button>
- <div id="-374492226" class="collapse">
- <pre>
- <code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:MultipleJobsDetails",
- "properties" : {
- "jobs" : {
- "type" : "array",
- "items" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobDetails",
- "properties" : {
- "jobId" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
- "properties" : {
- "upperPart" : {
- "type" : "integer"
- },
- "lowerPart" : {
- "type" : "integer"
- },
- "bytes" : {
- "type" : "array",
- "items" : {
- "type" : "integer"
- }
- }
- }
- },
- "jobName" : {
- "type" : "string"
- },
- "startTime" : {
- "type" : "integer"
- },
- "endTime" : {
- "type" : "integer"
- },
- "duration" : {
- "type" : "integer"
- },
- "status" : {
- "type" : "string",
- "enum" : [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
- },
- "lastUpdateTime" : {
- "type" : "integer"
- },
- "tasksPerState" : {
- "type" : "array",
- "items" : {
- "type" : "integer"
- }
- },
- "numTasks" : {
- "type" : "integer"
- }
- }
- }
- }
- }
+ <button data-toggle="collapse" data-target="#998294776">Response</button>
+ <div id="998294776" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobDetailsInfo",
+ "properties" : {
+ "jobId" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
+ "properties" : {
+ "upperPart" : {
+ "type" : "integer"
+ },
+ "lowerPart" : {
+ "type" : "integer"
+ },
+ "bytes" : {
+ "type" : "array",
+ "items" : {
+ "type" : "integer"
+ }
+ }
+ }
+ },
+ "name" : {
+ "type" : "string"
+ },
+ "jobStatus" : {
+ "type" : "string",
+ "enum" : [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDING", "SUSPENDED", "RECONCILING" ]
+ },
+ "startTime" : {
+ "type" : "integer"
+ },
+ "endTime" : {
+ "type" : "integer"
+ },
+ "duration" : {
+ "type" : "integer"
+ },
+ "now" : {
+ "type" : "integer"
+ },
+ "timestamps" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "integer"
+ }
+ },
+ "jobVertexInfos" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobDetailsInfo:JobVertexDetailsInfo",
+ "properties" : {
+ "jobVertexID" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:jobgraph:JobVertexID",
+ "properties" : {
+ "upperPart" : {
+ "type" : "integer"
+ },
+ "lowerPart" : {
+ "type" : "integer"
+ },
+ "bytes" : {
+ "type" : "array",
+ "items" : {
+ "type" : "integer"
+ }
+ }
+ }
+ },
+ "name" : {
+ "type" : "string"
+ },
+ "parallelism" : {
+ "type" : "integer"
+ },
+ "executionState" : {
+ "type" : "string",
+ "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
+ },
+ "startTime" : {
+ "type" : "integer"
+ },
+ "endTime" : {
+ "type" : "integer"
+ },
+ "duration" : {
+ "type" : "integer"
+ },
+ "tasksPerState" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "integer"
+ }
+ },
+ "jobVertexMetrics" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
+ "properties" : {
+ "bytesRead" : {
+ "type" : "integer"
+ },
+ "bytesReadComplete" : {
+ "type" : "boolean"
+ },
+ "bytesWritten" : {
+ "type" : "integer"
+ },
+ "bytesWrittenComplete" : {
+ "type" : "boolean"
+ },
+ "recordsRead" : {
+ "type" : "integer"
+ },
+ "recordsReadComplete" : {
+ "type" : "boolean"
+ },
+ "recordsWritten" : {
+ "type" : "integer"
+ },
+ "recordsWrittenComplete" : {
+ "type" : "boolean"
+ }
+ }
+ }
+ }
+ }
+ },
+ "jobVerticesPerState" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "integer"
+ }
+ },
+ "jsonPlan" : {
+ "type" : "string"
+ },
+ "stoppable" : {
+ "type" : "boolean"
+ }
+ }
} </code>
</pre>
</div>
@@ -471,8 +659,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#1776493329">Request</button>
- <div id="1776493329" class="collapse">
+ <button data-toggle="collapse" data-target="#-1977276770">Request</button>
+ <div id="-1977276770" class="collapse">
<pre>
<code>
{} </code>
@@ -482,8 +670,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#1961476153">Response</button>
- <div id="1961476153" class="collapse">
+ <button data-toggle="collapse" data-target="#-1792293946">Response</button>
+ <div id="-1792293946" class="collapse">
<pre>
<code>
{} </code>
@@ -496,7 +684,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/accumulators</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -516,9 +704,19 @@
</td>
</tr>
<tr>
+ <td colspan="2">Query parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>includeSerializedValue</code> (optional): description</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-782366323">Request</button>
- <div id="-782366323" class="collapse">
+ <button data-toggle="collapse" data-target="#-2064509468">Request</button>
+ <div id="-2064509468" class="collapse">
<pre>
<code>
{} </code>
@@ -528,12 +726,54 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-2007597011">Response</button>
- <div id="-2007597011" class="collapse">
+ <button data-toggle="collapse" data-target="#-1744345868">Response</button>
+ <div id="-1744345868" class="collapse">
<pre>
<code>
-{
- "type" : "any"
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobAccumulatorsInfo",
+ "properties" : {
+ "jobAccumulators" : {
+ "type" : "array",
+ "items" : {
+ "type" : "any"
+ }
+ },
+ "userAccumulators" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobAccumulatorsInfo:UserTaskAccumulator",
+ "properties" : {
+ "name" : {
+ "type" : "string"
+ },
+ "type" : {
+ "type" : "string"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ },
+ "serializedUserAccumulators" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:util:SerializedValue&lt;org:apache:flink:util:OptionalFailure&lt;java:lang:Object&gt;&gt;",
+ "properties" : {
+ "byteArray" : {
+ "type" : "array",
+ "items" : {
+ "type" : "integer"
+ }
+ }
+ }
+ }
+ }
+ }
} </code>
</pre>
</div>
@@ -544,7 +784,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid/accumulators</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -565,8 +805,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-775433959">Request</button>
- <div id="-775433959" class="collapse">
+ <button data-toggle="collapse" data-target="#-1417778508">Request</button>
+ <div id="-1417778508" class="collapse">
<pre>
<code>
{} </code>
@@ -576,12 +816,267 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-455270359">Response</button>
- <div id="-455270359" class="collapse">
+ <button data-toggle="collapse" data-target="#-850872826">Response</button>
+ <div id="-850872826" class="collapse">
<pre>
<code>
-{
- "type" : "any"
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics",
+ "properties" : {
+ "counts" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:Counts",
+ "properties" : {
+ "numberRestoredCheckpoints" : {
+ "type" : "integer"
+ },
+ "totalNumberCheckpoints" : {
+ "type" : "integer"
+ },
+ "numberInProgressCheckpoints" : {
+ "type" : "integer"
+ },
+ "numberCompletedCheckpoints" : {
+ "type" : "integer"
+ },
+ "numberFailedCheckpoints" : {
+ "type" : "integer"
+ }
+ }
+ },
+ "summary" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:Summary",
+ "properties" : {
+ "stateSize" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics",
+ "properties" : {
+ "minimum" : {
+ "type" : "integer"
+ },
+ "maximum" : {
+ "type" : "integer"
+ },
+ "average" : {
+ "type" : "integer"
+ }
+ }
+ },
+ "duration" : {
+ "type" : "object",
+ "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
+ },
+ "alignmentBuffered" : {
+ "type" : "object",
+ "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
+ }
+ }
+ },
+ "latestCheckpoints" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:LatestCheckpoints",
+ "properties" : {
+ "completedCheckpointStatistics" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics:CompletedCheckpointStatistics",
+ "properties" : {
+ "id" : {
+ "type" : "integer"
+ },
+ "status" : {
+ "type" : "string",
+ "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
+ },
+ "savepoint" : {
+ "type" : "boolean"
+ },
+ "triggerTimestamp" : {
+ "type" : "integer"
+ },
+ "latestAckTimestamp" : {
+ "type" : "integer"
+ },
+ "stateSize" : {
+ "type" : "integer"
+ },
+ "duration" : {
+ "type" : "integer"
+ },
+ "alignmentBuffered" : {
+ "type" : "integer"
+ },
+ "numSubtasks" : {
+ "type" : "integer"
+ },
+ "numAckSubtasks" : {
+ "type" : "integer"
+ },
+ "checkpointStatisticsPerTask" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics",
+ "properties" : {
+ "checkpointId" : {
+ "type" : "integer"
+ },
+ "checkpointStatus" : {
+ "type" : "string",
+ "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
+ },
+ "latestAckTimestamp" : {
+ "type" : "integer"
+ },
+ "stateSize" : {
+ "type" : "integer"
+ },
+ "duration" : {
+ "type" : "integer"
+ },
+ "alignmentBuffered" : {
+ "type" : "integer"
+ },
+ "numSubtasks" : {
+ "type" : "integer"
+ },
+ "numAckSubtasks" : {
+ "type" : "integer"
+ }
+ }
+ }
+ },
+ "externalPath" : {
+ "type" : "string"
+ },
+ "discarded" : {
+ "type" : "boolean"
+ }
+ }
+ },
+ "savepointStatistics" : {
+ "type" : "object",
+ "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics:CompletedCheckpointStatistics"
+ },
+ "failedCheckpointStatistics" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics:FailedCheckpointStatistics",
+ "properties" : {
+ "id" : {
+ "type" : "integer"
+ },
+ "status" : {
+ "type" : "string",
+ "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
+ },
+ "savepoint" : {
+ "type" : "boolean"
+ },
+ "triggerTimestamp" : {
+ "type" : "integer"
+ },
+ "latestAckTimestamp" : {
+ "type" : "integer"
+ },
+ "stateSize" : {
+ "type" : "integer"
+ },
+ "duration" : {
+ "type" : "integer"
+ },
+ "alignmentBuffered" : {
+ "type" : "integer"
+ },
+ "numSubtasks" : {
+ "type" : "integer"
+ },
+ "numAckSubtasks" : {
+ "type" : "integer"
+ },
+ "checkpointStatisticsPerTask" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "object",
+ "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics"
+ }
+ },
+ "failureTimestamp" : {
+ "type" : "integer"
+ },
+ "failureMessage" : {
+ "type" : "string"
+ }
+ }
+ },
+ "restoredCheckpointStatistics" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:RestoredCheckpointStatistics",
+ "properties" : {
+ "id" : {
+ "type" : "integer"
+ },
+ "restoreTimestamp" : {
+ "type" : "integer"
+ },
+ "savepoint" : {
+ "type" : "boolean"
+ },
+ "externalPath" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ },
+ "history" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics",
+ "properties" : {
+ "id" : {
+ "type" : "integer"
+ },
+ "status" : {
+ "type" : "string",
+ "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
+ },
+ "savepoint" : {
+ "type" : "boolean"
+ },
+ "triggerTimestamp" : {
+ "type" : "integer"
+ },
+ "latestAckTimestamp" : {
+ "type" : "integer"
+ },
+ "stateSize" : {
+ "type" : "integer"
+ },
+ "duration" : {
+ "type" : "integer"
+ },
+ "alignmentBuffered" : {
+ "type" : "integer"
+ },
+ "numSubtasks" : {
+ "type" : "integer"
+ },
+ "numAckSubtasks" : {
+ "type" : "integer"
+ },
+ "checkpointStatisticsPerTask" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "object",
+ "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics"
+ }
+ }
+ }
+ }
+ }
+ }
} </code>
</pre>
</div>
@@ -592,7 +1087,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints/config</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -613,8 +1108,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-128702999">Request</button>
- <div id="-128702999" class="collapse">
+ <button data-toggle="collapse" data-target="#-1872439843">Request</button>
+ <div id="-1872439843" class="collapse">
<pre>
<code>
{} </code>
@@ -624,267 +1119,12 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#438202683">Response</button>
- <div id="438202683" class="collapse">
- <pre>
- <code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics",
- "properties" : {
- "counts" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:Counts",
- "properties" : {
- "numberRestoredCheckpoints" : {
- "type" : "integer"
- },
- "totalNumberCheckpoints" : {
- "type" : "integer"
- },
- "numberInProgressCheckpoints" : {
- "type" : "integer"
- },
- "numberCompletedCheckpoints" : {
- "type" : "integer"
- },
- "numberFailedCheckpoints" : {
- "type" : "integer"
- }
- }
- },
- "summary" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:Summary",
- "properties" : {
- "stateSize" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics",
- "properties" : {
- "minimum" : {
- "type" : "integer"
- },
- "maximum" : {
- "type" : "integer"
- },
- "average" : {
- "type" : "integer"
- }
- }
- },
- "duration" : {
- "type" : "object",
- "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
- },
- "alignmentBuffered" : {
- "type" : "object",
- "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
- }
- }
- },
- "latestCheckpoints" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:LatestCheckpoints",
- "properties" : {
- "completedCheckpointStatistics" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics:CompletedCheckpointStatistics",
- "properties" : {
- "id" : {
- "type" : "integer"
- },
- "status" : {
- "type" : "string",
- "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
- },
- "savepoint" : {
- "type" : "boolean"
- },
- "triggerTimestamp" : {
- "type" : "integer"
- },
- "latestAckTimestamp" : {
- "type" : "integer"
- },
- "stateSize" : {
- "type" : "integer"
- },
- "duration" : {
- "type" : "integer"
- },
- "alignmentBuffered" : {
- "type" : "integer"
- },
- "numSubtasks" : {
- "type" : "integer"
- },
- "numAckSubtasks" : {
- "type" : "integer"
- },
- "checkpointStatisticsPerTask" : {
- "type" : "object",
- "additionalProperties" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics",
- "properties" : {
- "checkpointId" : {
- "type" : "integer"
- },
- "checkpointStatus" : {
- "type" : "string",
- "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
- },
- "latestAckTimestamp" : {
- "type" : "integer"
- },
- "stateSize" : {
- "type" : "integer"
- },
- "duration" : {
- "type" : "integer"
- },
- "alignmentBuffered" : {
- "type" : "integer"
- },
- "numSubtasks" : {
- "type" : "integer"
- },
- "numAckSubtasks" : {
- "type" : "integer"
- }
- }
- }
- },
- "externalPath" : {
- "type" : "string"
- },
- "discarded" : {
- "type" : "boolean"
- }
- }
- },
- "savepointStatistics" : {
- "type" : "object",
- "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics:CompletedCheckpointStatistics"
- },
- "failedCheckpointStatistics" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics:FailedCheckpointStatistics",
- "properties" : {
- "id" : {
- "type" : "integer"
- },
- "status" : {
- "type" : "string",
- "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
- },
- "savepoint" : {
- "type" : "boolean"
- },
- "triggerTimestamp" : {
- "type" : "integer"
- },
- "latestAckTimestamp" : {
- "type" : "integer"
- },
- "stateSize" : {
- "type" : "integer"
- },
- "duration" : {
- "type" : "integer"
- },
- "alignmentBuffered" : {
- "type" : "integer"
- },
- "numSubtasks" : {
- "type" : "integer"
- },
- "numAckSubtasks" : {
- "type" : "integer"
- },
- "checkpointStatisticsPerTask" : {
- "type" : "object",
- "additionalProperties" : {
- "type" : "object",
- "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics"
- }
- },
- "failureTimestamp" : {
- "type" : "integer"
- },
- "failureMessage" : {
- "type" : "string"
- }
- }
- },
- "restoredCheckpointStatistics" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:RestoredCheckpointStatistics",
- "properties" : {
- "id" : {
- "type" : "integer"
- },
- "restoreTimestamp" : {
- "type" : "integer"
- },
- "savepoint" : {
- "type" : "boolean"
- },
- "externalPath" : {
- "type" : "string"
- }
- }
- }
- }
- },
- "history" : {
- "type" : "array",
- "items" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics",
- "properties" : {
- "id" : {
- "type" : "integer"
- },
- "status" : {
- "type" : "string",
- "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
- },
- "savepoint" : {
- "type" : "boolean"
- },
- "triggerTimestamp" : {
- "type" : "integer"
- },
- "latestAckTimestamp" : {
- "type" : "integer"
- },
- "stateSize" : {
- "type" : "integer"
- },
- "duration" : {
- "type" : "integer"
- },
- "alignmentBuffered" : {
- "type" : "integer"
- },
- "numSubtasks" : {
- "type" : "integer"
- },
- "numAckSubtasks" : {
- "type" : "integer"
- },
- "checkpointStatisticsPerTask" : {
- "type" : "object",
- "additionalProperties" : {
- "type" : "object",
- "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics"
- }
- }
- }
- }
- }
- }
+ <button data-toggle="collapse" data-target="#1325385076">Response</button>
+ <div id="1325385076" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "any"
} </code>
</pre>
</div>
@@ -895,7 +1135,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints/config</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -911,13 +1151,15 @@
<td colspan="2">
<ul>
<li><code>jobid</code> - description</li>
+<li><code>checkpointid</code> - description</li>
+<li><code>vertexid</code> - description</li>
</ul>
</td>
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-583364334">Request</button>
- <div id="-583364334" class="collapse">
+ <button data-toggle="collapse" data-target="#723836914">Request</button>
+ <div id="723836914" class="collapse">
<pre>
<code>
{} </code>
@@ -927,12 +1169,108 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1680506711">Response</button>
- <div id="-1680506711" class="collapse">
+ <button data-toggle="collapse" data-target="#664379972">Response</button>
+ <div id="664379972" class="collapse">
<pre>
<code>
-{
- "type" : "any"
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails",
+ "properties" : {
+ "checkpointId" : {
+ "type" : "integer"
+ },
+ "checkpointStatus" : {
+ "type" : "string",
+ "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
+ },
+ "latestAckTimestamp" : {
+ "type" : "integer"
+ },
+ "stateSize" : {
+ "type" : "integer"
+ },
+ "duration" : {
+ "type" : "integer"
+ },
+ "alignmentBuffered" : {
+ "type" : "integer"
+ },
+ "numSubtasks" : {
+ "type" : "integer"
+ },
+ "numAckSubtasks" : {
+ "type" : "integer"
+ },
+ "summary" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails:Summary",
+ "properties" : {
+ "stateSize" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics",
+ "properties" : {
+ "minimum" : {
+ "type" : "integer"
+ },
+ "maximum" : {
+ "type" : "integer"
+ },
+ "average" : {
+ "type" : "integer"
+ }
+ }
+ },
+ "duration" : {
+ "type" : "object",
+ "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
+ },
+ "checkpointDuration" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails:CheckpointDuration",
+ "properties" : {
+ "synchronousDuration" : {
+ "type" : "object",
+ "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
+ },
+ "asynchronousDuration" : {
+ "type" : "object",
+ "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
+ }
+ }
+ },
+ "checkpointAlignment" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails:CheckpointAlignment",
+ "properties" : {
+ "bufferedData" : {
+ "type" : "object",
+ "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
+ },
+ "duration" : {
+ "type" : "object",
+ "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
+ }
+ }
+ }
+ }
+ },
+ "subtaskCheckpointStatistics" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:SubtaskCheckpointStatistics",
+ "properties" : {
+ "index" : {
+ "type" : "integer"
+ },
+ "checkpointStatus" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
} </code>
</pre>
</div>
@@ -943,7 +1281,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints/:checkpointid</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -960,14 +1298,13 @@
<ul>
<li><code>jobid</code> - description</li>
<li><code>checkpointid</code> - description</li>
-<li><code>vertexid</code> - description</li>
</ul>
</td>
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#2012912423">Request</button>
- <div id="2012912423" class="collapse">
+ <button data-toggle="collapse" data-target="#-450903614">Request</button>
+ <div id="-450903614" class="collapse">
<pre>
<code>
{} </code>
@@ -977,108 +1314,80 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#1953455481">Response</button>
- <div id="1953455481" class="collapse">
- <pre>
- <code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails",
- "properties" : {
- "checkpointId" : {
- "type" : "integer"
- },
- "checkpointStatus" : {
- "type" : "string",
- "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
- },
- "latestAckTimestamp" : {
- "type" : "integer"
- },
- "stateSize" : {
- "type" : "integer"
- },
- "duration" : {
- "type" : "integer"
- },
- "alignmentBuffered" : {
- "type" : "integer"
- },
- "numSubtasks" : {
- "type" : "integer"
- },
- "numAckSubtasks" : {
- "type" : "integer"
- },
- "summary" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails:Summary",
- "properties" : {
- "stateSize" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics",
- "properties" : {
- "minimum" : {
- "type" : "integer"
- },
- "maximum" : {
- "type" : "integer"
- },
- "average" : {
- "type" : "integer"
- }
- }
- },
- "duration" : {
- "type" : "object",
- "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
- },
- "checkpointDuration" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails:CheckpointDuration",
- "properties" : {
- "synchronousDuration" : {
- "type" : "object",
- "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
- },
- "asynchronousDuration" : {
- "type" : "object",
- "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
- }
- }
- },
- "checkpointAlignment" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails:CheckpointAlignment",
- "properties" : {
- "bufferedData" : {
- "type" : "object",
- "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
- },
- "duration" : {
- "type" : "object",
- "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
- }
- }
- }
- }
- },
- "subtaskCheckpointStatistics" : {
- "type" : "array",
- "items" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:SubtaskCheckpointStatistics",
- "properties" : {
- "index" : {
- "type" : "integer"
- },
- "checkpointStatus" : {
- "type" : "string"
- }
- }
- }
- }
- }
+ <button data-toggle="collapse" data-target="#1821144940">Response</button>
+ <div id="1821144940" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics",
+ "properties" : {
+ "id" : {
+ "type" : "integer"
+ },
+ "status" : {
+ "type" : "string",
+ "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
+ },
+ "savepoint" : {
+ "type" : "boolean"
+ },
+ "triggerTimestamp" : {
+ "type" : "integer"
+ },
+ "latestAckTimestamp" : {
+ "type" : "integer"
+ },
+ "stateSize" : {
+ "type" : "integer"
+ },
+ "duration" : {
+ "type" : "integer"
+ },
+ "alignmentBuffered" : {
+ "type" : "integer"
+ },
+ "numSubtasks" : {
+ "type" : "integer"
+ },
+ "numAckSubtasks" : {
+ "type" : "integer"
+ },
+ "checkpointStatisticsPerTask" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics",
+ "properties" : {
+ "checkpointId" : {
+ "type" : "integer"
+ },
+ "checkpointStatus" : {
+ "type" : "string",
+ "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
+ },
+ "latestAckTimestamp" : {
+ "type" : "integer"
+ },
+ "stateSize" : {
+ "type" : "integer"
+ },
+ "duration" : {
+ "type" : "integer"
+ },
+ "alignmentBuffered" : {
+ "type" : "integer"
+ },
+ "numSubtasks" : {
+ "type" : "integer"
+ },
+ "numAckSubtasks" : {
+ "type" : "integer"
+ }
+ }
+ }
+ }
+ }
} </code>
</pre>
</div>
@@ -1089,7 +1398,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid/checkpoints/:checkpointid</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/config</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1105,14 +1414,13 @@
<td colspan="2">
<ul>
<li><code>jobid</code> - description</li>
-<li><code>checkpointid</code> - description</li>
</ul>
</td>
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#838171895">Request</button>
- <div id="838171895" class="collapse">
+ <button data-toggle="collapse" data-target="#1648775609">Request</button>
+ <div id="1648775609" class="collapse">
<pre>
<code>
{} </code>
@@ -1122,80 +1430,39 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1184746847">Response</button>
- <div id="-1184746847" class="collapse">
- <pre>
- <code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics",
- "properties" : {
- "id" : {
- "type" : "integer"
- },
- "status" : {
- "type" : "string",
- "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
- },
- "savepoint" : {
- "type" : "boolean"
- },
- "triggerTimestamp" : {
- "type" : "integer"
- },
- "latestAckTimestamp" : {
- "type" : "integer"
- },
- "stateSize" : {
- "type" : "integer"
- },
- "duration" : {
- "type" : "integer"
- },
- "alignmentBuffered" : {
- "type" : "integer"
- },
- "numSubtasks" : {
- "type" : "integer"
- },
- "numAckSubtasks" : {
- "type" : "integer"
- },
- "checkpointStatisticsPerTask" : {
- "type" : "object",
- "additionalProperties" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics",
- "properties" : {
- "checkpointId" : {
- "type" : "integer"
- },
- "checkpointStatus" : {
- "type" : "string",
- "enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
- },
- "latestAckTimestamp" : {
- "type" : "integer"
- },
- "stateSize" : {
- "type" : "integer"
- },
- "duration" : {
- "type" : "integer"
- },
- "alignmentBuffered" : {
- "type" : "integer"
- },
- "numSubtasks" : {
- "type" : "integer"
- },
- "numAckSubtasks" : {
- "type" : "integer"
- }
- }
- }
- }
- }
+ <button data-toggle="collapse" data-target="#-1215495906">Response</button>
+ <div id="-1215495906" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobConfigInfo",
+ "properties" : {
+ "jobId" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
+ "properties" : {
+ "upperPart" : {
+ "type" : "integer"
+ },
+ "lowerPart" : {
+ "type" : "integer"
+ },
+ "bytes" : {
+ "type" : "array",
+ "items" : {
+ "type" : "integer"
+ }
+ }
+ }
+ },
+ "jobName" : {
+ "type" : "string"
+ },
+ "executionConfigInfo" : {
+ "type" : "any"
+ }
+ }
} </code>
</pre>
</div>
@@ -1206,7 +1473,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid/config</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/exceptions</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1227,8 +1494,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1357116178">Request</button>
- <div id="-1357116178" class="collapse">
+ <button data-toggle="collapse" data-target="#-494359301">Request</button>
+ <div id="-494359301" class="collapse">
<pre>
<code>
{} </code>
@@ -1238,39 +1505,12 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#73579603">Response</button>
- <div id="73579603" class="collapse">
- <pre>
- <code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobConfigInfo",
- "properties" : {
- "jobId" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
- "properties" : {
- "upperPart" : {
- "type" : "integer"
- },
- "lowerPart" : {
- "type" : "integer"
- },
- "bytes" : {
- "type" : "array",
- "items" : {
- "type" : "integer"
- }
- }
- }
- },
- "jobName" : {
- "type" : "string"
- },
- "executionConfigInfo" : {
- "type" : "any"
- }
- }
+ <button data-toggle="collapse" data-target="#-1465234270">Response</button>
+ <div id="-1465234270" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "any"
} </code>
</pre>
</div>
@@ -1281,7 +1521,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid/exceptions</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/execution-result</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1302,8 +1542,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#794716208">Request</button>
- <div id="794716208" class="collapse">
+ <button data-toggle="collapse" data-target="#-1141690839">Request</button>
+ <div id="-1141690839" class="collapse">
<pre>
<code>
{} </code>
@@ -1313,12 +1553,79 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-176158761">Response</button>
- <div id="-176158761" class="collapse">
+ <button data-toggle="collapse" data-target="#-1847991759">Response</button>
+ <div id="-1847991759" class="collapse">
<pre>
<code>
-{
- "type" : "any"
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobExecutionResultResponseBody",
+ "properties" : {
+ "status" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
+ "properties" : {
+ "id" : {
+ "type" : "string",
+ "enum" : [ "IN_PROGRESS", "COMPLETED" ]
+ }
+ }
+ },
+ "jobExecutionResult" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:jobmaster:JobResult",
+ "properties" : {
+ "jobId" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:api:common:JobID",
+ "properties" : {
+ "upperPart" : {
+ "type" : "integer"
+ },
+ "lowerPart" : {
+ "type" : "integer"
+ },
+ "bytes" : {
+ "type" : "array",
+ "items" : {
+ "type" : "integer"
+ }
+ }
+ }
+ },
+ "accumulatorResults" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:util:SerializedValue<org:apache:flink:util:OptionalFailure<java:lang:Object>>",
+ "properties" : {
+ "byteArray" : {
+ "type" : "array",
+ "items" : {
+ "type" : "integer"
+ }
+ }
+ }
+ }
+ },
+ "netRuntime" : {
+ "type" : "integer"
+ },
+ "serializedThrowable" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:java:util:Optional<org:apache:flink:util:SerializedThrowable>",
+ "properties" : {
+ "present" : {
+ "type" : "boolean"
+ }
+ }
+ },
+ "success" : {
+ "type" : "boolean"
+ }
+ }
+ }
+ }
} </code>
</pre>
</div>
@@ -1360,8 +1667,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1792172223">Request</button>
- <div id="-1792172223" class="collapse">
+ <button data-toggle="collapse" data-target="#1213719564">Request</button>
+ <div id="1213719564" class="collapse">
<pre>
<code>
{} </code>
@@ -1371,30 +1678,30 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1481443862">Response</button>
- <div id="-1481443862" class="collapse">
+ <button data-toggle="collapse" data-target="#1524447925">Response</button>
+ <div id="1524447925" class="collapse">
<pre>
<code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:MetricCollectionResponseBody",
- "properties" : {
- "metrics" : {
- "type" : "array",
- "items" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:Metric",
- "properties" : {
- "id" : {
- "type" : "string"
- },
- "value" : {
- "type" : "string"
- }
- }
- }
- }
- }
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:MetricCollectionResponseBody",
+ "properties" : {
+ "metrics" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:Metric",
+ "properties" : {
+ "id" : {
+ "type" : "string"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
} </code>
</pre>
</div>
@@ -1426,8 +1733,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-895161707">Request</button>
- <div id="-895161707" class="collapse">
+ <button data-toggle="collapse" data-target="#2110730080">Request</button>
+ <div id="2110730080" class="collapse">
<pre>
<code>
{} </code>
@@ -1437,18 +1744,18 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#597567777">Response</button>
- <div id="597567777" class="collapse">
+ <button data-toggle="collapse" data-target="#-691507732">Response</button>
+ <div id="-691507732" class="collapse">
<pre>
<code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo",
- "properties" : {
- "jsonPlan" : {
- "type" : "string"
- }
- }
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo",
+ "properties" : {
+ "jsonPlan" : {
+ "type" : "string"
+ }
+ }
} </code>
</pre>
</div>
@@ -1459,10 +1766,10 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/accumulators</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/rescaling</strong></td>
</tr>
<tr>
- <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+ <td class="text-left" style="width: 20%">Verb: <code>PATCH</code></td>
<td class="text-left">Response code: <code>200 OK</code></td>
</tr>
<tr>
@@ -1475,14 +1782,23 @@
<td colspan="2">
<ul>
<li><code>jobid</code> - description</li>
-<li><code>vertexid</code> - description</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">Query parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>parallelism</code> (mandatory): description</li>
</ul>
</td>
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-2003025577">Request</button>
- <div id="-2003025577" class="collapse">
+ <button data-toggle="collapse" data-target="#1601881923">Request</button>
+ <div id="1601881923" class="collapse">
<pre>
<code>
{} </code>
@@ -1492,12 +1808,33 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#736007659">Response</button>
- <div id="736007659" class="collapse">
+ <button data-toggle="collapse" data-target="#971862112">Response</button>
+ <div id="971862112" class="collapse">
<pre>
<code>
-{
- "type" : "any"
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse",
+ "properties" : {
+ "triggerId" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:TriggerId",
+ "properties" : {
+ "upperPart" : {
+ "type" : "integer"
+ },
+ "lowerPart" : {
+ "type" : "integer"
+ },
+ "bytes" : {
+ "type" : "array",
+ "items" : {
+ "type" : "integer"
+ }
+ }
+ }
+ }
+ }
} </code>
</pre>
</div>
@@ -1508,7 +1845,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/metrics</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/rescaling/:triggerid</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1524,57 +1861,109 @@
<td colspan="2">
<ul>
<li><code>jobid</code> - description</li>
-<li><code>vertexid</code> - description</li>
+<li><code>triggerid</code> - description</li>
</ul>
</td>
</tr>
<tr>
- <td colspan="2">Query parameters</td>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#1396498187">Request</button>
+ <div id="1396498187" class="collapse">
+ <pre>
+ <code>
+{} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#-436584949">Response</button>
+ <div id="-436584949" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "any"
+} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ </tbody>
+</table>
+<table class="table table-bordered">
+ <tbody>
+ <tr>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/savepoints</strong></td>
+ </tr>
+ <tr>
+ <td class="text-left" style="width: 20%">Verb: <code>POST</code></td>
+ <td class="text-left">Response code: <code>202 Accepted</code></td>
+ </tr>
+ <tr>
+ <td colspan="2">description</td>
+ </tr>
+ <tr>
+ <td colspan="2">Path parameters</td>
</tr>
<tr>
<td colspan="2">
<ul>
-<li><code>get</code> (optional): description</li>
+<li><code>jobid</code> - description</li>
</ul>
</td>
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#197623875">Request</button>
- <div id="197623875" class="collapse">
+ <button data-toggle="collapse" data-target="#256459122">Request</button>
+ <div id="256459122" class="collapse">
<pre>
<code>
-{} </code>
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:savepoints:SavepointTriggerRequestBody",
+ "properties" : {
+ "targetDirectory" : {
+ "type" : "string"
+ },
+ "cancelJob" : {
+ "type" : "boolean"
+ }
+ }
+} </code>
</pre>
</div>
</td>
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#508352236">Response</button>
- <div id="508352236" class="collapse">
+ <button data-toggle="collapse" data-target="#-591687676">Response</button>
+ <div id="-591687676" class="collapse">
<pre>
<code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:MetricCollectionResponseBody",
- "properties" : {
- "metrics" : {
- "type" : "array",
- "items" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:Metric",
- "properties" : {
- "id" : {
- "type" : "string"
- },
- "value" : {
- "type" : "string"
- }
- }
- }
- }
- }
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse",
+ "properties" : {
+ "triggerId" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:TriggerId",
+ "properties" : {
+ "upperPart" : {
+ "type" : "integer"
+ },
+ "lowerPart" : {
+ "type" : "integer"
+ },
+ "bytes" : {
+ "type" : "array",
+ "items" : {
+ "type" : "integer"
+ }
+ }
+ }
+ }
+ }
} </code>
</pre>
</div>
@@ -1585,7 +1974,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/metrics</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/savepoints/:triggerid</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1601,25 +1990,63 @@
<td colspan="2">
<ul>
<li><code>jobid</code> - description</li>
-<li><code>vertexid</code> - description</li>
-<li><code>subtaskindex</code> - description</li>
+<li><code>triggerid</code> - description</li>
</ul>
</td>
</tr>
<tr>
- <td colspan="2">Query parameters</td>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#-679507407">Request</button>
+ <div id="-679507407" class="collapse">
+ <pre>
+ <code>
+{} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#1782376753">Response</button>
+ <div id="1782376753" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "any"
+} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ </tbody>
+</table>
+<table class="table table-bordered">
+ <tbody>
+ <tr>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid</strong></td>
+ </tr>
+ <tr>
+ <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+ <td class="text-left">Response code: <code>200 OK</code></td>
+ </tr>
+ <tr>
+ <td colspan="2">description</td>
+ </tr>
+ <tr>
+ <td colspan="2">Path parameters</td>
</tr>
<tr>
<td colspan="2">
<ul>
-<li><code>get</code> (optional): description</li>
+<li><code>jobid</code> - description</li>
+<li><code>vertexid</code> - description</li>
</ul>
</td>
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-570514556">Request</button>
- <div id="-570514556" class="collapse">
+ <button data-toggle="collapse" data-target="#155996506">Request</button>
+ <div id="155996506" class="collapse">
<pre>
<code>
{} </code>
@@ -1629,30 +2056,12 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-259786195">Response</button>
- <div id="-259786195" class="collapse">
+ <button data-toggle="collapse" data-target="#-1181366619">Response</button>
+ <div id="-1181366619" class="collapse">
<pre>
<code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:MetricCollectionResponseBody",
- "properties" : {
- "metrics" : {
- "type" : "array",
- "items" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:Metric",
- "properties" : {
- "id" : {
- "type" : "string"
- },
- "value" : {
- "type" : "string"
- }
- }
- }
- }
- }
+{
+ "type" : "any"
} </code>
</pre>
</div>
@@ -1663,7 +2072,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasktimes</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/accumulators</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1685,8 +2094,8 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1181229237">Request</button>
- <div id="-1181229237" class="collapse">
+ <button data-toggle="collapse" data-target="#1002866210">Request</button>
+ <div id="1002866210" class="collapse">
<pre>
<code>
{} </code>
@@ -1696,12 +2105,12 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#2091701353">Response</button>
- <div id="2091701353" class="collapse">
+ <button data-toggle="collapse" data-target="#-553067850">Response</button>
+ <div id="-553067850" class="collapse">
<pre>
<code>
-{
- "type" : "any"
+{
+ "type" : "any"
} </code>
</pre>
</div>
@@ -1712,7 +2121,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/overview</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/backpressure</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1722,9 +2131,20 @@
<td colspan="2">description</td>
</tr>
<tr>
+ <td colspan="2">Path parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>jobid</code> - description</li>
+<li><code>vertexid</code> - description</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1581342131">Request</button>
- <div id="-1581342131" class="collapse">
+ <button data-toggle="collapse" data-target="#-587292255">Request</button>
+ <div id="-587292255" class="collapse">
<pre>
<code>
{} </code>
@@ -1734,42 +2154,45 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1331669657">Response</button>
- <div id="-1331669657" class="collapse">
- <pre>
- <code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:legacy:messages:ClusterOverviewWithVersion",
- "properties" : {
- "numJobsRunningOrPending" : {
- "type" : "integer"
- },
- "numJobsFinished" : {
- "type" : "integer"
- },
- "numJobsCancelled" : {
- "type" : "integer"
- },
- "numJobsFailed" : {
- "type" : "integer"
- },
- "numTaskManagersConnected" : {
- "type" : "integer"
- },
- "numSlotsTotal" : {
- "type" : "integer"
- },
- "numSlotsAvailable" : {
- "type" : "integer"
- },
- "version" : {
- "type" : "string"
- },
- "commitId" : {
- "type" : "string"
- }
- }
+ <button data-toggle="collapse" data-target="#1388088916">Response</button>
+ <div id="1388088916" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexBackPressureInfo",
+ "properties" : {
+ "status" : {
+ "type" : "string",
+ "enum" : [ "DEPRECATED", "OK" ]
+ },
+ "backpressureLevel" : {
+ "type" : "string",
+ "enum" : [ "OK", "LOW", "HIGH" ]
+ },
+ "endTimestamp" : {
+ "type" : "integer"
+ },
+ "subtasks" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexBackPressureInfo:SubtaskBackPressureInfo",
+ "properties" : {
+ "subtask" : {
+ "type" : "integer"
+ },
+ "backpressureLevel" : {
+ "type" : "string",
+ "enum" : [ "OK", "LOW", "HIGH" ]
+ },
+ "ratio" : {
+ "type" : "number"
+ }
+ }
+ }
+ }
+ }
} </code>
</pre>
</div>
@@ -1780,7 +2203,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/taskmanagers</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/metrics</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1790,9 +2213,30 @@
<td colspan="2">description</td>
</tr>
<tr>
+ <td colspan="2">Path parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>jobid</code> - description</li>
+<li><code>vertexid</code> - description</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">Query parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>get</code> (optional): description</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1212462017">Request</button>
- <div id="-1212462017" class="collapse">
+ <button data-toggle="collapse" data-target="#-1091451634">Request</button>
+ <div id="-1091451634" class="collapse">
<pre>
<code>
{} </code>
@@ -1802,70 +2246,30 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#1694259450">Response</button>
- <div id="1694259450" class="collapse">
- <pre>
- <code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagersInfo",
- "properties" : {
- "taskManagerInfos" : {
- "type" : "array",
- "items" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerInfo",
- "properties" : {
- "resourceId" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:clusterframework:types:ResourceID",
- "properties" : {
- "resourceIdString" : {
- "type" : "string"
- },
- "resourceID" : {
- "type" : "object",
- "$ref" : "urn:jsonschema:org:apache:flink:runtime:clusterframework:types:ResourceID"
- }
- }
- },
- "address" : {
- "type" : "string"
- },
- "dataPort" : {
- "type" : "integer"
- },
- "lastHeartbeat" : {
- "type" : "integer"
- },
- "numberSlots" : {
- "type" : "integer"
- },
- "numberAvailableSlots" : {
- "type" : "integer"
- },
- "hardwareDescription" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
- "properties" : {
- "numberOfCPUCores" : {
- "type" : "integer"
- },
- "sizeOfPhysicalMemory" : {
- "type" : "integer"
- },
- "sizeOfJvmHeap" : {
- "type" : "integer"
- },
- "sizeOfManagedMemory" : {
- "type" : "integer"
- }
- }
- }
- }
- }
- }
- }
+ <button data-toggle="collapse" data-target="#-780723273">Response</button>
+ <div id="-780723273" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:MetricCollectionResponseBody",
+ "properties" : {
+ "metrics" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:Metric",
+ "properties" : {
+ "id" : {
+ "type" : "string"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
} </code>
</pre>
</div>
@@ -1876,7 +2280,7 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/taskmanagers/:taskmanagerid</strong></td>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex</strong></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -1891,14 +2295,16 @@
<tr>
<td colspan="2">
<ul>
-<li><code>taskmanagerid</code> - description</li>
+<li><code>jobid</code> - description</li>
+<li><code>vertexid</code> - description</li>
+<li><code>subtaskindex</code> - description</li>
</ul>
</td>
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1209460381">Request</button>
- <div id="-1209460381" class="collapse">
+ <button data-toggle="collapse" data-target="#-1504229989">Request</button>
+ <div id="-1504229989" class="collapse">
<pre>
<code>
{} </code>
@@ -1908,61 +2314,661 @@
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#-1726867379">Response</button>
- <div id="-1726867379" class="collapse">
- <pre>
- <code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
- "properties" : {
- "resourceId" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:clusterframework:types:ResourceID",
- "properties" : {
- "resourceIdString" : {
- "type" : "string"
- },
- "resourceID" : {
- "type" : "object",
- "$ref" : "urn:jsonschema:org:apache:flink:runtime:clusterframework:types:ResourceID"
- }
- }
- },
- "address" : {
- "type" : "string"
- },
- "dataPort" : {
- "type" : "integer"
- },
- "lastHeartbeat" : {
- "type" : "integer"
- },
- "numberSlots" : {
- "type" : "integer"
- },
- "numberAvailableSlots" : {
- "type" : "integer"
- },
- "hardwareDescription" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
- "properties" : {
- "numberOfCPUCores" : {
- "type" : "integer"
- },
- "sizeOfPhysicalMemory" : {
- "type" : "integer"
- },
- "sizeOfJvmHeap" : {
- "type" : "integer"
- },
- "sizeOfManagedMemory" : {
- "type" : "integer"
- }
- }
- }
- }
+ <button data-toggle="collapse" data-target="#772003294">Response</button>
+ <div id="772003294" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo",
+ "properties" : {
+ "subtaskIndex" : {
+ "type" : "integer"
+ },
+ "status" : {
+ "type" : "string",
+ "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
+ },
+ "attempt" : {
+ "type" : "integer"
+ },
+ "host" : {
+ "type" : "string"
+ },
+ "startTime" : {
+ "type" : "integer"
+ },
+ "endTime" : {
+ "type" : "integer"
+ },
+ "duration" : {
+ "type" : "integer"
+ },
+ "ioMetricsInfo" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
+ "properties" : {
+ "bytesRead" : {
+ "type" : "integer"
+ },
+ "bytesReadComplete" : {
+ "type" : "boolean"
+ },
+ "bytesWritten" : {
+ "type" : "integer"
+ },
+ "bytesWrittenComplete" : {
+ "type" : "boolean"
+ },
+ "recordsRead" : {
+ "type" : "integer"
+ },
+ "recordsReadComplet
<TRUNCATED>
[09/19] flink git commit: [hotfix][docs][table] Minor fixes
Posted by ch...@apache.org.
[hotfix][docs][table] Minor fixes
This closes #5795.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40b9a638
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40b9a638
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40b9a638
Branch: refs/heads/release-1.5
Commit: 40b9a638c939769505bfbdcb5635cdc7c634e4e8
Parents: 15e4449
Author: mayyamus <ch...@outlook.com>
Authored: Fri Mar 30 21:44:47 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 14:26:45 2018 +0200
----------------------------------------------------------------------
docs/dev/table/common.md | 4 ++--
docs/dev/table/streaming.md | 2 +-
2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/40b9a638/docs/dev/table/common.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index e1d3809..3e3965a 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -775,8 +775,8 @@ DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
-DataStream<Tuple2<String, Integer>> dsTuple =
- tableEnv.toAppendStream(table, tupleType);
+DataSet<Tuple2<String, Integer>> dsTuple =
+ tableEnv.toDataSet(table, tupleType);
{% endhighlight %}
</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/40b9a638/docs/dev/table/streaming.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/streaming.md b/docs/dev/table/streaming.md
index 310121e..dc0fdf8 100644
--- a/docs/dev/table/streaming.md
+++ b/docs/dev/table/streaming.md
@@ -362,7 +362,7 @@ In either case the event time timestamp field will hold the value of the `DataSt
// Option 1:
// extract timestamp and assign watermarks based on knowledge of the stream
-DataStream<Tuple3<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
+DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
[03/19] flink git commit: [FLINK-8963][tests] Port
BigUserProgramJobSubmitITCase to MiniClusterResource
Posted by ch...@apache.org.
[FLINK-8963][tests] Port BigUserProgramJobSubmitITCase to MiniClusterResource
This closes #5772.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/25a97404
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/25a97404
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/25a97404
Branch: refs/heads/release-1.5
Commit: 25a97404919b0ca717c75dd52434d8f7f01c1b96
Parents: 27cf4be
Author: zentol <ch...@apache.org>
Authored: Tue Feb 20 18:02:51 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:18 2018 +0200
----------------------------------------------------------------------
.../runtime/BigUserProgramJobSubmitITCase.java | 84 +++++++++++++-------
1 file changed, 56 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/25a97404/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
index a4d5958..b10dbec 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
@@ -18,21 +18,27 @@
package org.apache.flink.test.streaming.runtime;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
-import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.apache.flink.testutils.category.New;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.experimental.categories.Category;
+import java.net.URI;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -44,37 +50,48 @@ import static org.junit.Assert.assertEquals;
* Integration test that verifies that a user program with a big(ger) payload is successfully
* submitted and run.
*/
-@Ignore("Fails on job submission payload being too large - [FLINK-7285]")
+@Category(New.class)
public class BigUserProgramJobSubmitITCase extends TestLogger {
// ------------------------------------------------------------------------
// The mini cluster that is shared across tests
// ------------------------------------------------------------------------
- private static final int DEFAULT_PARALLELISM = 1;
+ private static final MiniCluster CLUSTER;
+ private static final RestClusterClient<StandaloneClusterId> CLIENT;
- private static LocalFlinkMiniCluster cluster;
+ static {
+ try {
+ MiniClusterConfiguration clusterConfiguration = new MiniClusterConfiguration.Builder()
+ .setNumTaskManagers(1)
+ .setNumSlotsPerTaskManager(1)
+ .build();
+ CLUSTER = new MiniCluster(clusterConfiguration);
+ CLUSTER.start();
- private static final Logger LOG = LoggerFactory.getLogger(BigUserProgramJobSubmitITCase.class);
+ URI restAddress = CLUSTER.getRestAddress();
+
+ final Configuration clientConfig = new Configuration();
+ clientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost());
+ clientConfig.setInteger(RestOptions.REST_PORT, restAddress.getPort());
+
+ CLIENT = new RestClusterClient<>(
+ clientConfig,
+ StandaloneClusterId.getInstance());
+
+ } catch (Exception e) {
+ throw new AssertionError("Could not setup cluster.", e);
+ }
+ }
// ------------------------------------------------------------------------
// Cluster setup & teardown
// ------------------------------------------------------------------------
- @BeforeClass
- public static void setup() throws Exception {
- // make sure we do not use a singleActorSystem for the tests
- // (therefore, we cannot simply inherit from StreamingMultipleProgramsTestBase)
- LOG.info("Starting FlinkMiniCluster");
- cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false, false, false);
- TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
- }
-
@AfterClass
public static void teardown() throws Exception {
- LOG.info("Closing FlinkMiniCluster");
- TestStreamEnvironment.unsetAsContext();
- TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+ CLIENT.shutdown();
+ CLUSTER.close();
}
private final Random rnd = new Random();
@@ -85,15 +102,16 @@ public class BigUserProgramJobSubmitITCase extends TestLogger {
@Test
public void bigDataInMap() throws Exception {
- final byte[] data = new byte[100 * 1024 * 1024]; // 100 MB
+ final byte[] data = new byte[16 * 1024 * 1024]; // 16 MB
rnd.nextBytes(data); // use random data so that Java does not optimise it away
data[1] = 0;
data[3] = 0;
data[5] = 0;
- TestListResultSink<String> resultSink = new TestListResultSink<>();
+ CollectingSink resultSink = new CollectingSink();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
DataStream<Integer> src = env.fromElements(1, 3, 5);
@@ -106,15 +124,25 @@ public class BigUserProgramJobSubmitITCase extends TestLogger {
}
}).addSink(resultSink);
- env.execute();
+ JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+ CLIENT.setDetached(false);
+ CLIENT.submitJob(jobGraph, BigUserProgramJobSubmitITCase.class.getClassLoader());
List<String> expected = Arrays.asList("x 1 0", "x 3 0", "x 5 0");
- List<String> result = resultSink.getResult();
+ List<String> result = CollectingSink.result;
Collections.sort(expected);
Collections.sort(result);
assertEquals(expected, result);
}
+
+ private static class CollectingSink implements SinkFunction<String> {
+ private static final List<String> result = Collections.synchronizedList(new ArrayList<>(3));
+
+ public void invoke(String value, Context context) throws Exception {
+ result.add(value);
+ }
+ }
}
[05/19] flink git commit: [FLINK-8704][tests] Port
SlotCountExceedingParallelismTest
Posted by ch...@apache.org.
[FLINK-8704][tests] Port SlotCountExceedingParallelismTest
This closes #5694.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1a7e4fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1a7e4fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1a7e4fd
Branch: refs/heads/release-1.5
Commit: b1a7e4fd89ca97149f933cd754a13ffde5ba9797
Parents: 25a9740
Author: zentol <ch...@apache.org>
Authored: Mon Mar 12 13:52:57 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:18 2018 +0200
----------------------------------------------------------------------
...LegacySlotCountExceedingParallelismTest.java | 212 +++++++++++++++++++
.../SlotCountExceedingParallelismTest.java | 32 ++-
2 files changed, 235 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b1a7e4fd/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacySlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacySlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacySlotCountExceedingParallelismTest.java
new file mode 100644
index 0000000..356d94a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacySlotCountExceedingParallelismTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.BitSet;
+
+public class LegacySlotCountExceedingParallelismTest extends TestLogger {
+
+ // Test configuration
+ private static final int NUMBER_OF_TMS = 2;
+ private static final int NUMBER_OF_SLOTS_PER_TM = 2;
+ private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+
+ public static final String JOB_NAME = "SlotCountExceedingParallelismTest (no slot sharing, blocking results)";
+
+ private static TestingCluster flink;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ flink = TestingUtils.startTestingCluster(
+ NUMBER_OF_SLOTS_PER_TM,
+ NUMBER_OF_TMS,
+ TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (flink != null) {
+ flink.stop();
+ }
+ }
+
+ @Test
+ public void testNoSlotSharingAndBlockingResultSender() throws Exception {
+ // Sender with higher parallelism than available slots
+ JobGraph jobGraph = createTestJobGraph(JOB_NAME, PARALLELISM * 2, PARALLELISM);
+ submitJobGraphAndWait(jobGraph);
+ }
+
+ @Test
+ public void testNoSlotSharingAndBlockingResultReceiver() throws Exception {
+ // Receiver with higher parallelism than available slots
+ JobGraph jobGraph = createTestJobGraph(JOB_NAME, PARALLELISM, PARALLELISM * 2);
+ submitJobGraphAndWait(jobGraph);
+ }
+
+ @Test
+ public void testNoSlotSharingAndBlockingResultBoth() throws Exception {
+ // Both sender and receiver with higher parallelism than available slots
+ JobGraph jobGraph = createTestJobGraph(JOB_NAME, PARALLELISM * 2, PARALLELISM * 2);
+ submitJobGraphAndWait(jobGraph);
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ private void submitJobGraphAndWait(final JobGraph jobGraph) throws JobExecutionException {
+ flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+ }
+
+ private JobGraph createTestJobGraph(
+ String jobName,
+ int senderParallelism,
+ int receiverParallelism) {
+
+ // The sender and receiver invokable logic ensure that each subtask gets the expected data
+ final JobVertex sender = new JobVertex("Sender");
+ sender.setInvokableClass(RoundRobinSubtaskIndexSender.class);
+ sender.getConfiguration().setInteger(RoundRobinSubtaskIndexSender.CONFIG_KEY, receiverParallelism);
+ sender.setParallelism(senderParallelism);
+
+ final JobVertex receiver = new JobVertex("Receiver");
+ receiver.setInvokableClass(SubtaskIndexReceiver.class);
+ receiver.getConfiguration().setInteger(SubtaskIndexReceiver.CONFIG_KEY, senderParallelism);
+ receiver.setParallelism(receiverParallelism);
+
+ receiver.connectNewDataSetAsInput(
+ sender,
+ DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.BLOCKING);
+
+ final JobGraph jobGraph = new JobGraph(jobName, sender, receiver);
+
+ // We need to allow queued scheduling, because there are not enough slots available
+ // to run all tasks at once. We queue tasks and then let them finish/consume the blocking
+ // result one after the other.
+ jobGraph.setAllowQueuedScheduling(true);
+
+ return jobGraph;
+ }
+
+ /**
+ * Sends the subtask index a configurable number of times in a round-robin fashion.
+ */
+ public static class RoundRobinSubtaskIndexSender extends AbstractInvokable {
+
+ public static final String CONFIG_KEY = "number-of-times-to-send";
+
+ public RoundRobinSubtaskIndexSender(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ RecordWriter<IntValue> writer = new RecordWriter<>(getEnvironment().getWriter(0));
+ final int numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
+
+ final IntValue subtaskIndex = new IntValue(
+ getEnvironment().getTaskInfo().getIndexOfThisSubtask());
+
+ try {
+ for (int i = 0; i < numberOfTimesToSend; i++) {
+ writer.emit(subtaskIndex);
+ }
+ writer.flushAll();
+ }
+ finally {
+ writer.clearBuffers();
+ }
+ }
+ }
+
+ /**
+ * Expects to receive the subtask index from a configurable number of sender tasks.
+ */
+ public static class SubtaskIndexReceiver extends AbstractInvokable {
+
+ public static final String CONFIG_KEY = "number-of-indexes-to-receive";
+
+ public SubtaskIndexReceiver(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ RecordReader<IntValue> reader = new RecordReader<>(
+ getEnvironment().getInputGate(0),
+ IntValue.class,
+ getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+ try {
+ final int numberOfSubtaskIndexesToReceive = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
+ final BitSet receivedSubtaskIndexes = new BitSet(numberOfSubtaskIndexesToReceive);
+
+ IntValue record;
+
+ int numberOfReceivedSubtaskIndexes = 0;
+
+ while ((record = reader.next()) != null) {
+ // Check that we don't receive more than expected
+ numberOfReceivedSubtaskIndexes++;
+
+ if (numberOfReceivedSubtaskIndexes > numberOfSubtaskIndexesToReceive) {
+ throw new IllegalStateException("Received more records than expected.");
+ }
+
+ int subtaskIndex = record.getValue();
+
+ // Check that we only receive each subtask index once
+ if (receivedSubtaskIndexes.get(subtaskIndex)) {
+ throw new IllegalStateException("Received expected subtask index twice.");
+ }
+ else {
+ receivedSubtaskIndexes.set(subtaskIndex, true);
+ }
+ }
+
+ // Check that we have received all expected subtask indexes
+ if (receivedSubtaskIndexes.cardinality() != numberOfSubtaskIndexesToReceive) {
+ throw new IllegalStateException("Finished receive, but did not receive "
+ + "all expected subtask indexes.");
+ }
+ }
+ finally {
+ reader.clearBuffers();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b1a7e4fd/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index cba80aa..2f3f555 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.jobmanager;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
@@ -27,17 +29,21 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.testutils.category.New;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.util.BitSet;
+@Category(New.class)
public class SlotCountExceedingParallelismTest extends TestLogger {
// Test configuration
@@ -47,20 +53,28 @@ public class SlotCountExceedingParallelismTest extends TestLogger {
public static final String JOB_NAME = "SlotCountExceedingParallelismTest (no slot sharing, blocking results)";
- private static TestingCluster flink;
+ private static MiniCluster flink;
@BeforeClass
public static void setUp() throws Exception {
- flink = TestingUtils.startTestingCluster(
- NUMBER_OF_SLOTS_PER_TM,
- NUMBER_OF_TMS,
- TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+ final Configuration config = new Configuration();
+ config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+
+ final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumTaskManagers(NUMBER_OF_TMS)
+ .setNumSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM)
+ .build();
+
+ flink = new MiniCluster(miniClusterConfiguration);
+
+ flink.start();
}
@AfterClass
public static void tearDown() throws Exception {
if (flink != null) {
- flink.stop();
+ flink.close();
}
}
@@ -87,8 +101,8 @@ public class SlotCountExceedingParallelismTest extends TestLogger {
// ---------------------------------------------------------------------------------------------
- private void submitJobGraphAndWait(final JobGraph jobGraph) throws JobExecutionException {
- flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+ private void submitJobGraphAndWait(final JobGraph jobGraph) throws JobExecutionException, InterruptedException {
+ flink.executeJobBlocking(jobGraph);
}
private JobGraph createTestJobGraph(
[11/19] flink git commit: [hotfix][tests] Properly pass
jarfiles/classpaths to EnvironmentFactory
Posted by ch...@apache.org.
[hotfix][tests] Properly pass jarfiles/classpaths to EnvironmentFactory
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/491aceb0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/491aceb0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/491aceb0
Branch: refs/heads/release-1.5
Commit: 491aceb0514f31bf47efb889413b781a22340bd8
Parents: fc1244e
Author: zentol <ch...@apache.org>
Authored: Mon Mar 26 12:58:43 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 14:26:47 2018 +0200
----------------------------------------------------------------------
.../src/main/java/org/apache/flink/test/util/TestEnvironment.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/491aceb0/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 1d82f87..948755b 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -140,7 +140,7 @@ public class TestEnvironment extends ExecutionEnvironment {
ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
@Override
public ExecutionEnvironment createExecutionEnvironment() {
- lastEnv = new TestEnvironment(jobExecutor, getParallelism(), getConfig().isObjectReuseEnabled());
+ lastEnv = new TestEnvironment(jobExecutor, getParallelism(), getConfig().isObjectReuseEnabled(), jarFiles, classPaths);
return lastEnv;
}
};
[17/19] flink git commit: [FLINK-8804][build] Bump
flink-shaded-jackson version to 3.0
Posted by ch...@apache.org.
[FLINK-8804][build] Bump flink-shaded-jackson version to 3.0
This closes #5596.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7b1257c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7b1257c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7b1257c
Branch: refs/heads/release-1.5
Commit: d7b1257ce6b5c7a3d999acb69d5a6a5ff749425a
Parents: f817267
Author: zentol <ch...@apache.org>
Authored: Wed Feb 28 11:42:21 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 14:26:53 2018 +0200
----------------------------------------------------------------------
flink-dist/pom.xml | 7 ---
flink-docs/pom.xml | 18 +-------
.../flink/docs/rest/RestAPIDocGenerator.java | 14 +++---
flink-libraries/flink-sql-client/pom.xml | 11 +----
.../flink/table/client/config/ConfigUtil.java | 10 ++--
pom.xml | 48 +++++++++++++++++++-
6 files changed, 63 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d7b1257c/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 79eb53f..923f078 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -506,13 +506,6 @@ under the License.
<exclude>log4j:log4j</exclude>
</excludes>
</artifactSet>
- <relocations>
- <relocation>
- <!-- relocate jackson services, which isn't done by flink-shaded-jackson -->
- <pattern>com.fasterxml.jackson</pattern>
- <shadedPattern>org.apache.flink.shaded.jackson2.com.fasterxml.jackson</shadedPattern>
- </relocation>
- </relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
http://git-wip-us.apache.org/repos/asf/flink/blob/d7b1257c/flink-docs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index f44cf13..94716ab 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -63,23 +63,9 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-netty</artifactId>
</dependency>
-
- <dependency>
- <!-- We use standard jackson since jackson-module-jsonSchema isn't part of flink-shaded-jackson -->
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- <version>${jackson.version}</version>
- </dependency>
<dependency>
- <!-- We use standard jackson since jackson-module-jsonSchema isn't part of flink-shaded-jackson -->
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-jsonSchema</artifactId>
- <version>${jackson.version}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson-module-jsonSchema</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/flink/blob/d7b1257c/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 8ece7b1..2d5ec8f 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -41,15 +41,15 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.SerializableString;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.io.CharacterEscapes;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.io.SerializedString;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.module.jsonSchema.JsonSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.SerializableString;
-import com.fasterxml.jackson.core.io.CharacterEscapes;
-import com.fasterxml.jackson.core.io.SerializedString;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
-import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/d7b1257c/flink-libraries/flink-sql-client/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/pom.xml b/flink-libraries/flink-sql-client/pom.xml
index a005f85..dcc2743 100644
--- a/flink-libraries/flink-sql-client/pom.xml
+++ b/flink-libraries/flink-sql-client/pom.xml
@@ -99,15 +99,8 @@ under the License.
<!-- configuration -->
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-yaml</artifactId>
- <version>${jackson.version}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson</artifactId>
</dependency>
<!-- test dependencies -->
http://git-wip-us.apache.org/repos/asf/flink/blob/d7b1257c/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
index 87201a6..337d803 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
@@ -18,11 +18,11 @@
package org.apache.flink.table.client.config;
-import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.core.io.IOContext;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import com.fasterxml.jackson.dataformat.yaml.YAMLParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.io.IOContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLParser;
import java.io.IOException;
import java.io.InputStream;
http://git-wip-us.apache.org/repos/asf/flink/blob/d7b1257c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3df9bb4..7918345 100644
--- a/pom.xml
+++ b/pom.xml
@@ -247,7 +247,53 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
- <version>${jackson.version}-${flink.shaded.version}</version>
+ <!-- We use a newer version since we didn't have the time to do a proper switch to 3.0 -->
+ <version>${jackson.version}-3.0</version>
+ <!-- Dependencies aren't properly hidden in 3.0 -->
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson-module-jsonSchema</artifactId>
+ <!-- We use a newer version since we didn't have the time to do a proper switch to 3.0 -->
+ <version>${jackson.version}-3.0</version>
+ <!-- Dependencies aren't properly hidden in 3.0 -->
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-jsonSchema</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
[04/19] flink git commit: [FLINK-9069] Add checkstyle rule to detect
multiple consecutive semicolons
Posted by ch...@apache.org.
[FLINK-9069] Add checkstyle rule to detect multiple consecutive semicolons
This closes #5769.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27cf4be6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27cf4be6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27cf4be6
Branch: refs/heads/release-1.5
Commit: 27cf4be66d53a28a9ad6900c9a2ef67ef2254270
Parents: 1eb2b3a
Author: jparkie <pa...@gmail.com>
Authored: Sun Mar 25 18:09:25 2018 -0700
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:18 2018 +0200
----------------------------------------------------------------------
.../org/apache/flink/configuration/HistoryServerOptions.java | 2 +-
.../java/org/apache/calcite/avatica/util/DateTimeUtils.java | 4 ++--
.../runtime/io/network/netty/PartitionRequestQueueTest.java | 2 +-
.../main/java/org/apache/flink/yarn/YarnClusterClient.java | 2 +-
tools/maven/checkstyle.xml | 7 +++++++
5 files changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/27cf4be6/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
index a16fd7f..13cdc1e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
@@ -34,7 +34,7 @@ public class HistoryServerOptions {
public static final ConfigOption<Long> HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL =
key("historyserver.archive.fs.refresh-interval")
.defaultValue(10000L)
- .withDescription("Interval in milliseconds for refreshing the archived job directories.");;
+ .withDescription("Interval in milliseconds for refreshing the archived job directories.");
/**
* Comma-separated list of directories which the HistoryServer polls for new archives.
http://git-wip-us.apache.org/repos/asf/flink/blob/27cf4be6/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
index d1a87a7..fe09d18 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
@@ -427,7 +427,7 @@ public class DateTimeUtils {
}
public static int digitCount(int v) {
- for (int n = 1;; n++) {
+ for (int n = 1; true; n++) {
v /= 10;
if (v == 0) {
return n;
@@ -960,7 +960,7 @@ public class DateTimeUtils {
// Start with an estimate.
// Since no month has more than 31 days, the estimate is <= the true value.
int m = (date0 - date1) / 31;
- for (;;) {
+ while (true) {
int date2 = addMonths(date1, m);
if (date2 >= date0) {
return m;
http://git-wip-us.apache.org/repos/asf/flink/blob/27cf4be6/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index f614c18..2deaa9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -165,7 +165,7 @@ public class PartitionRequestQueueTest {
private final AtomicInteger buffersInBacklog;
private DefaultBufferResultSubpartitionView(int buffersInBacklog) {
- this.buffersInBacklog = new AtomicInteger(buffersInBacklog);;
+ this.buffersInBacklog = new AtomicInteger(buffersInBacklog);
}
@Nullable
http://git-wip-us.apache.org/repos/asf/flink/blob/27cf4be6/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 29ece26..2ac9664 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -244,7 +244,7 @@ public class YarnClusterClient extends ClusterClient<ApplicationId> {
public void waitForClusterToBeReady() {
logAndSysout("Waiting until all TaskManagers have connected");
- for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus) {
+ for (GetClusterStatusResponse currentStatus, lastStatus = null; true; lastStatus = currentStatus) {
currentStatus = getClusterStatus();
if (currentStatus != null && !currentStatus.equals(lastStatus)) {
logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/"
http://git-wip-us.apache.org/repos/asf/flink/blob/27cf4be6/tools/maven/checkstyle.xml
----------------------------------------------------------------------
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
index a0168b0..a0e7dd7 100644
--- a/tools/maven/checkstyle.xml
+++ b/tools/maven/checkstyle.xml
@@ -462,6 +462,13 @@ This file is based on the checkstyle file of Apache Beam.
<!-- Detects empty statements (standalone ";" semicolon). -->
<module name="EmptyStatement"/>
+ <!-- Detect multiple consecutive semicolons (e.g. ";;"). -->
+ <module name="RegexpSinglelineJava">
+ <property name="format" value=";{2,}"/>
+ <property name="message" value="Use one semicolon"/>
+ <property name="ignoreComments" value="true"/>
+ </module>
+
<!--
MODIFIERS CHECKS
[02/19] flink git commit: [FLINK-8703][tests] Port CancelingTestBase
to MiniClusterResource
Posted by ch...@apache.org.
[FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource
This closes #5664.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20d7af77
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20d7af77
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20d7af77
Branch: refs/heads/release-1.5
Commit: 20d7af77005f831f1cadf3769e0a9a059b9f37d6
Parents: b1a7e4f
Author: zentol <ch...@apache.org>
Authored: Mon Feb 26 15:36:37 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:18 2018 +0200
----------------------------------------------------------------------
.../test/cancelling/CancelingTestBase.java | 133 ++++++-------------
.../test/cancelling/JoinCancelingITCase.java | 9 +-
.../test/cancelling/MapCancelingITCase.java | 7 +-
3 files changed, 45 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/20d7af77/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 03ca649..cac16f0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -18,9 +18,10 @@
package org.apache.flink.test.cancelling;
+import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -28,150 +29,100 @@ import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;
-import org.apache.hadoop.fs.FileSystem;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.ClassRule;
import java.util.concurrent.TimeUnit;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-
/**
* Base class for testing job cancellation.
*/
public abstract class CancelingTestBase extends TestLogger {
- private static final Logger LOG = LoggerFactory.getLogger(CancelingTestBase.class);
-
private static final int MINIMUM_HEAP_SIZE_MB = 192;
- /**
- * Defines the number of seconds after which an issued cancel request is expected to have taken effect (i.e. the job
- * is canceled), starting from the point in time when the cancel request is issued.
- */
- private static final int DEFAULT_CANCEL_FINISHED_INTERVAL = 10 * 1000;
-
- private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
+ protected static final int PARALLELISM = 4;
// --------------------------------------------------------------------------------------------
- protected LocalFlinkMiniCluster executor;
-
- protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
+ @ClassRule
+ public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ getConfiguration(),
+ 2,
+ 4),
+ true);
// --------------------------------------------------------------------------------------------
- private void verifyJvmOptions() {
+ private static void verifyJvmOptions() {
final long heap = Runtime.getRuntime().maxMemory() >> 20;
Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
}
- @Before
- public void startCluster() throws Exception {
+ private static Configuration getConfiguration() {
verifyJvmOptions();
Configuration config = new Configuration();
config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
- this.executor = new LocalFlinkMiniCluster(config, false);
- this.executor.start();
- }
-
- @After
- public void stopCluster() throws Exception {
- if (this.executor != null) {
- this.executor.stop();
- this.executor = null;
- FileSystem.closeAll();
- System.gc();
- }
+ return config;
}
// --------------------------------------------------------------------------------------------
- public void runAndCancelJob(Plan plan, int msecsTillCanceling) throws Exception {
- runAndCancelJob(plan, msecsTillCanceling, DEFAULT_CANCEL_FINISHED_INTERVAL);
- }
-
- public void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
- try {
- // submit job
- final JobGraph jobGraph = getJobGraph(plan);
-
- executor.submitJobDetached(jobGraph);
+ protected void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
+ // submit job
+ final JobGraph jobGraph = getJobGraph(plan);
- // Wait for the job to make some progress and then cancel
- JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
- executor.getLeaderGateway(TestingUtils.TESTING_DURATION()),
- TestingUtils.TESTING_DURATION());
+ ClusterClient<?> client = CLUSTER.getClusterClient();
+ client.setDetached(true);
- Thread.sleep(msecsTillCanceling);
+ JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, CancelingTestBase.class.getClassLoader());
- FiniteDuration timeout = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS);
+ Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
- ActorGateway jobManager = executor.getLeaderGateway(TestingUtils.TESTING_DURATION());
+ JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
+ Thread.sleep(50);
+ jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ }
+ if (jobStatus != JobStatus.RUNNING) {
+ Assert.fail("Job not in state RUNNING.");
+ }
- Future<Object> ask = jobManager.ask(new CancelJob(jobGraph.getJobID()), timeout);
+ Thread.sleep(msecsTillCanceling);
- Object result = Await.result(ask, timeout);
+ client.cancel(jobSubmissionResult.getJobID());
- if (result instanceof CancellationSuccess) {
- // all good
- } else if (result instanceof CancellationFailure) {
- // Failure
- CancellationFailure failure = (CancellationFailure) result;
- throw new Exception("Failed to cancel job with ID " + failure.jobID() + ".",
- failure.cause());
- } else {
- throw new Exception("Unexpected response to cancel request: " + result);
- }
+ Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
- // Wait for the job to be cancelled
- JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.CANCELED,
- executor.getLeaderGateway(TestingUtils.TESTING_DURATION()),
- TestingUtils.TESTING_DURATION());
+ JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(cancelDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) {
+ Thread.sleep(50);
+ jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(cancelDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
}
- catch (Exception e) {
- LOG.error("Exception found in runAndCancelJob.", e);
- e.printStackTrace();
- Assert.fail(e.getMessage());
+ if (jobStatusAfterCancel != JobStatus.CANCELED) {
+ Assert.fail("Failed to cancel job with ID " + jobSubmissionResult.getJobID() + '.');
}
}
- private JobGraph getJobGraph(final Plan plan) throws Exception {
- final Optimizer pc = new Optimizer(new DataStatistics(), this.executor.configuration());
+ private JobGraph getJobGraph(final Plan plan) {
+ final Optimizer pc = new Optimizer(new DataStatistics(), getConfiguration());
final OptimizedPlan op = pc.compile(plan);
final JobGraphGenerator jgg = new JobGraphGenerator();
return jgg.compileJobGraph(op);
}
-
- public void setTaskManagerNumSlots(int taskManagerNumSlots) {
- this.taskManagerNumSlots = taskManagerNumSlots;
- }
-
- public int getTaskManagerNumSlots() {
- return this.taskManagerNumSlots;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/20d7af77/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
index 5e21129..66919e7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
@@ -34,15 +34,10 @@ import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
* Test job cancellation from within a JoinFunction.
*/
public class JoinCancelingITCase extends CancelingTestBase {
- private static final int parallelism = 4;
-
- public JoinCancelingITCase() {
- setTaskManagerNumSlots(parallelism);
- }
// --------------- Test Sort Matches that are canceled while still reading / sorting -----------------
private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow) throws Exception {
- executeTask(joiner, slow, parallelism);
+ executeTask(joiner, slow, PARALLELISM);
}
private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow, int parallelism) throws Exception {
@@ -90,7 +85,7 @@ public class JoinCancelingITCase extends CancelingTestBase {
.with(joiner)
.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
- env.setParallelism(parallelism);
+ env.setParallelism(PARALLELISM);
runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/20d7af77/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
index 3a7039f..13edea4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
@@ -31,11 +31,6 @@ import org.junit.Test;
* Test job cancellation from within a MapFunction.
*/
public class MapCancelingITCase extends CancelingTestBase {
- private static final int parallelism = 4;
-
- public MapCancelingITCase() {
- setTaskManagerNumSlots(parallelism);
- }
@Test
public void testMapCancelling() throws Exception {
@@ -65,7 +60,7 @@ public class MapCancelingITCase extends CancelingTestBase {
.map(mapper)
.output(new DiscardingOutputFormat<Integer>());
- env.setParallelism(parallelism);
+ env.setParallelism(PARALLELISM);
runAndCancelJob(env.createProgramPlan(), 5 * 1000, 10 * 1000);
}