You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:11 UTC
[06/16] flink git commit: [FLINK-6078] Remove CuratorFramework#close
calls from ZooKeeper based HA services
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index 56d1525..25d9228 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -48,39 +49,30 @@ public class AccumulatorErrorITCase extends TestLogger {
private static LocalFlinkMiniCluster cluster;
+ private static ExecutionEnvironment env;
+
@BeforeClass
public static void startCluster() {
- try {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
- config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
- cluster = new LocalFlinkMiniCluster(config, false);
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
+ config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+ cluster = new LocalFlinkMiniCluster(config, false);
- cluster.start();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to start test cluster: " + e.getMessage());
- }
+ cluster.start();
+
+ env = new TestEnvironment(cluster, 6, false);
}
@AfterClass
public static void shutdownCluster() {
- try {
- cluster.shutdown();
- cluster = null;
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to stop test cluster: " + e.getMessage());
- }
+ cluster.shutdown();
+ cluster = null;
}
@Test
public void testFaultyAccumulator() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
env.getConfig().disableSysoutLogging();
// Test Exception forwarding with faulty Accumulator implementation
@@ -93,11 +85,9 @@ public class AccumulatorErrorITCase extends TestLogger {
try {
env.execute();
fail("Should have failed.");
- } catch (ProgramInvocationException e) {
- Assert.assertTrue("Exception should be passed:",
- e.getCause() instanceof JobExecutionException);
+ } catch (JobExecutionException e) {
Assert.assertTrue("Root cause should be:",
- e.getCause().getCause() instanceof CustomException);
+ e.getCause() instanceof CustomException);
}
}
@@ -105,7 +95,6 @@ public class AccumulatorErrorITCase extends TestLogger {
@Test
public void testInvalidTypeAccumulator() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
env.getConfig().disableSysoutLogging();
// Test Exception forwarding with faulty Accumulator implementation
@@ -119,13 +108,11 @@ public class AccumulatorErrorITCase extends TestLogger {
try {
env.execute();
fail("Should have failed.");
- } catch (ProgramInvocationException e) {
- Assert.assertTrue("Exception should be passed:",
- e.getCause() instanceof JobExecutionException);
+ } catch (JobExecutionException e) {
Assert.assertTrue("Root cause should be:",
- e.getCause().getCause() instanceof Exception);
+ e.getCause() instanceof Exception);
Assert.assertTrue("Root cause should be:",
- e.getCause().getCause().getCause() instanceof UnsupportedOperationException);
+ e.getCause().getCause() instanceof UnsupportedOperationException);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index e8ceeba..d91c57f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -36,13 +36,13 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -81,6 +81,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
private static LocalFlinkMiniCluster cluster;
+ private static TestStreamEnvironment env;
+
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -104,6 +106,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
+
+ env = new TestStreamEnvironment(cluster, PARALLELISM);
}
@AfterClass
@@ -153,9 +157,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
FailingSource.reset();
try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
@@ -226,9 +227,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
FailingSource.reset();
try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARALLELISM);
env.setMaxParallelism(maxParallelism);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -296,9 +294,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
FailingSource.reset();
try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
env.setMaxParallelism(2 * PARALLELISM);
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -361,9 +356,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
FailingSource.reset();
try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
@@ -434,9 +426,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
FailingSource.reset();
try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 3345b9c..a573be6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -30,13 +30,13 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -65,6 +65,8 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
private static LocalFlinkMiniCluster cluster;
+ private static TestStreamEnvironment env;
+
@BeforeClass
public static void startTestCluster() {
@@ -76,6 +78,8 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 s");
cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
+
+ env = new TestStreamEnvironment(cluster, PARALLELISM);
}
@AfterClass
@@ -95,9 +99,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
FailingSource.reset();
try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
@@ -159,9 +160,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
FailingSource.reset();
try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
@@ -220,9 +218,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
FailingSource.reset();
try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
@@ -292,9 +287,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
FailingSource.reset();
try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
@@ -364,9 +356,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
FailingSource.reset();
try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 2839bc1..5f56def 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.TestLogger;
@@ -95,8 +96,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
@Test
public void runCheckpointedProgram() throws Exception {
try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
+ TestStreamEnvironment env = new TestStreamEnvironment(cluster, PARALLELISM);
env.setParallelism(PARALLELISM);
env.enableCheckpointing(500);
env.getConfig().disableSysoutLogging();
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 56d8c66..7004f75 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -30,12 +30,12 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -76,6 +76,8 @@ public class WindowCheckpointingITCase extends TestLogger {
private static LocalFlinkMiniCluster cluster;
+ private static TestStreamEnvironment env;
+
@BeforeClass
public static void startTestCluster() {
@@ -86,6 +88,8 @@ public class WindowCheckpointingITCase extends TestLogger {
cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
+
+ env = new TestStreamEnvironment(cluster, PARALLELISM);
}
@AfterClass
@@ -103,9 +107,6 @@ public class WindowCheckpointingITCase extends TestLogger {
FailingSource.reset();
try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(timeCharacteristic);
env.getConfig().setAutoWatermarkInterval(10);
@@ -161,9 +162,6 @@ public class WindowCheckpointingITCase extends TestLogger {
FailingSource.reset();
try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(timeCharacteristic);
env.getConfig().setAutoWatermarkInterval(10);
@@ -219,9 +217,6 @@ public class WindowCheckpointingITCase extends TestLogger {
FailingSource.reset();
try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(timeCharacteristic);
env.getConfig().setAutoWatermarkInterval(10);
@@ -267,9 +262,6 @@ public class WindowCheckpointingITCase extends TestLogger {
FailingSource.reset();
try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(timeCharacteristic);
env.getConfig().setAutoWatermarkInterval(10);
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index f25a302..75eb112 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -38,8 +38,10 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSucc
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.testdata.KMeansData;
import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -54,13 +56,14 @@ import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
+import java.io.IOException;
+import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
public class ClassLoaderITCase extends TestLogger {
@@ -117,61 +120,69 @@ public class ClassLoaderITCase extends TestLogger {
}
FOLDER.delete();
+
+ TestStreamEnvironment.unsetAsContext();
+ TestEnvironment.unsetAsContext();
}
@Test
- public void testJobsWithCustomClassLoader() {
+ public void testJobsWithCustomClassLoader() throws IOException, ProgramInvocationException {
try {
int port = testCluster.getLeaderRPCPort();
- PackagedProgram inputSplitTestProg = new PackagedProgram(
- new File(INPUT_SPLITS_PROG_JAR_FILE),
- new String[] { INPUT_SPLITS_PROG_JAR_FILE,
- "", // classpath
- "localhost",
- String.valueOf(port),
- "4" // parallelism
- });
+ PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(INPUT_SPLITS_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
+
inputSplitTestProg.invokeInteractiveModeForExecution();
- PackagedProgram streamingInputSplitTestProg = new PackagedProgram(
- new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE),
- new String[] { STREAMING_INPUT_SPLITS_PROG_JAR_FILE,
- "localhost",
- String.valueOf(port),
- "4" // parallelism
- });
+ PackagedProgram streamingInputSplitTestProg = new PackagedProgram(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE));
+
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
+
streamingInputSplitTestProg.invokeInteractiveModeForExecution();
- String classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL().toString();
- PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE),
- new String[] { "",
- classpath, // classpath
- "localhost",
- String.valueOf(port),
- "4" // parallelism
- });
+ URL classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL();
+ PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.<Path>emptyList(),
+ Collections.singleton(classpath));
+
inputSplitTestProg2.invokeInteractiveModeForExecution();
// regular streaming job
- PackagedProgram streamingProg = new PackagedProgram(
- new File(STREAMING_PROG_JAR_FILE),
- new String[] {
- STREAMING_PROG_JAR_FILE,
- "localhost",
- String.valueOf(port)
- });
+ PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE));
+
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(STREAMING_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
+
streamingProg.invokeInteractiveModeForExecution();
// checkpointed streaming job with custom classes for the checkpoint (FLINK-2543)
// the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient.
try {
- PackagedProgram streamingCheckpointedProg = new PackagedProgram(
- new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE),
- new String[] {
- STREAMING_CHECKPOINTED_PROG_JAR_FILE,
- "localhost",
- String.valueOf(port) });
+ PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
+
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
+
streamingCheckpointedProg.invokeInteractiveModeForExecution();
} catch (Exception e) {
// we cannot access the SuccessException here when executing the tests with Maven, because it's not available in the jar.
@@ -182,14 +193,18 @@ public class ClassLoaderITCase extends TestLogger {
PackagedProgram kMeansProg = new PackagedProgram(
new File(KMEANS_JAR_PATH),
- new String[] { KMEANS_JAR_PATH,
- "localhost",
- String.valueOf(port),
- "4", // parallelism
- KMeansData.DATAPOINTS,
- KMeansData.INITIAL_CENTERS,
- "25"
+ new String[] {
+ KMeansData.DATAPOINTS,
+ KMeansData.INITIAL_CENTERS,
+ "25"
});
+
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(KMEANS_JAR_PATH)),
+ Collections.<URL>emptyList());
+
kMeansProg.invokeInteractiveModeForExecution();
// test FLINK-3633
@@ -200,6 +215,12 @@ public class ClassLoaderITCase extends TestLogger {
String.valueOf(port),
});
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(USERCODETYPE_JAR_PATH)),
+ Collections.<URL>emptyList());
+
userCodeTypeProg.invokeInteractiveModeForExecution();
File checkpointDir = FOLDER.newFolder();
@@ -208,18 +229,21 @@ public class ClassLoaderITCase extends TestLogger {
final PackagedProgram program = new PackagedProgram(
new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH),
new String[] {
- CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH,
- "localhost",
- String.valueOf(port),
checkpointDir.toURI().toString(),
outputDir.toURI().toString()
});
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)),
+ Collections.<URL>emptyList());
+
program.invokeInteractiveModeForExecution();
} catch (Exception e) {
if (!(e.getCause().getCause() instanceof SuccessException)) {
- fail(e.getMessage());
+ throw e;
}
}
}
@@ -231,23 +255,25 @@ public class ClassLoaderITCase extends TestLogger {
public void testDisposeSavepointWithCustomKvState() throws Exception {
Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
- int port = testCluster.getLeaderRPCPort();
-
File checkpointDir = FOLDER.newFolder();
File outputDir = FOLDER.newFolder();
final PackagedProgram program = new PackagedProgram(
new File(CUSTOM_KV_STATE_JAR_PATH),
new String[] {
- CUSTOM_KV_STATE_JAR_PATH,
- "localhost",
- String.valueOf(port),
String.valueOf(parallelism),
checkpointDir.toURI().toString(),
"5000",
outputDir.toURI().toString()
});
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)),
+ Collections.<URL>emptyList()
+ );
+
// Execute detached
Thread invokeThread = new Thread(new Runnable() {
@Override
@@ -283,7 +309,7 @@ public class ClassLoaderITCase extends TestLogger {
// Retry if job is not available yet
if (jobId == null) {
- Thread.sleep(100);
+ Thread.sleep(100L);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
index 52a3ba8..795ae41 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
@@ -41,14 +41,11 @@ public class CheckpointedStreamingProgram {
private static final int CHECKPOINT_INTERVALL = 100;
public static void main(String[] args) throws Exception {
- final String jarFile = args[0];
- final String host = args[1];
- final int port = Integer.parseInt(args[2]);
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
env.getConfig().disableSysoutLogging();
env.enableCheckpointing(CHECKPOINT_INTERVALL);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 100L));
env.disableOperatorChaining();
DataStream<String> text = env.addSource(new SimpleStringGenerator());
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
index d3baa7d..a24a3a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
@@ -49,14 +49,11 @@ import java.util.concurrent.ThreadLocalRandom;
public class CheckpointingCustomKvStateProgram {
public static void main(String[] args) throws Exception {
- final String jarFile = args[0];
- final String host = args[1];
- final int port = Integer.parseInt(args[2]);
- final String checkpointPath = args[3];
- final String outputPath = args[4];
+ final String checkpointPath = args[0];
+ final String outputPath = args[1];
final int parallelism = 1;
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.getConfig().disableSysoutLogging();
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
index b18e8e8..2caa7cf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.classloading.jar;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -30,7 +29,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.RemoteEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -42,14 +40,8 @@ import org.apache.flink.core.io.InputSplitAssigner;
public class CustomInputSplitProgram {
public static void main(String[] args) throws Exception {
- final String[] jarFile = (args[0].equals(""))? null : new String[] { args[0] };
- final URL[] classpath = (args[1].equals(""))? null : new URL[] { new URL(args[1]) };
- final String host = args[2];
- final int port = Integer.parseInt(args[3]);
- final int parallelism = Integer.parseInt(args[4]);
-
- RemoteEnvironment env = new RemoteEnvironment(host, port, null, jarFile, classpath);
- env.setParallelism(parallelism);
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
DataSet<Integer> data = env.createInput(new CustomInputFormat());
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
index 8de4797..cbd553c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
@@ -43,15 +43,12 @@ import java.util.concurrent.ThreadLocalRandom;
public class CustomKvStateProgram {
public static void main(String[] args) throws Exception {
- final String jarFile = args[0];
- final String host = args[1];
- final int port = Integer.parseInt(args[2]);
- final int parallelism = Integer.parseInt(args[3]);
- final String checkpointPath = args[4];
- final int checkpointingInterval = Integer.parseInt(args[5]);
- final String outputPath = args[6];
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+ final int parallelism = Integer.parseInt(args[0]);
+ final String checkpointPath = args[1];
+ final int checkpointingInterval = Integer.parseInt(args[2]);
+ final String outputPath = args[3];
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.getConfig().disableSysoutLogging();
env.enableCheckpointing(checkpointingInterval);
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
index 785464a..b8e6c85 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
@@ -52,22 +52,15 @@ public class KMeansForTest {
// *************************************************************************
public static void main(String[] args) throws Exception {
- if (args.length < 7) {
+ if (args.length < 3) {
throw new IllegalArgumentException("Missing parameters");
}
- final String jarFile = args[0];
- final String host = args[1];
- final int port = Integer.parseInt(args[2]);
+ final String pointsData = args[0];
+ final String centersData = args[1];
+ final int numIterations = Integer.parseInt(args[2]);
- final int parallelism = Integer.parseInt(args[3]);
-
- final String pointsData = args[4];
- final String centersData = args[5];
- final int numIterations = Integer.parseInt(args[6]);
-
- ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
- env.setParallelism(parallelism);
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
// get input data
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
index 60253fa..210973f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
@@ -38,11 +38,7 @@ public class LegacyCheckpointedStreamingProgram {
private static final int CHECKPOINT_INTERVALL = 100;
public static void main(String[] args) throws Exception {
- final String jarFile = args[0];
- final String host = args[1];
- final int port = Integer.parseInt(args[2]);
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.enableCheckpointing(CHECKPOINT_INTERVALL);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000));
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
index 0f0ee0c..e7bd522 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
@@ -42,18 +42,13 @@ import java.util.List;
public class StreamingCustomInputSplitProgram {
public static void main(String[] args) throws Exception {
- final String jarFile = args[0];
- final String host = args[1];
- final int port = Integer.parseInt(args[2]);
- final int parallelism = Integer.parseInt(args[3]);
-
- Configuration config = new Configuration();
+ Configuration config = new Configuration();
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, jarFile);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
env.getConfig().disableSysoutLogging();
- env.setParallelism(parallelism);
DataStream<Integer> data = env.createInput(new CustomInputFormat());
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index a19d8f2..0fdc744 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -31,12 +31,7 @@ import org.apache.flink.util.Collector;
public class StreamingProgram {
public static void main(String[] args) throws Exception {
-
- final String jarFile = args[0];
- final String host = args[1];
- final int port = Integer.parseInt(args[2]);
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
DataStream<String> text = env.fromElements(WordCountData.TEXT).rebalance();
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
index a073cba..f12fd5f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
@@ -43,11 +43,7 @@ public class UserCodeType {
}
public static void main(String[] args) throws Exception {
- String jarFile = args[0];
- String host = args[1];
- int port = Integer.parseInt(args[2]);
-
- ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
DataSet<Integer> input = env.fromElements(1,2,3,4,5);
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
index a74ed34..61595f2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
@@ -53,7 +53,8 @@ public class JobRetrievalITCase extends TestLogger {
@BeforeClass
public static void before() {
- cluster = new TestingCluster(new Configuration(), false);
+ Configuration configuration = new Configuration();
+ cluster = new TestingCluster(configuration, false);
cluster.start();
}
@@ -72,7 +73,7 @@ public class JobRetrievalITCase extends TestLogger {
final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock);
- final ClusterClient client = new StandaloneClusterClient(cluster.configuration());
+ final ClusterClient client = new StandaloneClusterClient(cluster.configuration(), cluster.highAvailabilityServices());
// acquire the lock to make sure that the job cannot complete until the job client
// has been attached in resumingThread
@@ -122,7 +123,7 @@ public class JobRetrievalITCase extends TestLogger {
try {
client.retrieveJob(jobID);
fail();
- } catch (JobRetrievalException e) {
+ } catch (JobRetrievalException ignored) {
// this is what we want
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
index 133ebd0..da92c05 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
@@ -66,7 +66,7 @@ public class CustomDistributionITCase extends TestLogger {
@Before
public void prepare() {
- TestEnvironment clusterEnv = new TestEnvironment(cluster, 1);
+ TestEnvironment clusterEnv = new TestEnvironment(cluster, 1, false);
clusterEnv.setAsContext();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index d9d1b42..0091571 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
@@ -39,47 +39,35 @@ import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
@SuppressWarnings("serial")
public class RemoteEnvironmentITCase extends TestLogger {
private static final int TM_SLOTS = 4;
- private static final int NUM_TM = 1;
-
private static final int USER_DOP = 2;
private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
private static final String VALID_STARTUP_TIMEOUT = "100 s";
- private static LocalFlinkMiniCluster cluster;
+ private static Configuration configuration;
+
+ private static StandaloneMiniCluster cluster;
+
@BeforeClass
- public static void setupCluster() {
- try {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
- cluster = new LocalFlinkMiniCluster(config, false);
- cluster.start();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Error starting test cluster: " + e.getMessage());
- }
+ public static void setupCluster() throws Exception {
+ configuration = new Configuration();
+
+ configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
+
+ cluster = new StandaloneMiniCluster(configuration);
}
@AfterClass
- public static void tearDownCluster() {
- try {
- cluster.stop();
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("ClusterClient shutdown caused an exception: " + t.getMessage());
- }
+ public static void tearDownCluster() throws Exception {
+ cluster.close();
}
/**
@@ -91,8 +79,8 @@ public class RemoteEnvironmentITCase extends TestLogger {
config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
- cluster.hostname(),
- cluster.getLeaderRPCPort(),
+ cluster.getHostname(),
+ cluster.getPort(),
config
);
env.getConfig().disableSysoutLogging();
@@ -116,8 +104,8 @@ public class RemoteEnvironmentITCase extends TestLogger {
config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
- cluster.hostname(),
- cluster.getLeaderRPCPort(),
+ cluster.getHostname(),
+ cluster.getPort(),
config
);
env.setParallelism(USER_DOP);
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 2910f06..eea2509 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
@@ -52,6 +53,8 @@ public class AutoParallelismITCase extends TestLogger {
private static LocalFlinkMiniCluster cluster;
+ private static TestEnvironment env;
+
@BeforeClass
public static void setupCluster() {
Configuration config = new Configuration();
@@ -60,6 +63,8 @@ public class AutoParallelismITCase extends TestLogger {
cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
+
+ env = new TestEnvironment(cluster, NUM_TM * SLOTS_PER_TM, false);
}
@AfterClass
@@ -78,9 +83,6 @@ public class AutoParallelismITCase extends TestLogger {
@Test
public void testProgramWithAutoParallelism() {
try {
- ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
env.getConfig().disableSysoutLogging();
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index b4a7f99..76480ba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -19,7 +19,6 @@
package org.apache.flink.test.misc;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
@@ -27,7 +26,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.types.Value;
import org.apache.flink.util.TestLogger;
@@ -47,39 +48,28 @@ public class CustomSerializationITCase extends TestLogger {
private static LocalFlinkMiniCluster cluster;
+ private static TestEnvironment env;
+
@BeforeClass
public static void startCluster() {
- try {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARLLELISM);
- config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 30L);
- cluster = new LocalFlinkMiniCluster(config, false);
- cluster.start();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to start test cluster: " + e.getMessage());
- }
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARLLELISM);
+ config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 30L);
+ cluster = new LocalFlinkMiniCluster(config, false);
+ cluster.start();
+
+ env = new TestEnvironment(cluster, PARLLELISM, false);
}
@AfterClass
public static void shutdownCluster() {
- try {
- cluster.shutdown();
- cluster = null;
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to stop test cluster: " + e.getMessage());
- }
+ cluster.shutdown();
+ cluster = null;
}
@Test
public void testIncorrectSerializer1() {
try {
- ExecutionEnvironment env =
- ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARLLELISM);
env.getConfig().disableSysoutLogging();
@@ -96,8 +86,8 @@ public class CustomSerializationITCase extends TestLogger {
env.execute();
}
- catch (ProgramInvocationException e) {
- Throwable rootCause = e.getCause().getCause();
+ catch (JobExecutionException e) {
+ Throwable rootCause = e.getCause();
assertTrue(rootCause instanceof IOException);
assertTrue(rootCause.getMessage().contains("broken serialization"));
}
@@ -110,9 +100,6 @@ public class CustomSerializationITCase extends TestLogger {
@Test
public void testIncorrectSerializer2() {
try {
- ExecutionEnvironment env =
- ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARLLELISM);
env.getConfig().disableSysoutLogging();
@@ -129,8 +116,8 @@ public class CustomSerializationITCase extends TestLogger {
env.execute();
}
- catch (ProgramInvocationException e) {
- Throwable rootCause = e.getCause().getCause();
+ catch (JobExecutionException e) {
+ Throwable rootCause = e.getCause();
assertTrue(rootCause instanceof IOException);
assertTrue(rootCause.getMessage().contains("broken serialization"));
}
@@ -143,9 +130,6 @@ public class CustomSerializationITCase extends TestLogger {
@Test
public void testIncorrectSerializer3() {
try {
- ExecutionEnvironment env =
- ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARLLELISM);
env.getConfig().disableSysoutLogging();
@@ -162,8 +146,8 @@ public class CustomSerializationITCase extends TestLogger {
env.execute();
}
- catch (ProgramInvocationException e) {
- Throwable rootCause = e.getCause().getCause();
+ catch (JobExecutionException e) {
+ Throwable rootCause = e.getCause();
assertTrue(rootCause instanceof IOException);
assertTrue(rootCause.getMessage().contains("broken serialization"));
}
@@ -176,9 +160,6 @@ public class CustomSerializationITCase extends TestLogger {
@Test
public void testIncorrectSerializer4() {
try {
- ExecutionEnvironment env =
- ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(PARLLELISM);
env.getConfig().disableSysoutLogging();
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index f885321..7dab0f1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -23,15 +23,15 @@ import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -54,43 +54,34 @@ import static org.junit.Assert.*;
@SuppressWarnings("serial")
public class MiscellaneousIssuesITCase extends TestLogger {
+ private static final int PARALLELISM = 6;
+
private static LocalFlinkMiniCluster cluster;
+
+ private static TestEnvironment env;
@BeforeClass
public static void startCluster() {
- try {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
- config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
- cluster = new LocalFlinkMiniCluster(config, false);
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
+ config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+ cluster = new LocalFlinkMiniCluster(config, false);
- cluster.start();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to start test cluster: " + e.getMessage());
- }
+ cluster.start();
+
+ env = new TestEnvironment(cluster, PARALLELISM, false);
}
@AfterClass
public static void shutdownCluster() {
- try {
- cluster.shutdown();
- cluster = null;
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to stop test cluster: " + e.getMessage());
- }
+ cluster.shutdown();
+ cluster = null;
}
@Test
public void testNullValues() {
try {
- ExecutionEnvironment env =
- ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(1);
env.getConfig().disableSysoutLogging();
@@ -107,10 +98,9 @@ public class MiscellaneousIssuesITCase extends TestLogger {
env.execute();
fail("this should fail due to null values.");
}
- catch (ProgramInvocationException e) {
+ catch (JobExecutionException e) {
assertNotNull(e.getCause());
- assertNotNull(e.getCause().getCause());
- assertTrue(e.getCause().getCause() instanceof NullPointerException);
+ assertTrue(e.getCause() instanceof NullPointerException);
}
}
catch (Exception e) {
@@ -122,9 +112,6 @@ public class MiscellaneousIssuesITCase extends TestLogger {
@Test
public void testDisjointDataflows() {
try {
- ExecutionEnvironment env =
- ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(5);
env.getConfig().disableSysoutLogging();
@@ -145,9 +132,6 @@ public class MiscellaneousIssuesITCase extends TestLogger {
final String ACC_NAME = "test_accumulator";
try {
- ExecutionEnvironment env =
- ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
env.setParallelism(6);
env.getConfig().disableSysoutLogging();
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index 4f24452..a5103cc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -34,14 +33,17 @@ import org.apache.flink.examples.java.clustering.util.KMeansData;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
+import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.junit.Assert.*;
public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
-
+
+ private static final int PARALLELISM = 16;
@Test
public void testSuccessfulProgramAfterFailure() {
LocalFlinkMiniCluster cluster = null;
@@ -56,9 +58,11 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
+
+ TestEnvironment env = new TestEnvironment(cluster, PARALLELISM, false);
try {
- runConnectedComponents(cluster.getLeaderRPCPort());
+ runConnectedComponents(env);
}
catch (Exception e) {
e.printStackTrace();
@@ -66,15 +70,15 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
}
try {
- runKMeans(cluster.getLeaderRPCPort());
+ runKMeans(env);
fail("This program execution should have failed.");
}
- catch (ProgramInvocationException e) {
- assertTrue(e.getCause().getCause().getMessage().contains("Insufficient number of network buffers"));
+ catch (JobExecutionException e) {
+ assertTrue(e.getCause().getMessage().contains("Insufficient number of network buffers"));
}
try {
- runConnectedComponents(cluster.getLeaderRPCPort());
+ runConnectedComponents(env);
}
catch (Exception e) {
e.printStackTrace();
@@ -92,10 +96,9 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
}
}
- private static void runConnectedComponents(int jmPort) throws Exception {
+ private static void runConnectedComponents(ExecutionEnvironment env) throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jmPort);
- env.setParallelism(16);
+ env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();
// read vertex and edge data
@@ -134,10 +137,9 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
env.execute();
}
- private static void runKMeans(int jmPort) throws Exception {
+ private static void runKMeans(ExecutionEnvironment env) throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jmPort);
- env.setParallelism(16);
+ env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();
// get input data
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
index 3c8eb48..6c8e758 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
@@ -170,7 +170,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
final Deadline deadline = TEST_TIMEOUT.fromNow();
final int numKeys = 256;
- final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+ final QueryableStateClient client = new QueryableStateClient(
+ cluster.configuration(),
+ cluster.highAvailabilityServices());
JobID jobId = null;
@@ -396,7 +398,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
final int numElements = 1024;
- final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+ final QueryableStateClient client = new QueryableStateClient(
+ cluster.configuration(),
+ cluster.highAvailabilityServices());
JobID jobId = null;
try {
@@ -461,7 +465,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
final int numElements = 1024;
- final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+ final QueryableStateClient client = new QueryableStateClient(
+ cluster.configuration(),
+ cluster.highAvailabilityServices());
JobID jobId = null;
try {
@@ -586,7 +592,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
final int numElements = 1024;
- final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+ final QueryableStateClient client = new QueryableStateClient(
+ cluster.configuration(),
+ cluster.highAvailabilityServices());
JobID jobId = null;
try {
@@ -679,7 +687,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
final int numElements = 1024;
- final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+ final QueryableStateClient client = new QueryableStateClient(
+ cluster.configuration(),
+ cluster.highAvailabilityServices());
JobID jobId = null;
try {
@@ -743,7 +753,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
final int numElements = 1024;
- final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+ final QueryableStateClient client = new QueryableStateClient(
+ cluster.configuration(),
+ cluster.highAvailabilityServices());
JobID jobId = null;
try {
@@ -844,7 +856,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
final int numElements = 1024;
- final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+ final QueryableStateClient client = new QueryableStateClient(
+ cluster.configuration(),
+ cluster.highAvailabilityServices());
JobID jobId = null;
try {
@@ -1008,6 +1022,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
*/
private static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer, Long>>
implements CheckpointListener {
+ private static final long serialVersionUID = -5744725196953582710L;
private final static AtomicLong LATEST_CHECKPOINT_ID = new AtomicLong();
private final int numKeys;
@@ -1055,6 +1070,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
}
private static class SumFold implements FoldFunction<Tuple2<Integer, Long>, String> {
+ private static final long serialVersionUID = -6249227626701264599L;
+
@Override
public String fold(String accumulator, Tuple2<Integer, Long> value) throws Exception {
long acc = Long.valueOf(accumulator);
@@ -1064,6 +1081,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
}
private static class SumReduce implements ReduceFunction<Tuple2<Integer, Long>> {
+ private static final long serialVersionUID = -8651235077342052336L;
+
@Override
public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
value1.f1 += value2.f1;
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 27d1aa1..c7c07ce 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -26,9 +26,12 @@ import akka.util.Timeout;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -86,13 +89,14 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
protected static final int PARALLELISM = 4;
@Test
- public void testTaskManagerProcessFailure() {
+ public void testTaskManagerProcessFailure() throws Exception {
final StringWriter processOutput1 = new StringWriter();
final StringWriter processOutput2 = new StringWriter();
final StringWriter processOutput3 = new StringWriter();
ActorSystem jmActorSystem = null;
+ HighAvailabilityServices highAvailabilityServices = null;
Process taskManagerProcess1 = null;
Process taskManagerProcess2 = null;
Process taskManagerProcess3 = null;
@@ -128,6 +132,13 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
jmConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "10 s");
jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
+ jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
+ jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+
+ highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ jmConfig,
+ TestingUtils.defaultExecutor(),
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<>(localAddress));
ActorRef jmActor = JobManager.startJobManagerActors(
@@ -135,6 +146,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
jmActorSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
+ highAvailabilityServices,
JobManager.class,
MemoryArchivist.class)._1();
@@ -263,6 +275,10 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
// we can ignore this
}
}
+
+ if (highAvailabilityServices != null) {
+ highAvailabilityServices.closeAndCleanupAllData();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index bba218f..6d53b9f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -27,6 +27,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -34,13 +36,13 @@ import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
import org.apache.flink.runtime.testutils.TaskManagerProcess;
import org.apache.flink.runtime.testutils.TestJvmProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -169,6 +171,11 @@ public class ChaosMonkeyITCase extends TestLogger {
// Task manager
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numberOfSlotsPerTaskManager);
+ final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ config,
+ TestingUtils.defaultExecutor(),
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+
ActorSystem testActorSystem = null;
LeaderRetrievalService leaderRetrievalService = null;
List<JobManagerProcess> jobManagerProcesses = new ArrayList<>();
@@ -187,7 +194,7 @@ public class ChaosMonkeyITCase extends TestLogger {
testActorSystem = AkkaUtils.createDefaultActorSystem();
// Leader listener
- leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
+ leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
TestingListener leaderListener = new TestingListener();
leaderRetrievalService.start(leaderListener);
@@ -358,6 +365,8 @@ public class ChaosMonkeyITCase extends TestLogger {
if (testActorSystem != null) {
testActorSystem.shutdown();
}
+
+ highAvailabilityServices.closeAndCleanupAllData();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
index d80c826..0b49814 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -45,18 +46,19 @@ public class FastFailuresITCase extends TestLogger {
@Test
public void testThis() {
+ final int parallelism = 4;
+
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
+ TestStreamEnvironment env = new TestStreamEnvironment(cluster, parallelism);
+
env.getConfig().disableSysoutLogging();
- env.setParallelism(4);
+ env.setParallelism(parallelism);
env.enableCheckpointing(1000);
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(210, 0));
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index f910e49..6c70b87 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -34,12 +36,12 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -170,6 +172,10 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
final JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
LeaderRetrievalService leaderRetrievalService = null;
ActorSystem taskManagerSystem = null;
+ final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ config,
+ TestingUtils.defaultExecutor(),
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
try {
final Deadline deadline = TestTimeOut.fromNow();
@@ -187,16 +193,21 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
// Leader listener
TestingListener leaderListener = new TestingListener();
- leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
+ leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
leaderRetrievalService.start(leaderListener);
// The task manager
taskManagerSystem = AkkaUtils.createActorSystem(
config, Option.apply(new Tuple2<String, Object>("localhost", 0)));
TaskManager.startTaskManagerComponentsAndActor(
- config, ResourceID.generate(), taskManagerSystem, "localhost",
- Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
- false, TaskManager.class);
+ config,
+ ResourceID.generate(),
+ taskManagerSystem,
+ highAvailabilityServices,
+ "localhost",
+ Option.<String>empty(),
+ false,
+ TaskManager.class);
{
// Initial submission
@@ -298,6 +309,8 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
if (testSystem != null) {
testSystem.shutdown();
}
+
+ highAvailabilityServices.closeAndCleanupAllData();
}
}
@@ -323,6 +336,10 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
LeaderRetrievalService leaderRetrievalService = null;
ActorSystem taskManagerSystem = null;
ActorSystem testActorSystem = null;
+ final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ config,
+ TestingUtils.defaultExecutor(),
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
try {
// Test actor system
@@ -338,16 +355,21 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
// Leader listener
TestingListener leaderListener = new TestingListener();
- leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
+ leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
leaderRetrievalService.start(leaderListener);
// The task manager
taskManagerSystem = AkkaUtils.createActorSystem(
config, Option.apply(new Tuple2<String, Object>("localhost", 0)));
TaskManager.startTaskManagerComponentsAndActor(
- config, ResourceID.generate(), taskManagerSystem, "localhost",
- Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
- false, TaskManager.class);
+ config,
+ ResourceID.generate(),
+ taskManagerSystem,
+ highAvailabilityServices,
+ "localhost",
+ Option.<String>empty(),
+ false,
+ TaskManager.class);
// Get the leader
leaderListener.waitForNewLeader(testDeadline.timeLeft().toMillis());
@@ -453,6 +475,8 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
if (testActorSystem != null) {
testActorSystem.shutdown();
}
+
+ highAvailabilityServices.closeAndCleanupAllData();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index e4d0f65..052195a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -32,6 +32,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -45,12 +47,12 @@ import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.TestLogger;
import org.apache.zookeeper.data.Stat;
@@ -183,6 +185,11 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
ActorSystem taskManagerSystem = null;
+ final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ config,
+ TestingUtils.defaultExecutor(),
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+
try {
final Deadline deadline = TestTimeOut.fromNow();
@@ -199,15 +206,20 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
// Leader listener
TestingListener leaderListener = new TestingListener();
- leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
+ leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
leaderRetrievalService.start(leaderListener);
// The task manager
taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
TaskManager.startTaskManagerComponentsAndActor(
- config, ResourceID.generate(), taskManagerSystem, "localhost",
- Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
- false, TaskManager.class);
+ config,
+ ResourceID.generate(),
+ taskManagerSystem,
+ highAvailabilityServices,
+ "localhost",
+ Option.<String>empty(),
+ false,
+ TaskManager.class);
// Client test actor
TestActorRef<RecordingTestClient> clientRef = TestActorRef.create(
@@ -327,6 +339,8 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
if (testSystem != null) {
testSystem.shutdown();
}
+
+ highAvailabilityServices.closeAndCleanupAllData();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index b6a1bd4..5f9d178 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -34,16 +34,18 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -238,7 +240,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
// Task managers
final ActorSystem[] tmActorSystem = new ActorSystem[numberOfTaskManagers];
- // Leader election service
+ HighAvailabilityServices highAvailabilityServices = null;
+
LeaderRetrievalService leaderRetrievalService = null;
// Coordination between the processes goes through a directory
@@ -263,13 +266,22 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+ highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+ config,
+ TestingUtils.defaultExecutor());
+
// Start the task manager process
for (int i = 0; i < numberOfTaskManagers; i++) {
tmActorSystem[i] = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
TaskManager.startTaskManagerComponentsAndActor(
- config, ResourceID.generate(), tmActorSystem[i], "localhost",
- Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
- false, TaskManager.class);
+ config,
+ ResourceID.generate(),
+ tmActorSystem[i],
+ highAvailabilityServices,
+ "localhost",
+ Option.<String>empty(),
+ false,
+ TaskManager.class);
}
// Test actor system
@@ -279,7 +291,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
// Leader listener
TestingListener leaderListener = new TestingListener();
- leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
+ leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
leaderRetrievalService.start(leaderListener);
// Initial submission
@@ -378,6 +390,10 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
}
}
+ if (highAvailabilityServices != null) {
+ highAvailabilityServices.closeAndCleanupAllData();
+ }
+
// Delete coordination directory
if (coordinateTempDir != null) {
try {