You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/22 15:45:39 UTC
[6/9] flink git commit: [hotfix] Introduce builder for
MiniClusterResourceConfiguration
[hotfix] Introduce builder for MiniClusterResourceConfiguration
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6de6ed48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6de6ed48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6de6ed48
Branch: refs/heads/master
Commit: 6de6ed48532a31a7ba90338ce7e5c2bc61d0ae11
Parents: bf7e101
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jun 21 14:57:45 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 22 17:45:04 2018 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/LocalExecutor.java | 3 +-
.../connectors/fs/RollingSinkITCase.java | 9 +-
.../connectors/fs/RollingSinkSecuredITCase.java | 11 +-
.../kafka/KafkaShortRetentionTestBase.java | 10 +-
.../connectors/kafka/KafkaTestBase.java | 10 +-
.../TopSpeedWindowingExampleITCase.java | 10 +-
.../hdfstests/DistributedCacheDfsTest.java | 9 +-
.../apache/flink/ml/util/FlinkTestBase.scala | 10 +-
.../gateway/local/LocalExecutorITCase.java | 10 +-
.../jobmanager/JMXJobManagerMetricTest.java | 13 +-
.../HAQueryableStateFsBackendITCase.java | 10 +-
.../HAQueryableStateRocksDBBackendITCase.java | 10 +-
.../NonHAQueryableStateFsBackendITCase.java | 10 +-
...NonHAQueryableStateRocksDBBackendITCase.java | 10 +-
.../runtime/webmonitor/WebFrontendITCase.java | 16 ++-
.../webmonitor/handlers/JarRunHandlerTest.java | 14 +-
.../webmonitor/history/HistoryServerTest.java | 14 +-
.../flink/runtime/minicluster/MiniCluster.java | 6 +-
.../minicluster/MiniClusterConfiguration.java | 11 +-
.../runtime/minicluster/RpcServiceSharing.java | 28 ++++
.../runtime/minicluster/MiniClusterITCase.java | 4 +-
.../flink/api/scala/ScalaShellITCase.scala | 6 +-
.../scala/ScalaShellLocalStartupITCase.scala | 6 +-
.../flink/test/util/AbstractTestBase.java | 9 +-
.../flink/test/util/MiniClusterResource.java | 103 ++------------
.../util/MiniClusterResourceConfiguration.java | 138 +++++++++++++++++++
.../apache/flink/test/util/TestBaseUtils.java | 25 ++++
.../accumulators/AccumulatorErrorITCase.java | 9 +-
.../accumulators/AccumulatorLiveITCase.java | 10 +-
.../test/cancelling/CancelingTestBase.java | 10 +-
...tractEventTimeWindowCheckpointingITCase.java | 10 +-
.../EventTimeAllWindowCheckpointingITCase.java | 10 +-
.../KeyedStateCheckpointingITCase.java | 10 +-
.../test/checkpointing/RescalingITCase.java | 10 +-
.../ResumeCheckpointManuallyITCase.java | 10 +-
.../test/checkpointing/SavepointITCase.java | 51 +++----
.../StreamFaultToleranceTestBase.java | 10 +-
.../WindowCheckpointingITCase.java | 10 +-
.../ZooKeeperHighAvailabilityITCase.java | 10 +-
.../utils/SavepointMigrationTestBase.java | 10 +-
.../test/example/client/JobRetrievalITCase.java | 13 +-
.../failing/JobSubmissionFailsITCase.java | 10 +-
.../manual/StreamingScalabilityAndLatency.java | 10 +-
.../flink/test/misc/AutoParallelismITCase.java | 15 +-
.../test/misc/CustomSerializationITCase.java | 10 +-
.../test/misc/MiscellaneousIssuesITCase.java | 9 +-
...SuccessAfterNetworkBuffersFailureITCase.java | 10 +-
.../operators/CustomDistributionITCase.java | 10 +-
...SimpleRecoveryFailureRateStrategyITBase.java | 9 +-
...RecoveryFixedDelayRestartStrategyITBase.java | 10 +-
.../flink/test/runtime/IPv6HostnamesITCase.java | 10 +-
.../flink/test/runtime/NettyEpollITCase.java | 11 +-
.../runtime/NetworkStackThroughputITCase.java | 11 +-
.../AbstractOperatorRestoreTestBase.java | 11 +-
.../test/streaming/runtime/TimestampITCase.java | 10 +-
.../org/apache/flink/yarn/YarnTestBase.java | 8 +-
56 files changed, 509 insertions(+), 343 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index f837c4f..4e4993a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.minicluster.JobExecutorService;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import java.util.List;
@@ -137,7 +138,7 @@ public class LocalExecutor extends PlanExecutor {
configuration.getInteger(
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
- .setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
+ .setRpcServiceSharing(RpcServiceSharing.SHARED)
.setNumSlotsPerTaskManager(
configuration.getInteger(
TaskManagerOptions.NUM_TASK_SLOTS, 1))
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 78b53d8..93f6d52 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
@@ -118,10 +119,10 @@ public class RollingSinkITCase extends TestLogger {
+ "/";
miniClusterResource = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- new org.apache.flink.configuration.Configuration(),
- 1,
- 4));
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(4)
+ .build());
miniClusterResource.before();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index b9564ee..7296269 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.modules.HadoopModule;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.SecureTestEnvironment;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.test.util.TestingSecurityContext;
@@ -147,10 +148,12 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
Configuration configuration = startSecureFlinkClusterWithRecoveryModeEnabled();
- miniClusterResource = new MiniClusterResource(new MiniClusterResource.MiniClusterResourceConfiguration(
- configuration,
- 1,
- 4));
+ miniClusterResource = new MiniClusterResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(configuration)
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(4)
+ .build());
miniClusterResource.before();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index f488325..1e3e1fc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.InstantiationUtil;
import org.junit.AfterClass;
@@ -70,10 +71,11 @@ public class KafkaShortRetentionTestBase implements Serializable {
@ClassRule
public static MiniClusterResource flink = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfiguration(),
- NUM_TMS,
- TM_SLOTS));
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfiguration())
+ .setNumberTaskManagers(NUM_TMS)
+ .setNumberSlotsPerTaskManager(TM_SLOTS)
+ .build());
@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 697e075..806c90c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
@@ -81,10 +82,11 @@ public abstract class KafkaTestBase extends TestLogger {
@ClassRule
public static MiniClusterResource flink = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getFlinkConfiguration(),
- NUM_TMS,
- TM_SLOTS),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getFlinkConfiguration())
+ .setNumberTaskManagers(NUM_TMS)
+ .setNumberSlotsPerTaskManager(TM_SLOTS)
+ .build(),
true);
protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
index 320dd5f..ab8d6b9 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -17,10 +17,10 @@
package org.apache.flink.streaming.test.examples.windowing;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing;
import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.TestLogger;
@@ -42,10 +42,10 @@ public class TopSpeedWindowingExampleITCase extends TestLogger {
@ClassRule
public static MiniClusterResource miniClusterResource = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- new Configuration(),
- 1,
- 1));
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(1)
+ .build());
@Test
public void testTopSpeedWindowingExampleITCase() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
index 5d6a30f..ed4ec94 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.NetUtils;
import org.apache.hadoop.conf.Configuration;
@@ -69,10 +70,10 @@ public class DistributedCacheDfsTest {
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- new org.apache.flink.configuration.Configuration(),
- 1,
- 1));
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(1)
+ .build());
private static MiniDFSCluster hdfsCluster;
private static Configuration conf = new Configuration();
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
index 2165152..194b657 100644
--- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
@@ -18,9 +18,7 @@
package org.apache.flink.ml.util
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.MiniClusterResource
-import org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration
+import org.apache.flink.test.util.{MiniClusterResource, MiniClusterResourceConfiguration}
import org.scalatest.{BeforeAndAfter, Suite}
/** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala based tests.
@@ -57,8 +55,10 @@ trait FlinkTestBase extends BeforeAndAfter {
before {
val cl = new MiniClusterResource(
- new MiniClusterResourceConfiguration(new Configuration(), 1, parallelism)
- )
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(parallelism)
+ .build())
cl.before()
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 64b526d..ad2d509 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;
@@ -71,10 +72,11 @@ public class LocalExecutorITCase extends TestLogger {
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfig(),
- NUM_TMS,
- NUM_SLOTS_PER_TM),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfig())
+ .setNumberTaskManagers(NUM_TMS)
+ .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+ .build(),
true);
private static ClusterClient<?> clusterClient;
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index c00b5d3..90ba6d4 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
@@ -57,14 +59,15 @@ import static org.junit.Assert.assertEquals;
/**
* Tests to verify JMX reporter functionality on the JobManager.
*/
-public class JMXJobManagerMetricTest {
+public class JMXJobManagerMetricTest extends TestLogger {
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfiguration(),
- 1,
- 1),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfiguration())
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .build(),
true);
private static Configuration getConfiguration() {
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
index a47045f..f977a8e 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.curator.test.TestingServer;
import org.junit.AfterClass;
@@ -68,10 +69,11 @@ public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestB
// we have to manage this manually because we have to create the ZooKeeper server
// ahead of this
miniClusterResource = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfig(),
- NUM_TMS,
- NUM_SLOTS_PER_TM),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfig())
+ .setNumberTaskManagers(NUM_TMS)
+ .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+ .build(),
true);
miniClusterResource.before();
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index b1092c1..25f56d7 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.curator.test.TestingServer;
import org.junit.AfterClass;
@@ -68,10 +69,11 @@ public class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableState
// we have to manage this manually because we have to create the ZooKeeper server
// ahead of this
miniClusterResource = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfig(),
- NUM_TMS,
- NUM_SLOTS_PER_TM),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfig())
+ .setNumberTaskManagers(NUM_TMS)
+ .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+ .build(),
true);
miniClusterResource.before();
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
index eb300c1..0d61e0d 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -52,10 +53,11 @@ public class NonHAQueryableStateFsBackendITCase extends AbstractQueryableStateTe
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfig(),
- NUM_TMS,
- NUM_SLOTS_PER_TM),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfig())
+ .setNumberTaskManagers(NUM_TMS)
+ .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+ .build(),
true);
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index 3d6a3e3..002ff9f 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -52,10 +53,11 @@ public class NonHAQueryableStateRocksDBBackendITCase extends AbstractQueryableSt
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfig(),
- NUM_TMS,
- NUM_SLOTS_PER_TM),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfig())
+ .setNumberTaskManagers(NUM_TMS)
+ .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+ .build(),
true);
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index 994966e..776a267 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
@@ -80,10 +81,11 @@ public class WebFrontendITCase extends TestLogger {
@ClassRule
public static final MiniClusterResource CLUSTER = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- CLUSTER_CONFIGURATION,
- NUM_TASK_MANAGERS,
- NUM_SLOTS),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(CLUSTER_CONFIGURATION)
+ .setNumberTaskManagers(NUM_TASK_MANAGERS)
+ .setNumberSlotsPerTaskManager(NUM_SLOTS)
+ .build(),
true
);
@@ -153,7 +155,7 @@ public class WebFrontendITCase extends TestLogger {
if (notFoundJobConnection.getResponseCode() >= 400) {
// we don't set the content-encoding header
Assert.assertNull(notFoundJobConnection.getContentEncoding());
- if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+ if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) {
Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType());
} else {
Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
@@ -281,7 +283,7 @@ public class WebFrontendITCase extends TestLogger {
final Deadline deadline = testTimeout.fromNow();
try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
- if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+ if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) {
// stop the job
client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
@@ -356,7 +358,7 @@ public class WebFrontendITCase extends TestLogger {
HttpTestClient.SimpleHttpResponse response = client
.getNextResponse(deadline.timeLeft());
- if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+ if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) {
assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
} else {
assertEquals(HttpResponseStatus.OK, response.getStatus());
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index aefe4f1..d26620b 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.ExceptionUtils;
import org.junit.ClassRule;
@@ -62,12 +64,12 @@ public class JarRunHandlerTest {
config.setString(WebOptions.UPLOAD_DIR, uploadDir.toString());
MiniClusterResource clusterResource = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- config,
- 1,
- 1
- ),
- MiniClusterResource.MiniClusterType.NEW
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(1)
+ .build(),
+ TestBaseUtils.CodebaseType.NEW
);
clusterResource.before();
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 9407af2..5d29fbb 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -75,12 +77,12 @@ public class HistoryServerTest extends TestLogger {
clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
cluster = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- clusterConfig,
- 1,
- 1
- ),
- MiniClusterResource.MiniClusterType.NEW
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(clusterConfig)
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(1)
+ .build(),
+ TestBaseUtils.CodebaseType.NEW
);
cluster.before();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index aca4fdb..b89617b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -230,7 +230,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
final Configuration configuration = miniClusterConfiguration.getConfiguration();
final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
- final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED;
+ final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
try {
initializeIOFormatClasses(configuration);
@@ -916,7 +916,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
@Nonnull
private CompletionStage<Void> terminateRpcServices() {
final int numRpcServices;
- if (miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED) {
+ if (miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED) {
numRpcServices = 1;
} else {
numRpcServices = 1 + 2 + miniClusterConfiguration.getNumTaskManagers(); // common, JM, RM, TMs
@@ -927,7 +927,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
synchronized (lock) {
rpcTerminationFutures.add(commonRpcService.stopService());
- if (miniClusterConfiguration.getRpcServiceSharing() != MiniClusterConfiguration.RpcServiceSharing.SHARED) {
+ if (miniClusterConfiguration.getRpcServiceSharing() != RpcServiceSharing.SHARED) {
rpcTerminationFutures.add(jobManagerRpcService.stopService());
rpcTerminationFutures.add(resourceManagerRpcService.stopService());
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 0a0c692..f728f24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -32,7 +32,7 @@ import javax.annotation.Nullable;
import scala.concurrent.duration.FiniteDuration;
-import static org.apache.flink.runtime.minicluster.MiniClusterConfiguration.RpcServiceSharing.SHARED;
+import static org.apache.flink.runtime.minicluster.RpcServiceSharing.SHARED;
/**
* Configuration object for the {@link MiniCluster}.
@@ -117,15 +117,6 @@ public class MiniClusterConfiguration {
// Enums
// ----------------------------------------------------------------------------------
- /**
- * Enum which defines whether the mini cluster components use a shared RpcService
- * or whether every component gets its own dedicated RpcService started.
- */
- public enum RpcServiceSharing {
- SHARED, // a single shared rpc service
- DEDICATED // every component gets his own dedicated rpc service
- }
-
// ----------------------------------------------------------------------------------
// Builder
// ----------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java
new file mode 100644
index 0000000..7b33869
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+/**
+ * Enum which defines whether the mini cluster components use a shared RpcService
+ * or whether every component gets its own dedicated RpcService started.
+ */
+public enum RpcServiceSharing {
+ SHARED, // a single shared rpc service
+ DEDICATED // every component gets his own dedicated rpc service
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index a72e654..30ac84f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -54,7 +54,7 @@ public class MiniClusterITCase extends TestLogger {
@Test
public void runJobWithSingleRpcService() throws Exception {
MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
- .setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
+ .setRpcServiceSharing(RpcServiceSharing.SHARED)
.setConfiguration(configuration)
.build();
@@ -71,7 +71,7 @@ public class MiniClusterITCase extends TestLogger {
@Test
public void runJobWithMultipleRpcServices() throws Exception {
MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
- .setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.DEDICATED)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.setConfiguration(configuration)
.build();
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 12522a8..c6b43c2 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -24,7 +24,8 @@ import java.util.Objects
import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions}
import org.apache.flink.runtime.clusterframework.BootstrapTools
import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster}
-import org.apache.flink.test.util.MiniClusterResource
+import org.apache.flink.test.util.{MiniClusterResource, TestBaseUtils}
+import org.apache.flink.test.util.TestBaseUtils.CodebaseType
import org.apache.flink.util.TestLogger
import org.junit._
import org.junit.rules.TemporaryFolder
@@ -321,8 +322,7 @@ object ScalaShellITCase {
@BeforeClass
def beforeAll(): Unit = {
- val isNew = Objects.equals(MiniClusterResource.NEW_CODEBASE,
- System.getProperty(MiniClusterResource.CODEBASE_KEY))
+ val isNew = TestBaseUtils.isNewCodebase()
if (isNew) {
configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
// set to different than default so not to interfere with ScalaShellLocalStartupITCase
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
index a971db8..f13f57b 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -19,11 +19,10 @@
package org.apache.flink.api.scala
import java.io._
-import java.util.Objects
import org.apache.flink.configuration.{Configuration, CoreOptions}
import org.apache.flink.runtime.clusterframework.BootstrapTools
-import org.apache.flink.test.util.MiniClusterResource
+import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.util.TestLogger
import org.junit.rules.TemporaryFolder
import org.junit.{Assert, Rule, Test}
@@ -87,8 +86,7 @@ class ScalaShellLocalStartupITCase extends TestLogger {
System.setOut(new PrintStream(baos))
val configuration = new Configuration()
- val mode = if (Objects.equals(MiniClusterResource.NEW_CODEBASE,
- System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+ val mode = if (TestBaseUtils.isNewCodebase()) {
CoreOptions.NEW_MODE
} else {
CoreOptions.LEGACY_MODE
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 65b351d..0b7a3b3 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.util;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FileUtils;
import org.junit.ClassRule;
@@ -60,10 +59,10 @@ public abstract class AbstractTestBase extends TestBaseUtils {
@ClassRule
public static MiniClusterResource miniClusterResource = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- new Configuration(),
- 1,
- DEFAULT_PARALLELISM));
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+ .build());
@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index cbe329c..c412bbf 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -18,18 +18,10 @@
package org.apache.flink.test.util;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.configuration.*;
import org.apache.flink.runtime.minicluster.JobExecutorService;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -38,13 +30,11 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
-
import org.junit.rules.ExternalResource;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -56,15 +46,11 @@ public class MiniClusterResource extends ExternalResource {
private static final Logger LOG = LoggerFactory.getLogger(MiniClusterResource.class);
- public static final String CODEBASE_KEY = "codebase";
-
- public static final String NEW_CODEBASE = "new";
-
private final TemporaryFolder temporaryFolder = new TemporaryFolder();
private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;
- private final MiniClusterType miniClusterType;
+ private final TestBaseUtils.CodebaseType miniClusterType;
private JobExecutorService jobExecutorService;
@@ -85,30 +71,30 @@ public class MiniClusterResource extends ExternalResource {
}
public MiniClusterResource(
- final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
- final MiniClusterType miniClusterType) {
+ final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
+ final TestBaseUtils.CodebaseType miniClusterType) {
this(miniClusterResourceConfiguration, miniClusterType, false);
}
public MiniClusterResource(
- final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
- final boolean enableClusterClient) {
+ final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
+ final boolean enableClusterClient) {
this(
miniClusterResourceConfiguration,
- Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)) ? MiniClusterType.NEW : MiniClusterType.LEGACY,
+ miniClusterResourceConfiguration.getCodebaseType(),
enableClusterClient);
}
private MiniClusterResource(
- final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
- final MiniClusterType miniClusterType,
- final boolean enableClusterClient) {
+ final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
+ final TestBaseUtils.CodebaseType miniClusterType,
+ final boolean enableClusterClient) {
this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration);
this.miniClusterType = Preconditions.checkNotNull(miniClusterType);
this.enableClusterClient = enableClusterClient;
}
- public MiniClusterType getMiniClusterType() {
+ public TestBaseUtils.CodebaseType getMiniClusterType() {
return miniClusterType;
}
@@ -187,7 +173,7 @@ public class MiniClusterResource extends ExternalResource {
}
}
- private void startJobExecutorService(MiniClusterType miniClusterType) throws Exception {
+ private void startJobExecutorService(TestBaseUtils.CodebaseType miniClusterType) throws Exception {
switch (miniClusterType) {
case LEGACY:
startLegacyMiniCluster();
@@ -196,7 +182,7 @@ public class MiniClusterResource extends ExternalResource {
startMiniCluster();
break;
default:
- throw new FlinkRuntimeException("Unknown MiniClusterType " + miniClusterType + '.');
+ throw new FlinkRuntimeException("Unknown MiniClusterType " + miniClusterType + '.');
}
}
@@ -264,67 +250,4 @@ public class MiniClusterResource extends ExternalResource {
webUIPort = miniCluster.getRestAddress().getPort();
}
-
- /**
- * Mini cluster resource configuration object.
- */
- public static class MiniClusterResourceConfiguration {
- private final Configuration configuration;
-
- private final int numberTaskManagers;
-
- private final int numberSlotsPerTaskManager;
-
- private final Time shutdownTimeout;
-
- public MiniClusterResourceConfiguration(
- Configuration configuration,
- int numberTaskManagers,
- int numberSlotsPerTaskManager) {
- this(
- configuration,
- numberTaskManagers,
- numberSlotsPerTaskManager,
- AkkaUtils.getTimeoutAsTime(configuration));
- }
-
- public MiniClusterResourceConfiguration(
- Configuration configuration,
- int numberTaskManagers,
- int numberSlotsPerTaskManager,
- Time shutdownTimeout) {
- this.configuration = Preconditions.checkNotNull(configuration);
- this.numberTaskManagers = numberTaskManagers;
- this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
- this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout);
- }
-
- public Configuration getConfiguration() {
- return configuration;
- }
-
- public int getNumberTaskManagers() {
- return numberTaskManagers;
- }
-
- public int getNumberSlotsPerTaskManager() {
- return numberSlotsPerTaskManager;
- }
-
- public Time getShutdownTimeout() {
- return shutdownTimeout;
- }
- }
-
- // ---------------------------------------------
- // Enum definitions
- // ---------------------------------------------
-
- /**
- * Type of the mini cluster to start.
- */
- public enum MiniClusterType {
- LEGACY,
- NEW
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
new file mode 100644
index 0000000..c938920
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Mini cluster resource configuration object.
+ */
+public class MiniClusterResourceConfiguration {
+
+ private final Configuration configuration;
+
+ private final int numberTaskManagers;
+
+ private final int numberSlotsPerTaskManager;
+
+ private final Time shutdownTimeout;
+
+ private final TestBaseUtils.CodebaseType codebaseType;
+
+ private final RpcServiceSharing rpcServiceSharing;
+
+ MiniClusterResourceConfiguration(
+ Configuration configuration,
+ int numberTaskManagers,
+ int numberSlotsPerTaskManager,
+ Time shutdownTimeout,
+ TestBaseUtils.CodebaseType codebaseType,
+ RpcServiceSharing rpcServiceSharing) {
+ this.configuration = Preconditions.checkNotNull(configuration);
+ this.numberTaskManagers = numberTaskManagers;
+ this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
+ this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout);
+ this.codebaseType = Preconditions.checkNotNull(codebaseType);
+ this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public int getNumberTaskManagers() {
+ return numberTaskManagers;
+ }
+
+ public int getNumberSlotsPerTaskManager() {
+ return numberSlotsPerTaskManager;
+ }
+
+ public Time getShutdownTimeout() {
+ return shutdownTimeout;
+ }
+
+ /**
+ * @deprecated Will be irrelevant once the legacy mode has been removed.
+ */
+ @Deprecated
+ public TestBaseUtils.CodebaseType getCodebaseType() {
+ return codebaseType;
+ }
+
+ public RpcServiceSharing getRpcServiceSharing() {
+ return rpcServiceSharing;
+ }
+
+ /**
+ * Builder for {@link MiniClusterResourceConfiguration}.
+ */
+ public static final class Builder {
+
+ private Configuration configuration = new Configuration();
+ private int numberTaskManagers = 1;
+ private int numberSlotsPerTaskManager = 1;
+ private Time shutdownTimeout = AkkaUtils.getTimeoutAsTime(configuration);
+ private TestBaseUtils.CodebaseType codebaseType = TestBaseUtils.getCodebaseType();
+
+ private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED;
+
+ public Builder setConfiguration(Configuration configuration) {
+ this.configuration = configuration;
+ return this;
+ }
+
+ public Builder setNumberTaskManagers(int numberTaskManagers) {
+ this.numberTaskManagers = numberTaskManagers;
+ return this;
+ }
+
+ public Builder setNumberSlotsPerTaskManager(int numberSlotsPerTaskManager) {
+ this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
+ return this;
+ }
+
+ public Builder setShutdownTimeout(Time shutdownTimeout) {
+ this.shutdownTimeout = shutdownTimeout;
+ return this;
+ }
+
+ /**
+ * @deprecated Will be irrelevant once the legacy mode has been removed.
+ */
+ @Deprecated
+ public Builder setCodebaseType(TestBaseUtils.CodebaseType codebaseType) {
+ this.codebaseType = codebaseType;
+ return this;
+ }
+
+ public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing) {
+ this.rpcServiceSharing = rpcServiceSharing;
+ return this;
+ }
+
+ public MiniClusterResourceConfiguration build() {
+ return new MiniClusterResourceConfiguration(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, codebaseType, rpcServiceSharing);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 7e9b12e..10fe1f0 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -44,6 +44,8 @@ import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
@@ -103,6 +105,9 @@ public class TestBaseUtils extends TestLogger {
public static final Time DEFAULT_HTTP_TIMEOUT = Time.seconds(10L);
+ static final String NEW_CODEBASE = "new";
+ static final String CODEBASE_KEY = "codebase";
+
// ------------------------------------------------------------------------
protected static File logDir;
@@ -673,6 +678,26 @@ public class TestBaseUtils extends TestLogger {
throw new TimeoutException("Could not get HTTP response in time since the service is still unavailable.");
}
+ @Nonnull
+ public static CodebaseType getCodebaseType() {
+ return Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)) ? CodebaseType.NEW : CodebaseType.LEGACY;
+ }
+
+ public static boolean isNewCodebase() {
+ return CodebaseType.NEW == getCodebaseType();
+ }
+
+ /**
+ * Type of the mini cluster to start.
+ *
+ * @deprecated Will be irrelevant once the legacy mode has been removed.
+ */
+ @Deprecated
+ public enum CodebaseType {
+ LEGACY,
+ NEW
+ }
+
/**
* Comparator for comparable Tuples.
* @param <T> tuple type
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index 3d90833..d186e36 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -50,10 +51,10 @@ public class AccumulatorErrorITCase extends TestLogger {
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfiguration(),
- 2,
- 3));
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(2)
+ .setNumberSlotsPerTaskManager(3)
+ .build());
public static Configuration getConfiguration() {
Configuration config = new Configuration();
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 347c2e3..5944d3a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -84,10 +85,11 @@ public class AccumulatorLiveITCase extends TestLogger {
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfiguration(),
- 1,
- 1),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfiguration())
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(1)
+ .build(),
true);
private static Configuration getConfiguration() {
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index cac16f0..871df7d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
@@ -56,10 +57,11 @@ public abstract class CancelingTestBase extends TestLogger {
@ClassRule
public static final MiniClusterResource CLUSTER = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfiguration(),
- 2,
- 4),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfiguration())
+ .setNumberTaskManagers(2)
+ .setNumberSlotsPerTaskManager(4)
+ .build(),
true);
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 61baefa..a1e31e9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -45,6 +45,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -106,10 +107,11 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
protected final MiniClusterResource getMiniClusterResource() {
return new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfigurationSafe(),
- 2,
- PARALLELISM / 2));
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfigurationSafe())
+ .setNumberTaskManagers(2)
+ .setNumberSlotsPerTaskManager(PARALLELISM / 2)
+ .build());
}
private Configuration getConfigurationSafe() {
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 07167a9..b82abb9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -66,10 +67,11 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfiguration(),
- 2,
- PARALLELISM / 2));
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfiguration())
+ .setNumberTaskManagers(2)
+ .setNumberSlotsPerTaskManager(PARALLELISM / 2)
+ .build());
private static Configuration getConfiguration() {
Configuration config = new Configuration();
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
index e5868c3..caa1791 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
@@ -79,10 +80,11 @@ public class KeyedStateCheckpointingITCase extends TestLogger {
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfiguration(),
- NUM_TASK_MANAGERS,
- NUM_TASK_SLOTS));
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfiguration())
+ .setNumberTaskManagers(NUM_TASK_MANAGERS)
+ .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+ .build());
private static Configuration getConfiguration() {
Configuration config = new Configuration();
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 55631a2..e7fcefd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -52,6 +52,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
@@ -127,10 +128,11 @@ public class RescalingITCase extends TestLogger {
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
cluster = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- config,
- numTaskManagers,
- slotsPerTaskManager),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumberTaskManagers(numTaskManagers)
+ .setNumberSlotsPerTaskManager(numSlots)
+ .build(),
true);
cluster.before();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index aebaa63..f4fe8e7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.state.ManualWindowSpeedITCase;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.TestLogger;
import org.apache.curator.test.TestingServer;
@@ -263,10 +264,11 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
}
MiniClusterResource cluster = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- config,
- NUM_TASK_MANAGERS,
- SLOTS_PER_TASK_MANAGER),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumberTaskManagers(NUM_TASK_MANAGERS)
+ .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+ .build(),
true);
cluster.before();
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 2805144..b46c485 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -49,6 +49,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -227,11 +228,11 @@ public class SavepointITCase extends TestLogger {
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
MiniClusterResource cluster = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- config,
- numTaskManagers,
- numSlotsPerTaskManager
- ),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumberTaskManagers(numTaskManagers)
+ .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+ .build(),
true);
cluster.before();
ClusterClient<?> client = cluster.getClusterClient();
@@ -296,11 +297,11 @@ public class SavepointITCase extends TestLogger {
// Start Flink
MiniClusterResource cluster = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- config,
- numTaskManagers,
- numSlotsPerTaskManager
- ),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumberTaskManagers(numTaskManagers)
+ .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+ .build(),
true);
LOG.info("Shutting down Flink cluster.");
@@ -341,11 +342,11 @@ public class SavepointITCase extends TestLogger {
// create a new TestingCluster to make sure we start with completely
// new resources
cluster = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- config,
- numTaskManagers,
- numSlotsPerTaskManager
- ),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumberTaskManagers(numTaskManagers)
+ .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+ .build(),
true);
LOG.info("Restarting Flink cluster.");
cluster.before();
@@ -578,11 +579,11 @@ public class SavepointITCase extends TestLogger {
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
MiniClusterResource cluster = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- config,
- 1,
- 2 * jobGraph.getMaximumParallelism()
- ),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(2 * jobGraph.getMaximumParallelism())
+ .build(),
true);
cluster.before();
ClusterClient<?> client = cluster.getClusterClient();
@@ -708,11 +709,11 @@ public class SavepointITCase extends TestLogger {
MiniClusterResource get() {
return new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- config,
- numTaskManagers,
- numSlotsPerTaskManager
- ),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumberTaskManagers(numTaskManagers)
+ .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+ .build(),
true);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 5ac1214..0ed9d6b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -19,9 +19,9 @@
package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.TestLogger;
@@ -42,10 +42,10 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- new Configuration(),
- NUM_TASK_MANAGERS,
- NUM_TASK_SLOTS));
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(NUM_TASK_MANAGERS)
+ .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+ .build());
/**
* Implementations are expected to assemble the test topology in this function
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index bc73717..1b93684 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -74,10 +75,11 @@ public class WindowCheckpointingITCase extends TestLogger {
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfiguration(),
- 2,
- PARALLELISM / 2));
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfiguration())
+ .setNumberTaskManagers(2)
+ .setNumberSlotsPerTaskManager(PARALLELISM / 2)
+ .build());
private static Configuration getConfiguration() {
Configuration config = new Configuration();
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index 156d448..6fb18e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
@@ -109,10 +110,11 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
// we have to manage this manually because we have to create the ZooKeeper server
// ahead of this
miniClusterResource = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- config,
- NUM_TMS,
- NUM_SLOTS_PER_TM),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumberTaskManagers(NUM_TMS)
+ .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+ .build(),
true);
miniClusterResource.before();
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 84cb88a..0d43aa9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.OptionalFailure;
@@ -88,10 +89,11 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
protected SavepointMigrationTestBase() throws Exception {
miniClusterResource = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfiguration(),
- 1,
- DEFAULT_PARALLELISM),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfiguration())
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+ .build(),
true);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index d8ed2ee..93bec74 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -53,12 +55,11 @@ public class JobRetrievalITCase extends TestLogger {
@ClassRule
public static final MiniClusterResource CLUSTER = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- new Configuration(),
- 1,
- 4
- ),
- MiniClusterResource.MiniClusterType.NEW
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(4)
+ .build(),
+ TestBaseUtils.CodebaseType.NEW
);
private RestClusterClient<StandaloneClusterId> client;
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index ecd16a1..5ca9ba2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -51,10 +52,11 @@ public class JobSubmissionFailsITCase extends TestLogger {
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
- new MiniClusterResource.MiniClusterResourceConfiguration(
- getConfiguration(),
- NUM_TM,
- NUM_SLOTS / NUM_TM),
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfiguration())
+ .setNumberTaskManagers(NUM_TM)
+ .setNumberSlotsPerTaskManager(NUM_SLOTS / NUM_TM)
+ .build(),
true);
private static Configuration getConfiguration() {