You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/25 07:32:15 UTC
[01/10] flink git commit: [FLINK-9240][tests] Harden
WebFrontendITCase#testStopYarn()
Repository: flink
Updated Branches:
refs/heads/master 0321eb35e -> c50b573e9
[FLINK-9240][tests] Harden WebFrontendITCase#testStopYarn()
This closes #5886.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26eb8bb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26eb8bb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26eb8bb2
Branch: refs/heads/master
Commit: 26eb8bb29989b9e65dbd0afe9c107c285b98cf87
Parents: 0321eb3
Author: sihuazhou <su...@163.com>
Authored: Sat Apr 21 13:34:21 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:24:18 2018 +0200
----------------------------------------------------------------------
.../runtime/webmonitor/WebFrontendITCase.java | 34 +++++++++++---------
1 file changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/26eb8bb2/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index f512766..994966e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -317,6 +317,8 @@ public class WebFrontendITCase extends TestLogger {
"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\"," +
"\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent());
}
+
+ BlockingInvokable.reset();
}
@Test
@@ -347,25 +349,27 @@ public class WebFrontendITCase extends TestLogger {
final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
final Deadline deadline = testTimeout.fromNow();
- while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
- try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
- // Request the file from the web server
- client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());
-
- HttpTestClient.SimpleHttpResponse response = client
- .getNextResponse(deadline.timeLeft());
-
- if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
- assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
- } else {
- assertEquals(HttpResponseStatus.OK, response.getStatus());
- }
- assertEquals("application/json; charset=UTF-8", response.getType());
- assertEquals("{}", response.getContent());
+ try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
+ // Request the file from the web server
+ client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());
+
+ HttpTestClient.SimpleHttpResponse response = client
+ .getNextResponse(deadline.timeLeft());
+
+ if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+ assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+ } else {
+ assertEquals(HttpResponseStatus.OK, response.getStatus());
}
+ assertEquals("application/json; charset=UTF-8", response.getType());
+ assertEquals("{}", response.getContent());
+ }
+ // wait for cancellation to finish
+ while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
Thread.sleep(20);
}
+
BlockingInvokable.reset();
}
[08/10] flink git commit: [FLINK-9100][core] Hide configured value
for "password"/"secret"
Posted by ch...@apache.org.
[FLINK-9100][core] Hide configured value for "password"/"secret"
This closes #9100.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a9675d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a9675d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a9675d5
Branch: refs/heads/master
Commit: 1a9675d54fda7c6d7c519935dde05f47eb449401
Parents: aa4bdc0
Author: sihuazhou <su...@163.com>
Authored: Tue Apr 24 13:39:11 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:41:43 2018 +0200
----------------------------------------------------------------------
.../configuration/GlobalConfiguration.java | 24 +++++++++++++++++++-
.../configuration/GlobalConfigurationTest.java | 10 ++++++++
2 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1a9675d5/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 2f2a9cf..fd7d441 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -19,6 +19,7 @@
package org.apache.flink.configuration;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +43,11 @@ public final class GlobalConfiguration {
public static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
+ // the keys whose values should be hidden
+ private static final String[] SENSITIVE_KEYS = new String[] {"password", "secret"};
+
+ // the hidden content to be displayed
+ public static final String HIDDEN_CONTENT = "******";
// --------------------------------------------------------------------------------------------
@@ -183,7 +189,7 @@ public final class GlobalConfiguration {
continue;
}
- LOG.info("Loading configuration property: {}, {}", key, value);
+ LOG.info("Loading configuration property: {}, {}", key, isSensitive(key) ? HIDDEN_CONTENT : value);
config.setString(key, value);
}
}
@@ -194,4 +200,20 @@ public final class GlobalConfiguration {
return config;
}
+ /**
+ * Check whether the key is a hidden key.
+ *
+ * @param key the config key
+ */
+ public static boolean isSensitive(String key) {
+ Preconditions.checkNotNull(key, "key is null");
+ final String keyInLower = key.toLowerCase();
+ for (String hideKey : SENSITIVE_KEYS) {
+ if (keyInLower.length() >= hideKey.length()
+ && keyInLower.contains(hideKey)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a9675d5/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
index c5d2e62..a42564d 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
@@ -31,7 +31,9 @@ import java.io.PrintWriter;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
/**
* This class contains tests for the global configuration (parsing configuration directory information).
@@ -120,4 +122,12 @@ public class GlobalConfigurationTest extends TestLogger {
assertNotNull(GlobalConfiguration.loadConfiguration(tempFolder.getRoot().getAbsolutePath()));
}
+ @Test
+ public void testHiddenKey() {
+ assertTrue(GlobalConfiguration.isSensitive("password123"));
+ assertTrue(GlobalConfiguration.isSensitive("123pasSword"));
+ assertTrue(GlobalConfiguration.isSensitive("PasSword"));
+ assertTrue(GlobalConfiguration.isSensitive("Secret"));
+ assertFalse(GlobalConfiguration.isSensitive("Hello"));
+ }
}
[04/10] flink git commit: [hotfix][tests] Enable
NetworkStackThroughputITCase
Posted by ch...@apache.org.
[hotfix][tests] Enable NetworkStackThroughputITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f633a80b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f633a80b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f633a80b
Branch: refs/heads/master
Commit: f633a80bb22011ca1e7a0e4fc7a754d79b213e00
Parents: 598cb66
Author: zentol <ch...@apache.org>
Authored: Thu Apr 19 10:28:44 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:30:07 2018 +0200
----------------------------------------------------------------------
.../test/runtime/NetworkStackThroughputITCase.java | 14 +++++---------
1 file changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f633a80b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 1b46f46..b5c233a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;
-import org.junit.Ignore;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +46,6 @@ import java.util.concurrent.TimeUnit;
/**
* Manually test the throughput of the network stack.
*/
-@Ignore
public class NetworkStackThroughputITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(NetworkStackThroughputITCase.class);
@@ -84,7 +83,7 @@ public class NetworkStackThroughputITCase extends TestLogger {
// Determine the amount of data to send per subtask
int dataVolumeGb = getTaskConfiguration().getInteger(NetworkStackThroughputITCase.DATA_VOLUME_GB_CONFIG_KEY, 1);
- long dataMbPerSubtask = (dataVolumeGb * 1024) / getCurrentNumberOfSubtasks();
+ long dataMbPerSubtask = (dataVolumeGb * 10) / getCurrentNumberOfSubtasks();
long numRecordsToEmit = (dataMbPerSubtask * 1024 * 1024) / SpeedTestRecord.RECORD_SIZE;
LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)",
@@ -209,6 +208,7 @@ public class NetworkStackThroughputITCase extends TestLogger {
// ------------------------------------------------------------------------
+ @Test
public void testThroughput() throws Exception {
Object[][] configParams = new Object[][]{
new Object[]{1, false, false, false, 4, 2},
@@ -335,13 +335,9 @@ public class NetworkStackThroughputITCase extends TestLogger {
return jobGraph;
}
- private void runAllTests() throws Exception {
- testThroughput();
+ public static void main(String[] args) throws Exception {
+ new NetworkStackThroughputITCase().testThroughput();
System.out.println("Done.");
}
-
- public static void main(String[] args) throws Exception {
- new NetworkStackThroughputITCase().runAllTests();
- }
}
[07/10] flink git commit: [FLINK-8793][REST] Hide configured value
for "password"/"secret"
Posted by ch...@apache.org.
[FLINK-8793][REST] Hide configured value for "password"/"secret"
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa4bdc03
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa4bdc03
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa4bdc03
Branch: refs/heads/master
Commit: aa4bdc037caaa2b516864007b90a7271e998702a
Parents: 512083a
Author: sihuazhou <su...@163.com>
Authored: Tue Apr 24 13:39:22 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:41:31 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/rest/handler/legacy/ClusterConfigHandler.java | 5 +++--
.../flink/runtime/rest/messages/ClusterConfigurationInfo.java | 5 +++--
2 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/aa4bdc03/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java
index 76221b5..ff2f04e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoEntry;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
@@ -74,8 +75,8 @@ public class ClusterConfigHandler extends AbstractJsonRequestHandler {
String value = config.getString(key, null);
// Mask key values which contain sensitive information
- if (value != null && key.toLowerCase().contains("password")) {
- value = "******";
+ if (value != null && GlobalConfiguration.isSensitive(key)) {
+ value = GlobalConfiguration.HIDDEN_CONTENT;
}
gen.writeStringField(ClusterConfigurationInfoEntry.FIELD_NAME_CONFIG_VALUE, value);
http://git-wip-us.apache.org/repos/asf/flink/blob/aa4bdc03/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java
index 627dc4c..06e6765 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.messages;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
import java.util.ArrayList;
@@ -45,8 +46,8 @@ public class ClusterConfigurationInfo extends ArrayList<ClusterConfigurationInfo
String value = config.getString(key, null);
// Mask key values which contain sensitive information
- if (value != null && key.toLowerCase().contains("password")) {
- value = "******";
+ if (value != null && GlobalConfiguration.isSensitive(key)) {
+ value = GlobalConfiguration.HIDDEN_CONTENT;
}
clusterConfig.add(new ClusterConfigurationInfoEntry(key, value));
[10/10] flink git commit: [FLINK-8704][tests] Port
PartialConsumerPipelinedResultTest
Posted by ch...@apache.org.
[FLINK-8704][tests] Port PartialConsumerPipelinedResultTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c50b573e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c50b573e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c50b573e
Branch: refs/heads/master
Commit: c50b573e9712786da22bd2863a51c26a005e0d15
Parents: 73e9f90
Author: zentol <ch...@apache.org>
Authored: Mon Mar 12 13:09:44 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:47:56 2018 +0200
----------------------------------------------------------------------
...LegacyPartialConsumePipelinedResultTest.java | 150 +++++++++++++++++++
.../PartialConsumePipelinedResultTest.java | 33 ++--
2 files changed, 171 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c50b573e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
new file mode 100644
index 0000000..aecab75
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class LegacyPartialConsumePipelinedResultTest extends TestLogger {
+
+ // Test configuration
+ private final static int NUMBER_OF_TMS = 1;
+ private final static int NUMBER_OF_SLOTS_PER_TM = 1;
+ private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+
+ private final static int NUMBER_OF_NETWORK_BUFFERS = 128;
+
+ private static TestingCluster flink;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ final Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
+ config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+ config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
+
+ flink = new TestingCluster(config, true);
+
+ flink.start();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ flink.stop();
+ }
+
+ /**
+ * Tests a fix for FLINK-1930.
+ *
+ * <p> When consuming a pipelined result only partially, is is possible that local channels
+ * release the buffer pool, which is associated with the result partition, too early. If the
+ * producer is still producing data when this happens, it runs into an IllegalStateException,
+ * because of the destroyed buffer pool.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-1930">FLINK-1930</a>
+ */
+ @Test
+ public void testPartialConsumePipelinedResultReceiver() throws Exception {
+ final JobVertex sender = new JobVertex("Sender");
+ sender.setInvokableClass(SlowBufferSender.class);
+ sender.setParallelism(PARALLELISM);
+
+ final JobVertex receiver = new JobVertex("Receiver");
+ receiver.setInvokableClass(SingleBufferReceiver.class);
+ receiver.setParallelism(PARALLELISM);
+
+ // The partition needs to be pipelined, otherwise the original issue does not occur, because
+ // the sender and receiver are not online at the same time.
+ receiver.connectNewDataSetAsInput(
+ sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+ final JobGraph jobGraph = new JobGraph("Partial Consume of Pipelined Result", sender, receiver);
+
+ final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
+ sender.getID(), receiver.getID());
+
+ sender.setSlotSharingGroup(slotSharingGroup);
+ receiver.setSlotSharingGroup(slotSharingGroup);
+
+ flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ /**
+ * Sends a fixed number of buffers and sleeps in-between sends.
+ */
+ public static class SlowBufferSender extends AbstractInvokable {
+
+ public SlowBufferSender(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ final ResultPartitionWriter writer = getEnvironment().getWriter(0);
+
+ for (int i = 0; i < 8; i++) {
+ final BufferBuilder bufferBuilder = writer.getBufferProvider().requestBufferBuilderBlocking();
+ writer.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
+ Thread.sleep(50);
+ bufferBuilder.finish();
+ }
+ }
+ }
+
+ /**
+ * Reads a single buffer and recycles it.
+ */
+ public static class SingleBufferReceiver extends AbstractInvokable {
+
+ public SingleBufferReceiver(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ InputGate gate = getEnvironment().getInputGate(0);
+ Buffer buffer = gate.getNextBufferOrEvent().orElseThrow(IllegalStateException::new).getBuffer();
+ if (buffer != null) {
+ buffer.recycleBuffer();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c50b573e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 8080ca4..7169796 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.execution.Environment;
@@ -32,41 +31,51 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.testutils.category.New;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+@Category(New.class)
public class PartialConsumePipelinedResultTest extends TestLogger {
// Test configuration
- private final static int NUMBER_OF_TMS = 1;
- private final static int NUMBER_OF_SLOTS_PER_TM = 1;
- private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+ private static final int NUMBER_OF_TMS = 1;
+ private static final int NUMBER_OF_SLOTS_PER_TM = 1;
+ private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
- private final static int NUMBER_OF_NETWORK_BUFFERS = 128;
+ private static final int NUMBER_OF_NETWORK_BUFFERS = 128;
- private static TestingCluster flink;
+ private static MiniCluster flink;
@BeforeClass
public static void setUp() throws Exception {
final Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
- config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
- flink = new TestingCluster(config, true);
+ final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumTaskManagers(NUMBER_OF_TMS)
+ .setNumSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM)
+ .build();
+
+ flink = new MiniCluster(miniClusterConfiguration);
flink.start();
}
@AfterClass
public static void tearDown() throws Exception {
- flink.stop();
+ if (flink != null) {
+ flink.close();
+ }
}
/**
@@ -102,7 +111,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
sender.setSlotSharingGroup(slotSharingGroup);
receiver.setSlotSharingGroup(slotSharingGroup);
- flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+ flink.executeJobBlocking(jobGraph);
}
// ---------------------------------------------------------------------------------------------
[06/10] flink git commit: [FLINK-9212][REST] Port
SubtasksAllAccumulatorsHandler to new REST endpoint
Posted by ch...@apache.org.
[FLINK-9212][REST] Port SubtasksAllAccumulatorsHandler to new REST endpoint
This closes #5893.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/512083a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/512083a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/512083a7
Branch: refs/heads/master
Commit: 512083a7dd8d0574afbfa8e2c7065f16e9a83d12
Parents: 8b98f79
Author: zhouhai02 <zh...@meituan.com>
Authored: Sun Apr 22 18:59:11 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:32:22 2018 +0200
----------------------------------------------------------------------
.../job/SubtasksAllAccumulatorsHandler.java | 82 +++++++++++
.../SubtasksAllAccumulatorsHandlers.java | 75 ++++++++++
.../job/SubtasksAllAccumulatorsInfo.java | 144 +++++++++++++++++++
.../runtime/webmonitor/WebMonitorEndpoint.java | 12 ++
.../job/SubtasksAllAccumulatorsInfoTest.java | 58 ++++++++
5 files changed, 371 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/512083a7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
new file mode 100644
index 0000000..51efba2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.SubtasksAllAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the subtasks all accumulators.
+ */
+public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexHandler<SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> {
+
+ public SubtasksAllAccumulatorsHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+ }
+
+ @Override
+ protected SubtasksAllAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionJobVertex jobVertex) throws RestHandlerException {
+ JobVertexID jobVertexId = jobVertex.getJobVertexId();
+ int parallelism = jobVertex.getParallelism();
+
+ final List<SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos = new ArrayList<>();
+
+ for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
+ TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
+ String locationString = location == null ? "(unassigned)" : location.getHostname();
+
+ StringifiedAccumulatorResult[] accs = vertex.getCurrentExecutionAttempt().getUserAccumulatorsStringified();
+ List<UserAccumulator> userAccumulators = new ArrayList<>(accs.length);
+ for (StringifiedAccumulatorResult acc : accs) {
+ userAccumulators.add(new UserAccumulator(acc.getName(), acc.getType(), acc.getValue()));
+ }
+
+ subtaskAccumulatorsInfos.add(
+ new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo(
+ vertex.getCurrentExecutionAttempt().getParallelSubtaskIndex(),
+ vertex.getCurrentExecutionAttempt().getAttemptNumber(),
+ locationString,
+ userAccumulators
+ ));
+ }
+
+ return new SubtasksAllAccumulatorsInfo(jobVertexId, parallelism, subtaskAccumulatorsInfos);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/512083a7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java
new file mode 100644
index 0000000..e178c93
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.job.SubtasksAllAccumulatorsInfo;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link SubtasksAllAccumulatorsHandler}.
+ */
+public class SubtasksAllAccumulatorsHandlers implements MessageHeaders<EmptyRequestBody, SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> {
+
+ private static final SubtasksAllAccumulatorsHandlers INSTANCE = new SubtasksAllAccumulatorsHandlers();
+
+ public static final String URL = "/jobs" +
+ "/:" + JobIDPathParameter.KEY +
+ "/vertices" +
+ "/:" + JobVertexIdPathParameter.KEY +
+ "/subtasks/accumulators";
+
+ private SubtasksAllAccumulatorsHandlers() {}
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<SubtasksAllAccumulatorsInfo> getResponseClass() {
+ return SubtasksAllAccumulatorsInfo.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public JobVertexMessageParameters getUnresolvedMessageParameters() {
+ return new JobVertexMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ public static SubtasksAllAccumulatorsHandlers getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/512083a7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java
new file mode 100644
index 0000000..ee2535f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link SubtasksAllAccumulatorsHandler}.
+ */
+public class SubtasksAllAccumulatorsInfo implements ResponseBody {
+
+ public static final String FIELD_NAME_JOB_VERTEX_ID = "id";
+ public static final String FIELD_NAME_PARALLELISM = "parallelism";
+ public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_ID)
+ @JsonSerialize(using = JobVertexIDSerializer.class)
+ private final JobVertexID jobVertexId;
+
+ @JsonProperty(FIELD_NAME_PARALLELISM)
+ private final int parallelism;
+
+ @JsonProperty(FIELD_NAME_SUBTASKS)
+ private final Collection<SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos;
+
+ @JsonCreator
+ public SubtasksAllAccumulatorsInfo(
+ @JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_VERTEX_ID) JobVertexID jobVertexId,
+ @JsonProperty(FIELD_NAME_PARALLELISM) int parallelism,
+ @JsonProperty(FIELD_NAME_SUBTASKS) Collection<SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos) {
+ this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
+ this.parallelism = parallelism;
+ this.subtaskAccumulatorsInfos = Preconditions.checkNotNull(subtaskAccumulatorsInfos);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SubtasksAllAccumulatorsInfo that = (SubtasksAllAccumulatorsInfo) o;
+ return Objects.equals(jobVertexId, that.jobVertexId) &&
+ parallelism == that.parallelism &&
+ Objects.equals(subtaskAccumulatorsInfos, that.subtaskAccumulatorsInfos);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobVertexId, parallelism, subtaskAccumulatorsInfos);
+ }
+
+ // ---------------------------------------------------
+ // Static inner classes
+ // ---------------------------------------------------
+
+ /**
+ * Detailed information about subtask accumulators.
+ */
+ public static class SubtaskAccumulatorsInfo {
+ public static final String FIELD_NAME_SUBTASK_INDEX = "subtask";
+ public static final String FIELD_NAME_ATTEMPT_NUM = "attempt";
+ public static final String FIELD_NAME_HOST = "host";
+ public static final String FIELD_NAME_USER_ACCUMULATORS = "user-accumulators";
+
+
+ @JsonProperty(FIELD_NAME_SUBTASK_INDEX)
+ private final int subtaskIndex;
+
+ @JsonProperty(FIELD_NAME_ATTEMPT_NUM)
+ private final int attemptNum;
+
+ @JsonProperty(FIELD_NAME_HOST)
+ private final String host;
+
+ @JsonProperty(FIELD_NAME_USER_ACCUMULATORS)
+ private final Collection<UserAccumulator> userAccumulators;
+
+ @JsonCreator
+ public SubtaskAccumulatorsInfo(
+ @JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex,
+ @JsonProperty(FIELD_NAME_ATTEMPT_NUM) int attemptNum,
+ @JsonProperty(FIELD_NAME_HOST) String host,
+ @JsonProperty(FIELD_NAME_USER_ACCUMULATORS) Collection<UserAccumulator> userAccumulators) {
+
+ this.subtaskIndex = subtaskIndex;
+ this.attemptNum = attemptNum;
+ this.host = Preconditions.checkNotNull(host);
+ this.userAccumulators = Preconditions.checkNotNull(userAccumulators);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SubtaskAccumulatorsInfo that = (SubtaskAccumulatorsInfo) o;
+ return subtaskIndex == that.subtaskIndex &&
+ attemptNum == that.attemptNum &&
+ Objects.equals(host, that.host) &&
+ Objects.equals(userAccumulators, that.userAccumulators);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(subtaskIndex, attemptNum, host, userAccumulators);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/512083a7/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 0ea7550..1a67d92 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
@@ -94,6 +95,7 @@ import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexDetailsHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersHeaders;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.SubtasksAllAccumulatorsHandlers;
import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders;
import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
import org.apache.flink.runtime.rest.messages.YarnCancelJobTerminationHeaders;
@@ -318,6 +320,15 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
executionGraphCache,
executor);
+ SubtasksAllAccumulatorsHandler subtasksAllAccumulatorsHandler = new SubtasksAllAccumulatorsHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ SubtasksAllAccumulatorsHandlers.getInstance(),
+ executionGraphCache,
+ executor);
+
TaskManagersHandler taskManagersHandler = new TaskManagersHandler(
restAddressFuture,
leaderRetriever,
@@ -575,6 +586,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler));
handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
+ handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), subtasksAllAccumulatorsHandler));
handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), jobDetailsHandler));
handlers.add(Tuple2.of(JobAccumulatorsHeaders.getInstance(), jobAccumulatorsHandler));
handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
http://git-wip-us.apache.org/repos/asf/flink/blob/512083a7/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java
new file mode 100644
index 0000000..2a71239
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests (un)marshalling of the {@link SubtasksAllAccumulatorsInfo}.
+ */
+public class SubtasksAllAccumulatorsInfoTest extends RestResponseMarshallingTestBase<SubtasksAllAccumulatorsInfo> {
+ @Override
+ protected Class<SubtasksAllAccumulatorsInfo> getTestResponseClass() {
+ return SubtasksAllAccumulatorsInfo.class;
+ }
+
+ @Override
+ protected SubtasksAllAccumulatorsInfo getTestResponseInstance() throws Exception {
+ List<SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos = new ArrayList<>(3);
+
+ List<UserAccumulator> userAccumulators = new ArrayList<>(2);
+ userAccumulators.add(new UserAccumulator("test name1", "test type1", "test value1"));
+ userAccumulators.add(new UserAccumulator("test name2", "test type2", "test value2"));
+
+ for (int i = 0; i < 3; ++i) {
+ subtaskAccumulatorsInfos.add(
+ new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo(
+ i,
+ i,
+ "host-" + String.valueOf(i),
+ userAccumulators
+ ));
+
+ }
+ return new SubtasksAllAccumulatorsInfo(new JobVertexID(),
+ 4,
+ subtaskAccumulatorsInfos);
+ }
+}
[05/10] flink git commit: [FLINK-9225][core] Minor code comments fix
in RuntimeContext
Posted by ch...@apache.org.
[FLINK-9225][core] Minor code comments fix in RuntimeContext
This closes #5883.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b98f79c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b98f79c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b98f79c
Branch: refs/heads/master
Commit: 8b98f79c90c26291755dbccb5b7065a9fba768a2
Parents: f633a80
Author: binlijin <bi...@gmail.com>
Authored: Fri Apr 20 15:02:59 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:30:08 2018 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/common/functions/RuntimeContext.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8b98f79c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 3429143..e221091 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -249,7 +249,7 @@ public interface RuntimeContext {
*
* keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
*
- * private ValueState<Long> count;
+ * private ValueState<Long> state;
*
* public void open(Configuration cfg) {
* state = getRuntimeContext().getState(
[03/10] flink git commit: [hotfix][tests] Properly close writer in
NetworkStackThroughputITCase
Posted by ch...@apache.org.
[hotfix][tests] Properly close writer in NetworkStackThroughputITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/598cb668
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/598cb668
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/598cb668
Branch: refs/heads/master
Commit: 598cb6689cbfc54ddf0bce69eaec18a8d29c20e0
Parents: b28e163
Author: zentol <ch...@apache.org>
Authored: Thu Apr 19 10:27:56 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:30:06 2018 +0200
----------------------------------------------------------------------
.../apache/flink/test/runtime/NetworkStackThroughputITCase.java | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/598cb668/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 3b93ca2..1b46f46 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -104,6 +104,7 @@ public class NetworkStackThroughputITCase extends TestLogger {
}
}
finally {
+ writer.clearBuffers();
writer.flushAll();
}
}
@@ -137,6 +138,7 @@ public class NetworkStackThroughputITCase extends TestLogger {
}
finally {
reader.clearBuffers();
+ writer.clearBuffers();
writer.flushAll();
}
}
[09/10] flink git commit: [FLINK-9033][config] Replace usages of
deprecated TASK_MANAGER_NUM_TASK_SLOTS
Posted by ch...@apache.org.
[FLINK-9033][config] Replace usages of deprecated TASK_MANAGER_NUM_TASK_SLOTS
This closes #5731.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73e9f901
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73e9f901
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73e9f901
Branch: refs/heads/master
Commit: 73e9f9013391a4e5be18f1cfc1f9462a78e95ca7
Parents: 1a9675d
Author: zhouhai02 <zh...@meituan.com>
Authored: Wed Mar 21 00:01:54 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:45:17 2018 +0200
----------------------------------------------------------------------
.../main/java/org/apache/flink/client/LocalExecutor.java | 7 ++++---
.../flink/client/deployment/ClusterSpecification.java | 3 +--
.../kinesis/manualtests/ManualExactlyOnceTest.java | 2 +-
.../ManualExactlyOnceWithStreamReshardingTest.java | 2 +-
.../org/apache/flink/storm/api/FlinkLocalCluster.java | 3 +--
.../flink/runtime/clusterframework/BootstrapTools.java | 2 +-
.../runtime/minicluster/MiniClusterConfiguration.java | 3 ++-
.../runtime/taskexecutor/TaskManagerConfiguration.java | 2 +-
.../taskexecutor/TaskManagerServicesConfiguration.java | 4 ++--
.../flink/runtime/checkpoint/CoordinatorShutdownTest.java | 5 +++--
.../partition/PartialConsumePipelinedResultTest.java | 2 +-
.../flink/runtime/jobmanager/JobManagerCleanupITCase.java | 3 ++-
.../runtime/jobmanager/JobManagerHARecoveryTest.java | 4 ++--
.../apache/flink/runtime/jobmanager/JobManagerTest.java | 5 ++---
.../leaderelection/LeaderChangeJobRecoveryTest.java | 3 ++-
.../leaderelection/LeaderChangeStateCleanupTest.java | 3 ++-
.../backpressure/BackPressureStatsTrackerImplITCase.java | 4 ++--
.../backpressure/StackTraceSampleCoordinatorITCase.java | 4 ++--
.../org/apache/flink/runtime/akka/AkkaSslITCase.scala | 10 +++++-----
.../apache/flink/runtime/jobmanager/RecoveryITCase.scala | 4 ++--
.../apache/flink/runtime/testingUtils/TestingUtils.scala | 2 +-
.../org/apache/flink/api/scala/ScalaShellITCase.scala | 4 ++--
.../org/apache/flink/test/util/MiniClusterResource.java | 2 +-
.../java/org/apache/flink/test/util/TestBaseUtils.java | 2 +-
.../checkpointing/utils/SavepointMigrationTestBase.java | 3 ++-
.../flink/test/operators/ExecutionEnvironmentITCase.java | 4 ++--
.../flink/test/operators/RemoteEnvironmentITCase.java | 4 ++--
.../AbstractTaskManagerProcessFailureRecoveryTest.java | 2 +-
.../recovery/JobManagerHACheckpointRecoveryITCase.java | 3 ++-
.../JobManagerHAProcessFailureBatchRecoveryITCase.java | 3 +--
.../test/recovery/TaskManagerFailureRecoveryITCase.java | 2 +-
.../leaderelection/ZooKeeperLeaderElectionITCase.java | 3 ++-
.../runtime/minicluster/LocalFlinkMiniClusterITCase.java | 3 ++-
.../scala/runtime/jobmanager/JobManagerFailsITCase.scala | 2 +-
.../runtime/taskmanager/TaskManagerFailsITCase.scala | 5 ++---
.../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java | 3 +--
36 files changed, 63 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 01c281f..f837c4f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
@@ -139,7 +140,7 @@ public class LocalExecutor extends PlanExecutor {
.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
.setNumSlotsPerTaskManager(
configuration.getInteger(
- ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1))
+ TaskManagerOptions.NUM_TASK_SLOTS, 1))
.build();
final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
@@ -220,7 +221,7 @@ public class LocalExecutor extends PlanExecutor {
try {
// TODO: Set job's default parallelism to max number of slots
- final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
+ final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
@@ -265,7 +266,7 @@ public class LocalExecutor extends PlanExecutor {
private Configuration createConfiguration() {
Configuration newConfiguration = new Configuration();
- newConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
+ newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());
newConfiguration.addAll(baseConfiguration);
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
index 8650cab..cf2ae4c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
@@ -18,7 +18,6 @@
package org.apache.flink.client.deployment;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -66,7 +65,7 @@ public final class ClusterSpecification {
}
public static ClusterSpecification fromConfiguration(Configuration configuration) {
- int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 963002f..40225fb 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -78,7 +78,7 @@ public class ManualExactlyOnceTest {
final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+ flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 93b9caf..34dcdc0 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -90,7 +90,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+ flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index bff8c80..6b0b503 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -18,7 +18,6 @@
package org.apache.flink.storm.api;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -90,7 +89,7 @@ public class FlinkLocalCluster {
configuration.addAll(jobGraph.getJobConfiguration());
configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
this.flink = new LocalFlinkMiniCluster(configuration, true);
this.flink.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 102274d1..7a8403a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -247,7 +247,7 @@ public class BootstrapTools {
cfg.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, registrationTimeout.toString());
if (numSlots != -1){
- cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+ cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
}
return cfg;
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 44a567b..0a0c692 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.util.Preconditions;
@@ -167,7 +168,7 @@ public class MiniClusterConfiguration {
public MiniClusterConfiguration build() {
final Configuration modifiedConfiguration = new Configuration(configuration);
- modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+ modifiedConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTaskManager);
modifiedConfiguration.setString(
RestOptions.ADDRESS,
modifiedConfiguration.getString(RestOptions.ADDRESS, "localhost"));
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 1bf42ee..e8a7ae8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -162,7 +162,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
// --------------------------------------------------------------------------------------------
public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
- int numberSlots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
if (numberSlots == -1) {
numberSlots = 1;
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index d029bc5..b80320c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -196,7 +196,7 @@ public class TaskManagerServicesConfiguration {
boolean localCommunication) throws Exception {
// we need this because many configs have been written with a "-1" entry
- int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
if (slots == -1) {
slots = 1;
}
@@ -290,7 +290,7 @@ public class TaskManagerServicesConfiguration {
checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
"Leave config parameter empty or use 0 to let the system choose a port automatically.");
- checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+ checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),
"Number of task slots must be at least one.");
final int pageSize = configuration.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 1d44444..8a6a9d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.execution.Environment;
@@ -59,7 +60,7 @@ public class CoordinatorShutdownTest extends TestLogger {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
cluster = new LocalFlinkMiniCluster(config, true);
cluster.start();
@@ -128,7 +129,7 @@ public class CoordinatorShutdownTest extends TestLogger {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
cluster = new LocalFlinkMiniCluster(config, true);
cluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index ced1a33..8080ca4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -55,7 +55,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
public static void setUp() throws Exception {
final Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
index 8806dec..33a4a28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -146,7 +147,7 @@ public class JobManagerCleanupITCase extends TestLogger {
try {
Configuration config = new Configuration();
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT());
config.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath());
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index bb2dbf7..d991983 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
@@ -174,7 +174,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
- flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
+ flinkConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slots);
flinkConfiguration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 417294c..79e6d20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -641,7 +640,7 @@ public class JobManagerTest extends TestLogger {
Configuration tmConfig = new Configuration();
tmConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
- tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+ tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(
tmConfig,
@@ -1300,7 +1299,7 @@ public class JobManagerTest extends TestLogger {
archiver = new AkkaActorGateway(master._2(), leaderId);
Configuration tmConfig = new Configuration();
- tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+ tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);
ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
tmConfig,
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 942fcf3..2ebaeba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.leaderelection;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -72,7 +73,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM);
configuration.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay");
configuration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 9999);
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index bbcbbf0..6880d9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.leaderelection;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -78,7 +79,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM);
highAvailabilityServices = new TestingManualHighAvailabilityServices();
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
index 994d02e..0e837a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.rest.handler.legacy.backpressure;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
@@ -138,7 +138,7 @@ public class BackPressureStatsTrackerImplITCase extends TestLogger {
config,
highAvailabilityServices);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
taskManager = TestingUtils.createTaskManager(
testActorSystem,
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
index 8fa302a..ccf5f60 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.rest.handler.legacy.backpressure;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
@@ -109,7 +109,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
config,
highAvailabilityServices);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
taskManager = TestingUtils.createTaskManager(
testActorSystem,
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
index 72596cd..ebdae31 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
@@ -51,7 +51,7 @@ class AkkaSslITCase(_system: ActorSystem)
val config = new Configuration()
config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
config.setBoolean(SecurityOptions.SSL_ENABLED, true)
@@ -78,7 +78,7 @@ class AkkaSslITCase(_system: ActorSystem)
val config = new Configuration()
config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
config.setBoolean(SecurityOptions.SSL_ENABLED, true)
@@ -101,7 +101,7 @@ class AkkaSslITCase(_system: ActorSystem)
"start with akka ssl disabled" in {
val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
config.setBoolean(SecurityOptions.SSL_ENABLED, false)
@@ -117,7 +117,7 @@ class AkkaSslITCase(_system: ActorSystem)
an[Exception] should be thrownBy {
val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
@@ -139,7 +139,7 @@ class AkkaSslITCase(_system: ActorSystem)
an[Exception] should be thrownBy {
val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 71d2ee9..a2dffc8 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -22,7 +22,7 @@ import akka.actor.{ActorSystem, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, TaskManagerOptions}
import org.apache.flink.runtime.akka.ListeningBehaviour
import org.apache.flink.runtime.io.network.partition.ResultPartitionType
import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobStatus, JobVertex}
@@ -59,7 +59,7 @@ class RecoveryITCase(_system: ActorSystem)
heartbeatTimeout: String)
: TestingCluster = {
val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
new TestingCluster(config)
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 2d8d02d..b89d2a6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -88,7 +88,7 @@ object TestingUtils {
def startTestingCluster(numSlots: Int, numTMs: Int = 1,
timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): TestingCluster = {
val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs)
config.setString(AkkaOptions.ASK_TIMEOUT, timeout)
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index cb6231a..2e07fb9 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -22,8 +22,8 @@ import java.io._
import akka.actor.ActorRef
import akka.pattern.Patterns
+import org.apache.flink.configuration.{Configuration, CoreOptions, TaskManagerOptions}
import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
-import org.apache.flink.configuration.{ConfigConstants, Configuration, CoreOptions, GlobalConfiguration}
import org.apache.flink.runtime.clusterframework.BootstrapTools
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.util.TestLogger
@@ -321,7 +321,7 @@ object ScalaShellITCase {
@BeforeClass
def beforeAll(): Unit = {
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism)
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
cluster = Option(new StandaloneMiniCluster(configuration))
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 8c21b37..844de01 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -198,7 +198,7 @@ public class MiniClusterResource extends ExternalResource {
private void startLegacyMiniCluster() throws Exception {
final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers());
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
final LocalFlinkMiniCluster flinkMiniCluster = TestBaseUtils.startCluster(
configuration,
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index dd255fd..7e9b12e 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -127,7 +127,7 @@ public class TestBaseUtils extends TestLogger {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, startWebserver);
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 8f2aaa1..cfa155b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -99,7 +100,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
final Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
UUID id = UUID.randomUUID();
final File checkpointDir = TEMP_FOLDER.newFolder("checkpoints_" + id).getAbsoluteFile();
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
index f7f993b..65a21d1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -50,7 +50,7 @@ public class ExecutionEnvironmentITCase extends TestLogger {
@Test
public void testLocalEnvironmentWithConfig() throws Exception {
Configuration conf = new Configuration();
- conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+ conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, PARALLELISM);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index a3a551c..c2d6341 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -26,9 +26,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -96,7 +96,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
resource = miniCluster;
} else {
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TM_SLOTS);
final StandaloneMiniCluster standaloneMiniCluster = new StandaloneMiniCluster(configuration);
hostname = standaloneMiniCluster.getHostname();
port = standaloneMiniCluster.getPort();
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 29516dc..6764f6f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -406,7 +406,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+ cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg,
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 16ea6d5..ce750d3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -365,7 +366,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
config.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, retainedCheckpoints);
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, temporaryFolder.newFolder().toString());
String tmpFolderString = temporaryFolder.newFolder().toString();
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index d217a2a..50404a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -270,7 +269,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
// Task manager configuration
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
config,
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 8371230..755b3d3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -77,7 +77,7 @@ public class TaskManagerFailureRecoveryITCase extends TestLogger {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "500 ms");
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index f348b8a..399fc10 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -157,7 +158,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM);
// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
// sure that all TMs have registered to the JM prior to issuing the RecoverAllJobs message
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index 770b88c..b3d92df 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.runtime.minicluster;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -76,7 +77,7 @@ public class LocalFlinkMiniClusterITCase extends TestLogger {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
miniCluster = new LocalFlinkMiniCluster(config, true);
miniCluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 5fe7b1d..5c9b1fb 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -133,7 +133,7 @@ class JobManagerFailsITCase(_system: ActorSystem)
def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
config.setInteger(JobManagerOptions.PORT, 0)
config.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, "50 ms")
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index a065e5b..69ad2d7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -20,8 +20,7 @@ package org.apache.flink.api.scala.runtime.taskmanager
import akka.actor.{ActorSystem, Kill, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.configuration.ConfigConstants
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{ConfigConstants, Configuration, TaskManagerOptions}
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.client.JobExecutionException
import org.apache.flink.runtime.io.network.partition.ResultPartitionType
@@ -243,7 +242,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
new TestingCluster(config, singleActorSystem = false)
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index fe04662..7596d68 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -23,7 +23,6 @@ import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -500,7 +499,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
}
if (commandLine.hasOption(slots.getOpt())) {
- effectiveConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
+ effectiveConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
}
if (isYarnPropertiesFileMode(commandLine)) {
[02/10] flink git commit: [FLINK-8967][tests] Port
NetworkStackThroughputITCase to flip6
Posted by ch...@apache.org.
[FLINK-8967][tests] Port NetworkStackThroughputITCase to flip6
This closes #5870.
This closes #5247.
This closes #5875.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b28e1632
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b28e1632
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b28e1632
Branch: refs/heads/master
Commit: b28e1632552cae9380269125ea4a486b5830f6d8
Parents: 26eb8bb
Author: zentol <ch...@apache.org>
Authored: Tue Apr 17 15:24:22 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:29:34 2018 +0200
----------------------------------------------------------------------
.../runtime/NetworkStackThroughputITCase.java | 40 +++++++++++---------
1 file changed, 23 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b28e1632/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index e6401c0..3b93ca2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -19,10 +19,11 @@
package org.apache.flink.test.runtime;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -32,9 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;
import org.junit.Ignore;
@@ -42,7 +41,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
@@ -234,43 +232,51 @@ public class NetworkStackThroughputITCase extends TestLogger {
final int numTaskManagers = parallelism / numSlotsPerTaskManager;
- final LocalFlinkMiniCluster localFlinkMiniCluster = TestBaseUtils.startCluster(
- numTaskManagers,
- numSlotsPerTaskManager,
- false,
- false,
- true);
+ final MiniClusterResource cluster = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ new Configuration(),
+ numTaskManagers,
+ numSlotsPerTaskManager
+ ),
+ true
+ );
+ cluster.before();
try {
- System.out.println(Arrays.toString(p));
+ System.out.println(String.format("Running test with parameters: dataVolumeGB=%s, useForwarder=%s, isSlowSender=%s, isSlowReceiver=%s, parallelism=%s, numSlotsPerTM=%s",
+ dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism, numSlotsPerTaskManager));
testProgram(
- localFlinkMiniCluster,
+ cluster,
dataVolumeGb,
useForwarder,
isSlowSender,
isSlowReceiver,
parallelism);
} finally {
- TestBaseUtils.stopCluster(localFlinkMiniCluster, FutureUtils.toFiniteDuration(TestingUtils.TIMEOUT()));
+ cluster.after();
}
}
}
private void testProgram(
- LocalFlinkMiniCluster localFlinkMiniCluster,
+ final MiniClusterResource cluster,
final int dataVolumeGb,
final boolean useForwarder,
final boolean isSlowSender,
final boolean isSlowReceiver,
final int parallelism) throws Exception {
- JobExecutionResult jer = localFlinkMiniCluster.submitJobAndWait(
+ ClusterClient<?> client = cluster.getClusterClient();
+ client.setDetached(false);
+ client.setPrintStatusDuringExecution(false);
+
+ JobExecutionResult jer = (JobExecutionResult) client.submitJob(
createJobGraph(
dataVolumeGb,
useForwarder,
isSlowSender,
isSlowReceiver,
parallelism),
- false);
+ getClass().getClassLoader());
long dataVolumeMbit = dataVolumeGb * 8192;
long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);