You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:58:54 UTC
[03/47] flink git commit: [FLINK-2354] [runtime] Add job graph and checkpoint recovery
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
index 837b643..cd40c82 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
@@ -26,10 +26,10 @@ import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CurrentJobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages.JobNotFound;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.types.IntValue;
import org.junit.Test;
import scala.concurrent.Await;
@@ -48,7 +49,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
import static org.apache.flink.runtime.messages.JobManagerMessages.RequestJobStatus;
-import static org.junit.Assert.fail;
public class TaskCancelTest {
@@ -109,30 +109,21 @@ public class TaskCancelTest {
// Wait for the job to make some progress and then cancel
awaitRunning(
- flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
- jobGraph.getJobID(),
- TestingUtils.TESTING_DURATION());
+ flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
+ jobGraph.getJobID(),
+ TestingUtils.TESTING_DURATION());
Thread.sleep(5000);
cancelJob(
- flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
- jobGraph.getJobID(),
- TestingUtils.TESTING_DURATION());
+ flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
+ jobGraph.getJobID(),
+ TestingUtils.TESTING_DURATION());
// Wait for the job to be cancelled
- JobStatus status = awaitTermination(
- flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
- jobGraph.getJobID(),
- TestingUtils.TESTING_DURATION());
-
- if (status == JobStatus.CANCELED) {
- // Expected :-) All is swell.
- }
- else {
- fail("The job finished with unexpected terminal state " + status + ". "
- + "This indicates that there is a bug in the task cancellation.");
- }
+ JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.CANCELED,
+ flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
+ TestingUtils.TESTING_DURATION());
}
finally {
if (flink != null) {
@@ -224,42 +215,6 @@ public class TaskCancelTest {
}
- private JobStatus awaitTermination(ActorGateway jobManager, JobID jobId, FiniteDuration timeout)
- throws Exception {
-
- checkNotNull(jobManager);
- checkNotNull(jobId);
- checkNotNull(timeout);
-
- while (true) {
- Future<Object> ask = jobManager.ask(
- new RequestJobStatus(jobId),
- timeout);
-
- Object result = Await.result(ask, timeout);
-
- if (result instanceof CurrentJobStatus) {
- // Success
- CurrentJobStatus status = (CurrentJobStatus) result;
-
- if (!status.jobID().equals(jobId)) {
- throw new Exception("JobManager responded for wrong job ID. Request: "
- + jobId + ", response: " + status.jobID() + ".");
- }
-
- if (status.status().isTerminalState()) {
- return status.status();
- }
- }
- else if (result instanceof JobNotFound) {
- throw new Exception("Cannot find job with ID " + jobId + ".");
- }
- else {
- throw new Exception("Unexpected response to cancel request: " + result);
- }
- }
- }
-
// ---------------------------------------------------------------------------------------------
public static class InfiniteSource extends AbstractInvokable {
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 61b1f7a..069b6af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -18,26 +18,17 @@
package org.apache.flink.runtime.testutils;
-import static org.junit.Assert.fail;
+import org.apache.flink.runtime.util.FileUtils;
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.InputStream;
import java.io.PrintWriter;
+import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import java.util.UUID;
/**
* This class contains auxiliary methods for unit tests.
@@ -77,6 +68,18 @@ public class CommonTestUtils {
}
/**
+ * Create a temporary log4j configuration for the test.
+ */
+	public static File createTemporaryLog4JProperties() throws IOException {
+		// Random prefix so that concurrently running tests never share a config file.
+		final String prefix = FileUtils.getRandomFilename("");
+		final File propertiesFile = File.createTempFile(prefix, "-log4j.properties");
+
+		// The file is only needed for the lifetime of this JVM.
+		propertiesFile.deleteOnExit();
+
+		printLog4jDebugConfig(propertiesFile);
+		return propertiesFile;
+	}
+
+ /**
* Tries to get the java executable command with which the current JVM was started.
* Returns null, if the command could not be found.
*
@@ -152,4 +155,50 @@ public class CommonTestUtils {
fw.close();
}
}
+
+	/**
+	 * Creates a new, uniquely named directory below {@code java.io.tmpdir}.
+	 *
+	 * <p>Up to 10 random (UUID) names are tried; every name that is already taken or cannot be
+	 * created is reported on {@code System.err}.
+	 *
+	 * @return The freshly created directory
+	 * @throws IOException If none of the attempts succeeded
+	 */
+	public static File createTempDirectory() throws IOException {
+		final File baseDir = new File(System.getProperty("java.io.tmpdir"));
+
+		int attempt = 0;
+		while (attempt < 10) {
+			final File candidate = new File(baseDir, UUID.randomUUID().toString());
+			final boolean created = !candidate.exists() && candidate.mkdirs();
+
+			if (created) {
+				return candidate;
+			}
+
+			System.err.println("Could not use temporary directory " + candidate.getAbsolutePath());
+			attempt++;
+		}
+
+		throw new IOException("Could not create temporary file directory");
+	}
+
+	/**
+	 * Utility thread that reads the output of a process stream and forwards it into a
+	 * StringWriter.
+	 *
+	 * <p>The thread is a daemon and starts itself on construction. It terminates when the
+	 * source stream reaches end-of-stream or fails with an {@link IOException}.
+	 */
+	public static class PipeForwarder extends Thread {
+
+		private final StringWriter target;
+		private final InputStream source;
+
+		public PipeForwarder(InputStream source, StringWriter target) {
+			super("Pipe Forwarder");
+			setDaemon(true);
+
+			this.source = source;
+			this.target = target;
+
+			start();
+		}
+
+		@Override
+		public void run() {
+			// Decode bytes into characters rather than writing each raw byte as a char, which
+			// garbles every multi-byte character. The platform default charset is used here
+			// on the assumption that the spawned process runs on the same machine and encodes
+			// its output with that charset.
+			try (java.io.Reader reader = new java.io.InputStreamReader(source)) {
+				char[] buffer = new char[4096];
+				int numRead;
+				while ((numRead = reader.read(buffer)) != -1) {
+					target.write(buffer, 0, numRead);
+				}
+			}
+			catch (IOException e) {
+				// terminate
+			}
+		}
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java
new file mode 100644
index 0000000..66e1d9b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages.CurrentJobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages.JobStatusResponse;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.runtime.messages.JobManagerMessages.JobNotFound;
+import static org.apache.flink.runtime.messages.JobManagerMessages.RequestJobStatus;
+import static org.apache.flink.runtime.messages.JobManagerMessages.getRequestJobStatus;
+import static org.apache.flink.runtime.messages.JobManagerMessages.getRequestNumberRegisteredTaskManager;
+
+/**
+ * JobManager actor test utilities.
+ *
+ * <p>If you are using a {@link TestingJobManager} most of these are not needed.
+ */
+public class JobManagerActorTestUtils {
+
+	/** Upper bound for the pause between two consecutive polls of the JobManager (in ms). */
+	private static final long MAX_RETRY_PAUSE_MILLIS = 100;
+
+	/**
+	 * Waits for the expected {@link JobStatus}.
+	 *
+	 * <p>Repeatedly queries the JobManager via {@link RequestJobStatus} messages. Between two
+	 * queries it pauses briefly while the job is unknown to the JobManager or is in another
+	 * non-terminal state.
+	 *
+	 * @param jobId Job ID of the job to wait for
+	 * @param expectedJobStatus Expected job status
+	 * @param jobManager Job manager actor to ask
+	 * @param timeout Timeout after which the operation fails
+	 * @throws IllegalStateException If the job reaches a different terminal state, the
+	 *                               JobManager sends an unexpected response, or the expected
+	 *                               status is not reached within the timeout.
+	 * @throws Exception If a request to the JobManager fails or times out.
+	 */
+	public static void waitForJobStatus(
+			JobID jobId,
+			JobStatus expectedJobStatus,
+			ActorGateway jobManager,
+			FiniteDuration timeout) throws Exception {
+
+		checkNotNull(jobId, "Job ID");
+		checkNotNull(expectedJobStatus, "Expected job status");
+		checkNotNull(jobManager, "Job manager");
+		checkNotNull(timeout, "Timeout");
+
+		final Deadline deadline = timeout.fromNow();
+
+		// Most recent status reported for the job; stays null while the job is not found.
+		JobStatus lastSeenStatus = null;
+
+		while (deadline.hasTimeLeft()) {
+			// Request the job status
+			JobStatusResponse response = requestJobStatus(jobId, jobManager, deadline.timeLeft());
+
+			if (response instanceof CurrentJobStatus) {
+				JobStatus jobStatus = ((CurrentJobStatus) response).status();
+
+				// OK, that's what we were waiting for
+				if (jobStatus == expectedJobStatus) {
+					return;
+				}
+				else if (jobStatus.isTerminalState()) {
+					throw new IllegalStateException("Job is in terminal state " + jobStatus + ", "
+							+ "but was waiting for " + expectedJobStatus + ".");
+				}
+
+				lastSeenStatus = jobStatus;
+			}
+			else if (!(response instanceof JobNotFound)) {
+				throw new IllegalStateException("Unexpected response: " + response + ".");
+			}
+
+			// The job is not found yet, or is in another non-terminal state. Pause before the
+			// next poll instead of flooding the JobManager with requests.
+			pauseBeforeRetry(deadline);
+		}
+
+		if (lastSeenStatus == null) {
+			throw new IllegalStateException("Job not found within deadline.");
+		}
+		else {
+			throw new IllegalStateException("Job did not reach status " + expectedJobStatus
+					+ " within deadline. Last seen status: " + lastSeenStatus + ".");
+		}
+	}
+
+	/**
+	 * Request a {@link JobStatusResponse}.
+	 *
+	 * @param jobId Job ID of the job to request the status of
+	 * @param jobManager Job manager actor to ask
+	 * @param timeout Timeout after which the operation fails
+	 * @return The {@link JobStatusResponse} from the job manager
+	 * @throws IllegalStateException If the answer is not a {@link JobStatusResponse}.
+	 * @throws Exception If there is no answer within the timeout.
+	 */
+	public static JobStatusResponse requestJobStatus(
+			JobID jobId,
+			ActorGateway jobManager,
+			FiniteDuration timeout) throws Exception {
+
+		checkNotNull(jobId, "Job ID");
+		checkNotNull(jobManager, "Job manager");
+		checkNotNull(timeout, "Timeout");
+
+		// Ask the JobManager
+		RequestJobStatus request = (RequestJobStatus) getRequestJobStatus(jobId);
+		Future<Object> ask = jobManager.ask(request, timeout);
+		Object response = Await.result(ask, timeout);
+
+		if (response instanceof JobStatusResponse) {
+			return (JobStatusResponse) response;
+		}
+
+		throw new IllegalStateException("Unexpected response: " + response + ".");
+	}
+
+	/**
+	 * Waits for a minimum number of task managers to connect to the job manager.
+	 *
+	 * @param minimumNumberOfTaskManagers Minimum number of task managers to wait for
+	 * @param jobManager Job manager actor to ask
+	 * @param timeout Timeout after which the operation fails
+	 * @throws IllegalStateException If the task managers don't connect within the timeout.
+	 * @throws Exception If a request to the JobManager fails or times out.
+	 */
+	public static void waitForTaskManagers(
+			int minimumNumberOfTaskManagers,
+			ActorGateway jobManager,
+			FiniteDuration timeout) throws Exception {
+
+		checkArgument(minimumNumberOfTaskManagers >= 1);
+		checkNotNull(jobManager, "Job manager");
+		checkNotNull(timeout, "Timeout");
+
+		final Deadline deadline = timeout.fromNow();
+
+		while (deadline.hasTimeLeft()) {
+			// Use one snapshot of the remaining time for both the ask and the await
+			FiniteDuration remaining = deadline.timeLeft();
+			Future<Object> ask = jobManager.ask(getRequestNumberRegisteredTaskManager(), remaining);
+
+			Integer response = (Integer) Await.result(ask, remaining);
+
+			// All are connected. We are done.
+			if (response >= minimumNumberOfTaskManagers) {
+				return;
+			}
+
+			// Waiting for more... retry
+			pauseBeforeRetry(deadline);
+		}
+
+		throw new IllegalStateException("Task managers not connected within deadline.");
+	}
+
+	/**
+	 * Sleeps for at most {@link #MAX_RETRY_PAUSE_MILLIS}, but not beyond the deadline.
+	 *
+	 * <p>{@link Deadline#timeLeft()} becomes negative once the deadline has passed (e.g. when
+	 * the preceding request consumed the remaining time). {@link Thread#sleep(long)} rejects
+	 * negative values, so the pause is clamped to zero.
+	 */
+	private static void pauseBeforeRetry(Deadline deadline) throws InterruptedException {
+		long remainingMillis = Math.max(0L, deadline.timeLeft().toMillis());
+		Thread.sleep(Math.min(MAX_RETRY_PAUSE_MILLIS, remainingMillis));
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
new file mode 100644
index 0000000..85b768d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.JobManagerMode;
+import org.apache.flink.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link JobManager} instance running in a separate JVM.
+ */
+public class JobManagerProcess extends TestJvmProcess {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcess.class);
+
+	/** Upper bound for the pause between two attempts to resolve the JobManager (in ms). */
+	private static final long MAX_RETRY_PAUSE_MILLIS = 100;
+
+	/** ID for this JobManager */
+	private final int id;
+
+	/** The port the JobManager listens on */
+	private final int jobManagerPort;
+
+	/** The configuration for the JobManager */
+	private final Configuration config;
+
+	/** Configuration parsed as args for {@link JobManagerProcess.JobManagerProcessEntryPoint} */
+	private final String[] jvmArgs;
+
+	/** Cached reference to the JobManager actor, resolved lazily by {@link #getActorRef}. */
+	private ActorRef jobManagerRef;
+
+	/**
+	 * Creates a {@link JobManager} running in a separate JVM, listening on any available port.
+	 *
+	 * <p>See {@link #JobManagerProcess(int, Configuration, int)} for a more detailed
+	 * description.
+	 *
+	 * @param id ID for the JobManager (must not be negative)
+	 * @param config Configuration for the job manager process
+	 * @throws Exception If the process description cannot be created
+	 */
+	public JobManagerProcess(int id, Configuration config) throws Exception {
+		this(id, config, 0);
+	}
+
+	/**
+	 * Creates a {@link JobManager} running in a separate JVM.
+	 *
+	 * <p>The port and every entry of the configuration are passed to the process as
+	 * <code>--key value</code> arguments.
+	 *
+	 * @param id ID for the JobManager (must not be negative)
+	 * @param config Configuration for the job manager process
+	 * @param jobManagerPort Job manager port (if <code>0</code> or negative, pick any
+	 *                       available port)
+	 * @throws Exception If the process description cannot be created
+	 */
+	public JobManagerProcess(int id, Configuration config, int jobManagerPort) throws Exception {
+		checkArgument(id >= 0, "Negative ID");
+		this.id = id;
+		this.config = checkNotNull(config, "Configuration");
+		this.jobManagerPort = jobManagerPort <= 0 ? NetUtils.getAvailablePort() : jobManagerPort;
+
+		ArrayList<String> args = new ArrayList<>();
+		args.add("--port");
+		args.add(String.valueOf(this.jobManagerPort));
+
+		for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+			args.add("--" + entry.getKey());
+			args.add(entry.getValue());
+		}
+
+		this.jvmArgs = args.toArray(new String[args.size()]);
+	}
+
+	@Override
+	public String getName() {
+		return "JobManager " + id;
+	}
+
+	@Override
+	public String[] getJvmArgs() {
+		return jvmArgs;
+	}
+
+	@Override
+	public String getEntryPointClassName() {
+		return JobManagerProcessEntryPoint.class.getName();
+	}
+
+	public int getJobManagerPort() {
+		return jobManagerPort;
+	}
+
+	public Configuration getConfig() {
+		return config;
+	}
+
+	/**
+	 * Returns the Akka URL of this JobManager (on <code>localhost</code>).
+	 */
+	public String getJobManagerAkkaURL() {
+		return JobManager.getRemoteJobManagerAkkaURL(
+				new InetSocketAddress("localhost", jobManagerPort),
+				Option.<String>empty());
+	}
+
+	@Override
+	public String toString() {
+		return String.format("JobManagerProcess(id=%d, port=%d)", id, jobManagerPort);
+	}
+
+	/**
+	 * Waits for the job manager to be reachable.
+	 *
+	 * <p><strong>Important:</strong> Make sure to set the timeout larger than Akka's gating
+	 * time. Otherwise, this will effectively not wait for the JobManager to startup, because the
+	 * time out will fire immediately.
+	 *
+	 * @param actorSystem Actor system to be used to resolve JobManager address.
+	 * @param timeout Timeout (make sure to set larger than Akka's gating time).
+	 * @return The JobManager actor reference (cached after the first successful lookup)
+	 * @throws IllegalStateException If the JobManager is not reachable within the timeout
+	 */
+	public ActorRef getActorRef(ActorSystem actorSystem, FiniteDuration timeout)
+			throws Exception {
+
+		if (jobManagerRef != null) {
+			return jobManagerRef;
+		}
+
+		checkNotNull(actorSystem, "Actor system");
+		checkNotNull(timeout, "Timeout");
+
+		// Deadline passes timeout ms
+		final Deadline deadline = timeout.fromNow();
+
+		while (deadline.hasTimeLeft()) {
+			try {
+				// If the Actor is not reachable yet, this throws an Exception. Retry until the
+				// deadline passes.
+				this.jobManagerRef = AkkaUtils.getActorRef(
+						getJobManagerAkkaURL(),
+						actorSystem,
+						deadline.timeLeft());
+
+				return jobManagerRef;
+			}
+			catch (Throwable ignored) {
+				// Retry. The failed lookup may have used up the remaining time, which makes
+				// timeLeft() negative; Thread.sleep rejects negative values, so clamp to zero.
+				long remainingMillis = Math.max(0L, deadline.timeLeft().toMillis());
+				Thread.sleep(Math.min(MAX_RETRY_PAUSE_MILLIS, remainingMillis));
+			}
+		}
+
+		throw new IllegalStateException("JobManager did not start up within " + timeout + ".");
+	}
+
+	/**
+	 * Entry point for the JobManager process.
+	 */
+	public static class JobManagerProcessEntryPoint {
+
+		private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcessEntryPoint.class);
+
+		/**
+		 * Runs the JobManager process in {@link JobManagerMode#CLUSTER} and {@link
+		 * StreamingMode#STREAMING} (can handle both batch and streaming jobs).
+		 *
+		 * <p><strong>Required argument</strong>: <code>port</code>. Start the process with
+		 * <code>--port PORT</code>.
+		 *
+		 * <p>Other arguments are parsed to a {@link Configuration} and passed to the
+		 * JobManager, for instance: <code>--recovery.mode ZOOKEEPER --ha.zookeeper.quorum
+		 * "xyz:123:456"</code>.
+		 *
+		 * <p>Any failure is logged and terminates the JVM with exit code 1.
+		 */
+		public static void main(String[] args) {
+			try {
+				ParameterTool params = ParameterTool.fromArgs(args);
+				final int port = Integer.parseInt(params.getRequired("port"));
+				LOG.info("Running on port {}.", port);
+
+				Configuration config = params.getConfiguration();
+				LOG.info("Configuration: {}.", config);
+
+				// Run the JobManager
+				JobManager.runJobManager(config, JobManagerMode.CLUSTER, StreamingMode.STREAMING,
+						"localhost", port);
+
+				// Block this thread forever; the process is terminated from the outside.
+				new CountDownLatch(1).await();
+			}
+			catch (Throwable t) {
+				LOG.error("Failed to start JobManager process", t);
+				System.exit(1);
+			}
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
new file mode 100644
index 0000000..f683c55
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link TaskManager} instance running in a separate JVM.
+ */
+public class TaskManagerProcess extends TestJvmProcess {
+
+	/** ID for this TaskManager */
+	private final int id;
+
+	/** The configuration for the TaskManager */
+	private final Configuration config;
+
+	/** Configuration parsed as args for {@link TaskManagerProcess.TaskManagerProcessEntryPoint} */
+	private final String[] jvmArgs;
+
+	/**
+	 * Creates a {@link TaskManager} running in a separate JVM.
+	 *
+	 * <p>Every entry of the configuration is passed to the process as a
+	 * <code>--key value</code> argument.
+	 *
+	 * @param id ID for the TaskManager (must not be negative)
+	 * @param config Configuration for the task manager process
+	 * @throws Exception If the process description cannot be created
+	 */
+	public TaskManagerProcess(int id, Configuration config) throws Exception {
+		checkArgument(id >= 0, "Negative ID");
+		this.id = id;
+		this.config = checkNotNull(config, "Configuration");
+
+		ArrayList<String> args = new ArrayList<>();
+
+		for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+			args.add("--" + entry.getKey());
+			args.add(entry.getValue());
+		}
+
+		this.jvmArgs = args.toArray(new String[args.size()]);
+	}
+
+	@Override
+	public String getName() {
+		return "TaskManager " + id;
+	}
+
+	@Override
+	public String[] getJvmArgs() {
+		return jvmArgs;
+	}
+
+	@Override
+	public String getEntryPointClassName() {
+		return TaskManagerProcessEntryPoint.class.getName();
+	}
+
+	public int getId() {
+		return id;
+	}
+
+	/**
+	 * Returns the configuration this TaskManager process was created with.
+	 */
+	public Configuration getConfig() {
+		return config;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("TaskManagerProcess(id=%d)", id);
+	}
+
+	/**
+	 * Entry point for the TaskManager process.
+	 */
+	public static class TaskManagerProcessEntryPoint {
+
+		private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
+
+		/**
+		 * Runs the TaskManager process in {@link StreamingMode#STREAMING} (can handle both
+		 * batch and streaming jobs).
+		 *
+		 * <p>All arguments are parsed to a {@link Configuration} and passed to the TaskManager,
+		 * for instance: <code>--recovery.mode ZOOKEEPER --ha.zookeeper.quorum "xyz:123:456"</code>.
+		 * If not given, the managed memory size defaults to 4 and the number of network
+		 * buffers to 100.
+		 *
+		 * <p>Any failure is logged and terminates the JVM with exit code 1.
+		 */
+		public static void main(String[] args) throws Exception {
+			try {
+				Configuration config = ParameterTool.fromArgs(args).getConfiguration();
+
+				// Keep the footprint of the test process small unless configured otherwise
+				if (!config.containsKey(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY)) {
+					config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+				}
+
+				if (!config.containsKey(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)) {
+					config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+				}
+
+				LOG.info("Configuration: {}.", config);
+
+				// Run the TaskManager
+				TaskManager.selectNetworkInterfaceAndRunTaskManager(
+						config, StreamingMode.STREAMING, TaskManager.class);
+
+				// Block this thread forever; the process is terminated from the outside.
+				new CountDownLatch(1).await();
+			}
+			catch (Throwable t) {
+				LOG.error("Failed to start TaskManager process", t);
+				System.exit(1);
+			}
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
new file mode 100644
index 0000000..0920b5c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Arrays;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.createTemporaryLog4JProperties;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.junit.Assert.fail;
+
+/**
+ * A {@link Process} running a separate JVM.
+ */
+public abstract class TestJvmProcess {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TestJvmProcess.class);
+
+	/** Lock to guard {@link #createAndStart()} and {@link #destroy()} calls. */
+	private final Object createDestroyLock = new Object();
+
+	/** The java command path */
+	private final String javaCommandPath;
+
+	/** The log4j configuration path. */
+	private final String log4jConfigFilePath;
+
+	/** Shutdown hook that calls {@link #destroy()} when the parent JVM shuts down. */
+	private final Thread shutdownHook;
+
+	/** JVM process memory (set for both '-Xms' and '-Xmx'). */
+	private int jvmMemoryInMb = 80;
+
+	/** The JVM process (guarded by {@link #createDestroyLock}). */
+	private Process process;
+
+	/** Writer collecting the process output (stdout and stderr merged). */
+	private volatile StringWriter processOutput;
+
+	/**
+	 * Uses the java command of the current JVM and a freshly created temporary log4j debug
+	 * configuration.
+	 */
+	public TestJvmProcess() throws Exception {
+		this(getJavaCommandPath(), createTemporaryLog4JProperties().getPath());
+	}
+
+	/**
+	 * @param javaCommandPath Path of the java executable used to start the process
+	 * @param log4jConfigFilePath Path of the log4j configuration file for the process
+	 */
+	public TestJvmProcess(String javaCommandPath, String log4jConfigFilePath) {
+		this.javaCommandPath = checkNotNull(javaCommandPath, "Java command path");
+		this.log4jConfigFilePath = checkNotNull(log4jConfigFilePath, "log4j config file path");
+
+		this.shutdownHook = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					destroy();
+				}
+				catch (Throwable t) {
+					LOG.error("Error during process cleanup shutdown hook.", t);
+				}
+			}
+		});
+	}
+
+	/**
+	 * Returns the name of the process.
+	 */
+	public abstract String getName();
+
+	/**
+	 * Returns the arguments to the JVM.
+	 *
+	 * <p>These can be parsed by the main method of the entry point class.
+	 */
+	public abstract String[] getJvmArgs();
+
+	/**
+	 * Returns the name of the class to run.
+	 *
+	 * <p>Arguments to the main method can be specified via {@link #getJvmArgs()}.
+	 */
+	public abstract String getEntryPointClassName();
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Sets the memory for the process (<code>-Xms</code> and <code>-Xmx</code> flags) (>= 80).
+	 *
+	 * <p>Only affects processes started afterwards via {@link #createAndStart()}.
+	 *
+	 * @param jvmMemoryInMb Amount of memory in Megabytes for the JVM (>= 80).
+	 */
+	public void setJVMMemory(int jvmMemoryInMb) {
+		checkArgument(jvmMemoryInMb >= 80, "JVM requires at least 80 MBs of memory.");
+		this.jvmMemoryInMb = jvmMemoryInMb;
+	}
+
+	/**
+	 * Creates and starts the {@link Process}.
+	 *
+	 * <p><strong>Important:</strong> Don't forget to call {@link #destroy()} to prevent
+	 * resource leaks. The created process will be child process and is not guaranteed to
+	 * terminate when the parent process terminates.
+	 *
+	 * @throws IllegalStateException If the process is already running
+	 * @throws IOException If the process cannot be started
+	 */
+	public void createAndStart() throws IOException {
+		String[] cmd = new String[] {
+				javaCommandPath,
+				"-Dlog.level=DEBUG",
+				"-Dlog4j.configuration=file:" + log4jConfigFilePath,
+				"-Xms" + jvmMemoryInMb + "m",
+				"-Xmx" + jvmMemoryInMb + "m",
+				"-classpath", getCurrentClasspath(),
+				getEntryPointClassName() };
+
+		String[] jvmArgs = getJvmArgs();
+
+		if (jvmArgs != null && jvmArgs.length > 0) {
+			cmd = ArrayUtils.addAll(cmd, jvmArgs);
+		}
+
+		synchronized (createDestroyLock) {
+			if (process == null) {
+				LOG.debug("Running command '{}'.", Arrays.toString(cmd));
+
+				// Merge stderr into stdout so that a single forwarder drains both pipes. A pipe
+				// that is never read fills up and blocks the child once it writes enough output.
+				this.process = new ProcessBuilder(cmd).redirectErrorStream(true).start();
+
+				// Forward output
+				this.processOutput = new StringWriter();
+				new CommonTestUtils.PipeForwarder(process.getInputStream(), processOutput);
+
+				try {
+					// Add JVM shutdown hook to call shutdown of service
+					Runtime.getRuntime().addShutdownHook(shutdownHook);
+				}
+				catch (IllegalStateException ignored) {
+					// JVM is already shutting down. No need to do this.
+				}
+				catch (Throwable t) {
+					LOG.error("Cannot register process cleanup shutdown hook.", t);
+				}
+			}
+			else {
+				throw new IllegalStateException("Already running.");
+			}
+		}
+	}
+
+	/**
+	 * Prints the output collected so far from the spawned process to {@code System.out}.
+	 *
+	 * @throws IllegalStateException If the process was never started
+	 */
+	public void printProcessLog() {
+		if (processOutput == null) {
+			throw new IllegalStateException("Not started");
+		}
+
+		System.out.println("-----------------------------------------");
+		System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + getName());
+		System.out.println("-----------------------------------------");
+
+		String out = processOutput.toString();
+		if (out == null || out.length() == 0) {
+			System.out.println("(EMPTY)");
+		}
+		else {
+			System.out.println(out);
+		}
+
+		System.out.println("-----------------------------------------");
+		System.out.println(" END SPAWNED PROCESS LOG " + getName());
+		System.out.println("-----------------------------------------");
+	}
+
+	/**
+	 * Destroys the process (if running) and unregisters the cleanup shutdown hook.
+	 *
+	 * <p>Calling this when no process is running is a no-op.
+	 */
+	public void destroy() {
+		synchronized (createDestroyLock) {
+			if (process != null) {
+				LOG.debug("Destroying {} process.", getName());
+
+				try {
+					process.destroy();
+				}
+				catch (Throwable t) {
+					LOG.error("Error while trying to destroy process.", t);
+				}
+				finally {
+					process = null;
+
+					// Skip when running inside the shutdown hook itself (JVM is shutting down).
+					if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
+						try {
+							Runtime.getRuntime().removeShutdownHook(shutdownHook);
+						}
+						catch (IllegalStateException ignored) {
+							// JVM is in shutdown already, we can safely ignore this.
+						}
+						catch (Throwable t) {
+							LOG.warn("Exception while unregistering process cleanup shutdown hook.", t);
+						}
+					}
+				}
+			}
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------
+	// File based synchronization utilities
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates the file if it does not exist and sets its modification time to now.
+	 *
+	 * @throws IOException If the file cannot be created or its modification time cannot be set
+	 */
+	public static void touchFile(File file) throws IOException {
+		if (!file.exists()) {
+			new FileOutputStream(file).close();
+		}
+		if (!file.setLastModified(System.currentTimeMillis())) {
+			throw new IOException("Could not touch the file.");
+		}
+	}
+
+	/**
+	 * Waits until the files <code>prefix + 0</code> to <code>prefix + (num - 1)</code> all
+	 * exist in <code>basedir</code>, polling every 10 ms.
+	 *
+	 * <p>Fails the calling test if they do not all exist within <code>timeout</code> ms.
+	 */
+	public static void waitForMarkerFiles(File basedir, String prefix, int num, long timeout) {
+		long now = System.currentTimeMillis();
+		final long deadline = now + timeout;
+
+		while (now < deadline) {
+			boolean allFound = true;
+
+			for (int i = 0; i < num; i++) {
+				File nextToCheck = new File(basedir, prefix + i);
+				if (!nextToCheck.exists()) {
+					allFound = false;
+					break;
+				}
+			}
+
+			if (allFound) {
+				return;
+			}
+			else {
+				// not all found, wait for a bit
+				try {
+					Thread.sleep(10);
+				}
+				catch (InterruptedException e) {
+					// Restore the interrupt flag before escaping with an unchecked exception
+					Thread.currentThread().interrupt();
+					throw new RuntimeException(e);
+				}
+
+				now = System.currentTimeMillis();
+			}
+		}
+
+		fail("The tasks were not started within time (" + timeout + "msecs)");
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
new file mode 100644
index 0000000..d2e5b6a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * ZooKeeper test utilities.
+ */
+public class ZooKeeperTestUtils {
+
+ /** ZooKeeper connection and session timeout (in milliseconds) used by default. */
+ private static final int DEFAULT_ZOOKEEPER_TIMEOUT_MS = 5000;
+
+ /**
+ * ZooKeeper connection and session timeout (in milliseconds) used when the {@code CI}
+ * environment variable is set. The default timeout is too aggressive for CI machines
+ * (e.g. Travis), where connections are often lost.
+ */
+ private static final int CI_ZOOKEEPER_TIMEOUT_MS = 20000;
+
+ /**
+ * Creates a configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * @param zooKeeperQuorum ZooKeeper quorum to connect to
+ * @param fsStateHandlePath Base path for file system state backend (for checkpoints and
+ * recovery)
+ * @return A new configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+ * @throws NullPointerException If {@code zooKeeperQuorum} or {@code fsStateHandlePath} is
+ * {@code null}
+ */
+ public static Configuration createZooKeeperRecoveryModeConfig(
+ String zooKeeperQuorum, String fsStateHandlePath) {
+
+ return setZooKeeperRecoveryMode(new Configuration(), zooKeeperQuorum, fsStateHandlePath);
+ }
+
+ /**
+ * Sets all necessary configuration keys to operate in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * <p>Besides the recovery mode and quorum, this dismisses the web frontend (port -1), uses
+ * a file system state backend with checkpoints under {@code fsStateHandlePath + "/checkpoints"}
+ * and recovery data under {@code fsStateHandlePath + "/recovery"}, and tunes Akka failure
+ * detection and the execution retry delay.
+ *
+ * @param config Configuration to use (modified in place)
+ * @param zooKeeperQuorum ZooKeeper quorum to connect to
+ * @param fsStateHandlePath Base path for file system state backend (for checkpoints and
+ * recovery)
+ * @return The modified configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+ * @throws NullPointerException If any argument is {@code null}
+ */
+ public static Configuration setZooKeeperRecoveryMode(
+ Configuration config,
+ String zooKeeperQuorum,
+ String fsStateHandlePath) {
+
+ checkNotNull(config, "Configuration");
+ checkNotNull(zooKeeperQuorum, "ZooKeeper quorum");
+ checkNotNull(fsStateHandlePath, "File state handle backend path");
+
+ // Web frontend, you have been dismissed. Sorry.
+ config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
+
+ // ZooKeeper recovery mode
+ config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+ config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum);
+
+ // The regular timeout is too aggressive for CI (e.g. Travis) and connections are often lost.
+ final int connTimeout = System.getenv("CI") != null
+ ? CI_ZOOKEEPER_TIMEOUT_MS
+ : DEFAULT_ZOOKEEPER_TIMEOUT_MS;
+
+ config.setInteger(ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout);
+ config.setInteger(ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT, connTimeout);
+
+ // File system state backend
+ config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+ config.setString(ConfigConstants.STATE_BACKEND_FS_DIR, fsStateHandlePath + "/checkpoints");
+ config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, fsStateHandlePath + "/recovery");
+
+ // Akka failure detection and execution retries
+ config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
+ config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
+ config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
+ config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
+
+ return config;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
new file mode 100644
index 0000000..f0130ec
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for basic {@link ZooKeeperStateHandleStore} behaviour.
+ *
+ * <p> Tests include:
+ * <ul>
+ * <li>Expected usage of operations</li>
+ * <li>Correct ordering of ZooKeeper and state handle operations</li>
+ * </ul>
+ */
+public class ZooKeeperStateHandleStoreITCase extends TestLogger {
+
+ /**
+ * ZooKeeper environment with one quorum peer, shared by all tests. Its ZNodes are deleted
+ * before each test and it is shut down after all tests of this class.
+ */
+ private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (ZooKeeper != null) {
+ ZooKeeper.shutdown();
+ }
+ }
+
+ @Before
+ public void cleanUp() throws Exception {
+ ZooKeeper.deleteAll();
+ }
+
+ /**
+ * Tests add operation with default {@link CreateMode}.
+ */
+ @Test
+ public void testAdd() throws Exception {
+ // Setup
+ LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZooKeeper.getClient(), stateHandleProvider);
+
+ // Config
+ final String pathInZooKeeper = "/testAdd";
+ final Long state = 1239712317L;
+
+ // Test
+ store.add(pathInZooKeeper, state);
+
+ // Verify
+ // State handle created
+ assertEquals(1, stateHandleProvider.getStateHandles().size());
+ assertEquals(state, stateHandleProvider.getStateHandles().get(0).getState(null));
+
+ // Path created and is persistent
+ Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
+ assertNotNull(stat);
+ assertEquals(0, stat.getEphemeralOwner());
+
+ // Data is equal
+ @SuppressWarnings("unchecked")
+ Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject(
+ ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
+ ClassLoader.getSystemClassLoader())).getState(null);
+
+ assertEquals(state, actual);
+ }
+
+ /**
+ * Tests that {@link CreateMode} is respected.
+ */
+ @Test
+ public void testAddWithCreateMode() throws Exception {
+ LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZooKeeper.getClient(), stateHandleProvider);
+
+ // Config
+ Long state = 3457347234L;
+
+ CreateMode[] modes = CreateMode.values();
+ for (int i = 0; i < modes.length; i++) {
+ CreateMode mode = modes[i];
+ state += i;
+
+ String pathInZooKeeper = "/testAddWithCreateMode" + mode.name();
+
+ // Test
+ store.add(pathInZooKeeper, state, mode);
+
+ if (mode.isSequential()) {
+ // Figure out the sequential ID
+ List<String> paths = ZooKeeper.getClient().getChildren().forPath("/");
+ for (String p : paths) {
+ if (p.startsWith("testAddWithCreateMode" + mode.name())) {
+ pathInZooKeeper = "/" + p;
+ break;
+ }
+ }
+ }
+
+ // Verify
+ // State handle created
+ assertEquals(i + 1, stateHandleProvider.getStateHandles().size());
+ assertEquals(state, stateHandleProvider.getStateHandles().get(i).getState(null));
+
+ // Path created
+ Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
+
+ assertNotNull(stat);
+
+ // Is ephemeral or persistent
+ if (mode.isEphemeral()) {
+ assertTrue(stat.getEphemeralOwner() != 0);
+ }
+ else {
+ assertEquals(0, stat.getEphemeralOwner());
+ }
+
+ // Data is equal
+ @SuppressWarnings("unchecked")
+ Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject(
+ ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
+ ClassLoader.getSystemClassLoader())).getState(null);
+
+ assertEquals(state, actual);
+ }
+ }
+
+ /**
+ * Tests that an existing path throws an Exception.
+ */
+ @Test(expected = Exception.class)
+ public void testAddAlreadyExistingPath() throws Exception {
+ LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZooKeeper.getClient(), stateHandleProvider);
+
+ ZooKeeper.getClient().create().forPath("/testAddAlreadyExistingPath");
+
+ store.add("/testAddAlreadyExistingPath", 1L);
+ }
+
+ /**
+ * Tests that the created state handle is discarded if ZooKeeper create fails.
+ */
+ @Test
+ public void testAddDiscardStateHandleAfterFailure() throws Exception {
+ // Setup
+ LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+ CuratorFramework client = spy(ZooKeeper.getClient());
+ when(client.create()).thenThrow(new RuntimeException("Expected test Exception."));
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ client, stateHandleProvider);
+
+ // Config
+ final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
+ final Long state = 81282227L;
+
+ try {
+ // Test
+ store.add(pathInZooKeeper, state);
+ fail("Did not throw expected exception");
+ }
+ catch (Exception ignored) {
+ }
+
+ // Verify
+ // State handle created and discarded
+ assertEquals(1, stateHandleProvider.getStateHandles().size());
+ assertEquals(state, stateHandleProvider.getStateHandles().get(0).getState(null));
+ assertEquals(1, stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls());
+ }
+
+ /**
+ * Tests that a state handle is replaced.
+ */
+ @Test
+ public void testReplace() throws Exception {
+ // Setup
+ LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZooKeeper.getClient(), stateHandleProvider);
+
+ // Config
+ final String pathInZooKeeper = "/testReplace";
+ final Long initialState = 30968470898L;
+ final Long replaceState = 88383776661L;
+
+ // Test
+ store.add(pathInZooKeeper, initialState);
+ store.replace(pathInZooKeeper, 0, replaceState);
+
+ // Verify
+ // State handles created
+ assertEquals(2, stateHandleProvider.getStateHandles().size());
+ assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).getState(null));
+ assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).getState(null));
+
+ // Path created and is persistent
+ Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
+ assertNotNull(stat);
+ assertEquals(0, stat.getEphemeralOwner());
+
+ // Data is equal
+ @SuppressWarnings("unchecked")
+ Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject(
+ ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
+ ClassLoader.getSystemClassLoader())).getState(null);
+
+ assertEquals(replaceState, actual);
+ }
+
+ /**
+ * Tests that a non existing path throws an Exception.
+ */
+ @Test(expected = Exception.class)
+ public void testReplaceNonExistingPath() throws Exception {
+ StateHandleProvider<Long> stateHandleProvider = new LongStateHandleProvider();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZooKeeper.getClient(), stateHandleProvider);
+
+ store.replace("/testReplaceNonExistingPath", 0, 1L);
+ }
+
+ /**
+ * Tests that the replace state handle is discarded if ZooKeeper setData fails.
+ */
+ @Test
+ public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
+ // Setup
+ LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+ CuratorFramework client = spy(ZooKeeper.getClient());
+ when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ client, stateHandleProvider);
+
+ // Config
+ final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
+ final Long initialState = 30968470898L;
+ final Long replaceState = 88383776661L;
+
+ // Test
+ store.add(pathInZooKeeper, initialState);
+
+ try {
+ store.replace(pathInZooKeeper, 0, replaceState);
+ fail("Did not throw expected exception");
+ }
+ catch (Exception ignored) {
+ }
+
+ // Verify
+ // State handle created and discarded
+ assertEquals(2, stateHandleProvider.getStateHandles().size());
+ assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).getState(null));
+ assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).getState(null));
+ assertEquals(1, stateHandleProvider.getStateHandles().get(1).getNumberOfDiscardCalls());
+
+ // Initial value
+ @SuppressWarnings("unchecked")
+ Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject(
+ ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
+ ClassLoader.getSystemClassLoader())).getState(null);
+
+ assertEquals(initialState, actual);
+ }
+
+ /**
+ * Tests get operation.
+ */
+ @Test
+ public void testGetAndExists() throws Exception {
+ // Setup
+ LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZooKeeper.getClient(), stateHandleProvider);
+
+ // Config
+ final String pathInZooKeeper = "/testGetAndExists";
+ final Long state = 311222268470898L;
+
+ // Test
+ assertEquals(-1, store.exists(pathInZooKeeper));
+
+ store.add(pathInZooKeeper, state);
+ StateHandle<Long> actual = store.get(pathInZooKeeper);
+
+ // Verify
+ assertEquals(state, actual.getState(null));
+ assertTrue(store.exists(pathInZooKeeper) >= 0);
+ }
+
+ /**
+ * Tests that a non existing path throws an Exception.
+ */
+ @Test(expected = Exception.class)
+ public void testGetNonExistingPath() throws Exception {
+ LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZooKeeper.getClient(), stateHandleProvider);
+
+ store.get("/testGetNonExistingPath");
+ }
+
+ /**
+ * Tests that all added state is returned.
+ */
+ @Test
+ public void testGetAll() throws Exception {
+ // Setup
+ LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZooKeeper.getClient(), stateHandleProvider);
+
+ // Config
+ final String pathInZooKeeper = "/testGetAll";
+
+ final Set<Long> expected = new HashSet<>();
+ expected.add(311222268470898L);
+ expected.add(132812888L);
+ expected.add(27255442L);
+ expected.add(11122233124L);
+
+ // Test
+ for (long val : expected) {
+ store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL);
+ }
+
+ for (Tuple2<StateHandle<Long>, String> val : store.getAll()) {
+ assertTrue(expected.remove(val.f0.getState(null)));
+ }
+ assertEquals(0, expected.size());
+ }
+
+ /**
+ * Tests that the state is returned sorted.
+ */
+ @Test
+ public void testGetAllSortedByName() throws Exception {
+ // Setup
+ LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZooKeeper.getClient(), stateHandleProvider);
+
+ // Config
+ final String pathInZooKeeper = "/testGetAllSortedByName";
+
+ final Long[] expected = new Long[] {
+ 311222268470898L, 132812888L, 27255442L, 11122233124L };
+
+ // Test
+ for (long val : expected) {
+ store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL);
+ }
+
+ List<Tuple2<StateHandle<Long>, String>> actual = store.getAllSortedByName();
+ assertEquals(expected.length, actual.size());
+
+ for (int i = 0; i < expected.length; i++) {
+ assertEquals(expected[i], actual.get(i).f0.getState(null));
+ }
+ }
+
+ /**
+ * Tests that state handles are correctly removed.
+ */
+ @Test
+ public void testRemove() throws Exception {
+ // Setup
+ LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZooKeeper.getClient(), stateHandleProvider);
+
+ // Config
+ final String pathInZooKeeper = "/testRemove";
+ final Long state = 27255442L;
+
+ store.add(pathInZooKeeper, state);
+
+ // Test
+ store.remove(pathInZooKeeper);
+
+ // Verify discarded
+ assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
+ }
+
+ /**
+ * Tests that state handles are correctly removed with a callback.
+ */
+ @Test
+ public void testRemoveWithCallback() throws Exception {
+ // Setup
+ LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZooKeeper.getClient(), stateHandleProvider);
+
+ // Config
+ final String pathInZooKeeper = "/testRemoveWithCallback";
+ final Long state = 27255442L;
+
+ store.add(pathInZooKeeper, state);
+
+ final CountDownLatch sync = new CountDownLatch(1);
+ BackgroundCallback callback = mock(BackgroundCallback.class);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ sync.countDown();
+ return null;
+ }
+ }).when(callback).processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class));
+
+ // Test
+ store.remove(pathInZooKeeper, callback);
+
+ // The callback is signalled through a latch, i.e. it may fire after remove() returns.
+ // Wait for it first (bounded, so a missing callback fails the test instead of hanging
+ // the build), and only then check that the ZNode is gone.
+ assertTrue("Remove callback was not called in time.",
+ sync.await(60, java.util.concurrent.TimeUnit.SECONDS));
+
+ // Verify discarded and callback called
+ assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
+
+ verify(callback, times(1))
+ .processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class));
+ }
+
+ /**
+ * Tests that state handles are correctly discarded.
+ */
+ @Test
+ public void testRemoveAndDiscardState() throws Exception {
+ // Setup
+ LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZooKeeper.getClient(), stateHandleProvider);
+
+ // Config
+ final String pathInZooKeeper = "/testDiscard";
+ final Long state = 27255442L;
+
+ store.add(pathInZooKeeper, state);
+
+ // Test
+ store.removeAndDiscardState(pathInZooKeeper);
+
+ // Verify discarded
+ assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
+ }
+
+ /** Tests that all state handles are correctly discarded. */
+ @Test
+ public void testRemoveAndDiscardAllState() throws Exception {
+ // Setup
+ LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZooKeeper.getClient(), stateHandleProvider);
+
+ // Config
+ final String pathInZooKeeper = "/testDiscardAll";
+
+ final Set<Long> expected = new HashSet<>();
+ expected.add(311222268470898L);
+ expected.add(132812888L);
+ expected.add(27255442L);
+ expected.add(11122233124L);
+
+ // Test
+ for (long val : expected) {
+ store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL);
+ }
+
+ store.removeAndDiscardAllState();
+
+ // Verify all discarded
+ assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
+ }
+
+ // ---------------------------------------------------------------------------------------------
+ // Simple test helpers
+ // ---------------------------------------------------------------------------------------------
+
+ /**
+ * Creates {@link LongStateHandle}s and remembers every created handle, in creation order, so
+ * that tests can inspect them afterwards.
+ */
+ private static class LongStateHandleProvider implements StateHandleProvider<Long> {
+
+ private static final long serialVersionUID = 4572084854499402276L;
+
+ private final List<LongStateHandle> stateHandles = new ArrayList<>();
+
+ @Override
+ public StateHandle<Long> createStateHandle(Long state) {
+ LongStateHandle stateHandle = new LongStateHandle(state);
+ stateHandles.add(stateHandle);
+
+ return stateHandle;
+ }
+
+ /** Returns the (mutable) list of all handles created so far, in creation order. */
+ public List<LongStateHandle> getStateHandles() {
+ return stateHandles;
+ }
+ }
+
+ /**
+ * State handle that holds a {@link Long} in a field and counts calls to
+ * {@link #discardState()}.
+ *
+ * <p>NOTE(review): the tests read back handles by deserializing the ZNode data, so the
+ * discard counter is per instance and is not shared with deserialized copies.
+ */
+ private static class LongStateHandle implements StateHandle<Long> {
+
+ private static final long serialVersionUID = -3555329254423838912L;
+
+ private final Long state;
+
+ private int numberOfDiscardCalls;
+
+ public LongStateHandle(Long state) {
+ this.state = state;
+ }
+
+ @Override
+ public Long getState(ClassLoader ignored) throws Exception {
+ return state;
+ }
+
+ @Override
+ public void discardState() throws Exception {
+ numberOfDiscardCalls++;
+ }
+
+ /** Returns how often {@link #discardState()} was called on this instance. */
+ public int getNumberOfDiscardCalls() {
+ return numberOfDiscardCalls;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
new file mode 100644
index 0000000..7ae89d1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+/**
+ * Simple ZooKeeper and CuratorFramework setup for tests.
+ */
+public class ZooKeeperTestEnvironment {
+
+ /** Single ZooKeeper server; {@code null} if a {@link TestingCluster} is used instead. */
+ private final TestingServer zooKeeperServer;
+
+ /** ZooKeeper quorum; {@code null} if a single {@link TestingServer} is used instead. */
+ private final TestingCluster zooKeeperCluster;
+
+ /** Client connected to the started server/cluster. */
+ private final CuratorFramework client;
+
+ /**
+ * Starts a ZooKeeper cluster with the number of quorum peers and a client.
+ *
+ * <p>If setup fails midway, everything that was already created is closed again before
+ * the failure is propagated, so a failed setup does not leak servers or clients.
+ *
+ * @param numberOfZooKeeperQuorumPeers Starts a {@link TestingServer}, if {@code 1}.
+ * Starts a {@link TestingCluster}, if {@code > 1}.
+ * @throws IllegalArgumentException If {@code numberOfZooKeeperQuorumPeers <= 0}
+ * @throws RuntimeException If starting ZooKeeper or the client fails
+ */
+ public ZooKeeperTestEnvironment(int numberOfZooKeeperQuorumPeers) {
+ if (numberOfZooKeeperQuorumPeers <= 0) {
+ throw new IllegalArgumentException("Number of peers needs to be >= 1.");
+ }
+
+ final Configuration conf = new Configuration();
+
+ // Build everything in locals first, so that resources created before a failing step
+ // can still be closed (the final fields cannot be read before they are assigned).
+ TestingServer server = null;
+ TestingCluster cluster = null;
+ CuratorFramework curator = null;
+
+ try {
+ if (numberOfZooKeeperQuorumPeers == 1) {
+ server = new TestingServer(true);
+
+ conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+ server.getConnectString());
+ }
+ else {
+ cluster = new TestingCluster(numberOfZooKeeperQuorumPeers);
+
+ cluster.start();
+
+ conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+ cluster.getConnectString());
+ }
+
+ curator = ZooKeeperUtils.startCuratorFramework(conf);
+
+ curator.newNamespaceAwareEnsurePath("/")
+ .ensure(curator.getZookeeperClient());
+ }
+ catch (Exception e) {
+ closeAfterFailedSetup(curator, server, cluster, e);
+ throw new RuntimeException("Error setting up ZooKeeperTestEnvironment", e);
+ }
+
+ this.zooKeeperServer = server;
+ this.zooKeeperCluster = cluster;
+ this.client = curator;
+ }
+
+ /**
+ * Closes whatever was created before setup failed. Exceptions while closing are attached
+ * to {@code failure} as suppressed exceptions instead of hiding the original failure.
+ */
+ private static void closeAfterFailedSetup(
+ CuratorFramework curator,
+ TestingServer server,
+ TestingCluster cluster,
+ Exception failure) {
+
+ if (curator != null) {
+ try {
+ curator.close();
+ }
+ catch (Throwable t) {
+ failure.addSuppressed(t);
+ }
+ }
+
+ if (server != null) {
+ try {
+ server.close();
+ }
+ catch (Throwable t) {
+ failure.addSuppressed(t);
+ }
+ }
+
+ if (cluster != null) {
+ try {
+ cluster.close();
+ }
+ catch (Throwable t) {
+ failure.addSuppressed(t);
+ }
+ }
+ }
+
+ /**
+ * Shutdown the client and ZooKeeper server/cluster.
+ *
+ * <p>Every resource is closed even if closing an earlier one fails. The first exception
+ * is rethrown, with later ones attached as suppressed exceptions.
+ *
+ * @throws Exception The first exception thrown while closing, if any
+ */
+ public void shutdown() throws Exception {
+ Exception failure = null;
+
+ if (client != null) {
+ try {
+ client.close();
+ }
+ catch (Exception e) {
+ failure = firstOrSuppressed(e, failure);
+ }
+ }
+
+ if (zooKeeperServer != null) {
+ try {
+ zooKeeperServer.close();
+ }
+ catch (Exception e) {
+ failure = firstOrSuppressed(e, failure);
+ }
+ }
+
+ if (zooKeeperCluster != null) {
+ try {
+ zooKeeperCluster.close();
+ }
+ catch (Exception e) {
+ failure = firstOrSuppressed(e, failure);
+ }
+ }
+
+ if (failure != null) {
+ throw failure;
+ }
+ }
+
+ /**
+ * Returns {@code previous} with {@code next} added as suppressed, or {@code next} if there
+ * is no previous exception.
+ */
+ private static Exception firstOrSuppressed(Exception next, Exception previous) {
+ if (previous == null) {
+ return next;
+ }
+ previous.addSuppressed(next);
+ return previous;
+ }
+
+ /**
+ * Returns the connect string of the started ZooKeeper server or cluster.
+ */
+ public String getConnectString() {
+ if (zooKeeperServer != null) {
+ return zooKeeperServer.getConnectString();
+ }
+ else {
+ return zooKeeperCluster.getConnectString();
+ }
+ }
+
+ /**
+ * Returns a client for the started ZooKeeper server/cluster.
+ */
+ public CuratorFramework getClient() {
+ return client;
+ }
+
+ /**
+ * Creates a new client for the started ZooKeeper server/cluster. The caller is responsible
+ * for closing it.
+ */
+ public CuratorFramework createClient() {
+ Configuration config = new Configuration();
+ config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, getConnectString());
+ return ZooKeeperUtils.startCuratorFramework(config);
+ }
+
+ /**
+ * Deletes all ZNodes under the client's namespace root node (the root node itself is kept).
+ *
+ * @throws Exception If the ZooKeeper operation fails
+ */
+ public void deleteAll() throws Exception {
+ final String path = "/" + client.getNamespace();
+ ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, false);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index d1b8fac..9a1cde0 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -18,9 +18,12 @@
package org.apache.flink.runtime.executiongraph
+import java.util.concurrent.TimeUnit
+
import org.apache.flink.api.common.JobID
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway
import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
import org.apache.flink.runtime.jobmanager.Tasks
@@ -32,6 +35,7 @@ import org.scalatest.junit.JUnitRunner
import org.scalatest.{Matchers, WordSpecLike}
import scala.collection.JavaConverters._
+import scala.concurrent.duration.FiniteDuration
@RunWith(classOf[JUnitRunner])
class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
@@ -126,8 +130,23 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
for (vertex <- eg.getAllExecutionVertices.asScala) {
vertex.getCurrentExecutionAttempt().cancelingComplete()
}
-
+
+ val timeout = new FiniteDuration(2, TimeUnit.MINUTES)
+
+ // Wait for async restart
+ var deadline = timeout.fromNow
+ while (deadline.hasTimeLeft() && eg.getState != JobStatus.RUNNING) {
+ Thread.sleep(100)
+ }
+
eg.getState should equal(JobStatus.RUNNING)
+
+ // Wait for deploying after async restart
+ deadline = timeout.fromNow
+ while (deadline.hasTimeLeft() && eg.getAllExecutionVertices.asScala.exists(
+ _.getCurrentExecutionAttempt.getState != ExecutionState.DEPLOYING)) {
+ Thread.sleep(100)
+ }
for (vertex <- eg.getAllExecutionVertices.asScala) {
vertex.getCurrentExecutionAttempt().markFinished()
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index c9ae1e4..703d7bf 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -43,28 +43,28 @@ import scala.concurrent.{Await, Future}
* otherwise false
*/
class TestingCluster(
- userConfiguration: Configuration,
- singleActorSystem: Boolean,
- synchronousDispatcher: Boolean,
- streamingMode: StreamingMode)
+ userConfiguration: Configuration,
+ singleActorSystem: Boolean,
+ synchronousDispatcher: Boolean,
+ streamingMode: StreamingMode)
extends FlinkMiniCluster(
userConfiguration,
singleActorSystem,
streamingMode) {
-
+
def this(userConfiguration: Configuration,
singleActorSystem: Boolean,
synchronousDispatcher: Boolean)
- = this(userConfiguration, singleActorSystem, synchronousDispatcher, StreamingMode.BATCH_ONLY)
+ = this(userConfiguration, singleActorSystem, synchronousDispatcher, StreamingMode.BATCH_ONLY)
def this(userConfiguration: Configuration, singleActorSystem: Boolean)
- = this(userConfiguration, singleActorSystem, false)
+ = this(userConfiguration, singleActorSystem, false)
def this(userConfiguration: Configuration) = this(userConfiguration, true, false)
-
+
// --------------------------------------------------------------------------
-
+
override def generateConfiguration(userConfig: Configuration): Configuration = {
val cfg = new Configuration()
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
@@ -100,16 +100,18 @@ class TestingCluster(
}
val (executionContext,
- instanceManager,
- scheduler,
- libraryCacheManager,
- executionRetries,
- delayBetweenRetries,
- timeout,
- archiveCount,
- leaderElectionService) = JobManager.createJobManagerComponents(
- config,
- createLeaderElectionService())
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ executionRetries,
+ delayBetweenRetries,
+ timeout,
+ archiveCount,
+ leaderElectionService,
+ submittedJobsGraphs,
+ checkpointRecoveryFactory) = JobManager.createJobManagerComponents(
+ config,
+ createLeaderElectionService())
val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
val archive = actorSystem.actorOf(testArchiveProps, archiveName)
@@ -126,7 +128,9 @@ class TestingCluster(
delayBetweenRetries,
timeout,
streamingMode,
- leaderElectionService))
+ leaderElectionService,
+ submittedJobsGraphs,
+ checkpointRecoveryFactory))
val dispatcherJobManagerProps = if (synchronousDispatcher) {
// disable asynchronous futures (e.g. accumulator update in Heartbeat)
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 312a1e5..be72003 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -18,32 +18,18 @@
package org.apache.flink.runtime.testingUtils
-import akka.actor.{Cancellable, Terminated, ActorRef}
-import akka.pattern.pipe
-import akka.pattern.ask
-import org.apache.flink.api.common.JobID
+import akka.actor.ActorRef
import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.{StreamingMode, FlinkActor}
import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
-import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
-import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import org.apache.flink.runtime.testingUtils.TestingMessages.{CheckIfJobRemoved, Alive,
-DisableDisconnect}
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
-
import scala.language.postfixOps
/** JobManager implementation extended by testing messages
@@ -70,7 +56,9 @@ class TestingJobManager(
delayBetweenRetries: Long,
timeout: FiniteDuration,
mode: StreamingMode,
- leaderElectionService: LeaderElectionService)
+ leaderElectionService: LeaderElectionService,
+ submittedJobGraphs : SubmittedJobGraphStore,
+ checkpointRecoveryFactory : CheckpointRecoveryFactory)
extends JobManager(
flinkConfiguration,
executionContext,
@@ -82,5 +70,7 @@ class TestingJobManager(
delayBetweenRetries,
timeout,
mode,
- leaderElectionService)
+ leaderElectionService,
+ submittedJobGraphs,
+ checkpointRecoveryFactory)
with TestingJobManagerLike {}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index b607433..72a8c25 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -541,7 +542,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
}
}
- }
/**
* Registers a timer.
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index c7f7698..11eb174 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -20,22 +20,20 @@ package org.apache.flink.test.util
import java.util.concurrent.TimeoutException
-import akka.pattern.ask
-import akka.actor.{Props, ActorRef, ActorSystem}
+import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.Patterns._
+import akka.pattern.ask
import org.apache.curator.test.TestingCluster
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobmanager.{RecoveryMode, JobManager}
+import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode}
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages
-.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingTaskManager,
-TestingJobManager, TestingMemoryArchivist}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingMemoryArchivist, TestingTaskManager, TestingUtils}
-import scala.concurrent.{Future, Await}
+import scala.concurrent.{Await, Future}
/**
* A forkable mini cluster is a special case of the mini cluster, used for parallel test execution
@@ -47,20 +45,20 @@ import scala.concurrent.{Future, Await}
* same [[ActorSystem]], otherwise false.
*/
class ForkableFlinkMiniCluster(
- userConfiguration: Configuration,
- singleActorSystem: Boolean,
- streamingMode: StreamingMode)
+ userConfiguration: Configuration,
+ singleActorSystem: Boolean,
+ streamingMode: StreamingMode)
extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
- def this(userConfiguration: Configuration, singleActorSystem: Boolean)
- = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+ def this(userConfiguration: Configuration, singleActorSystem: Boolean)
+ = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
def this(userConfiguration: Configuration) = this(userConfiguration, true)
-
+
// --------------------------------------------------------------------------
var zookeeperCluster: Option[TestingCluster] = None
-
+
override def generateConfiguration(userConfiguration: Configuration): Configuration = {
val forNumberString = System.getProperty("forkNumber")
@@ -264,10 +262,10 @@ object ForkableFlinkMiniCluster {
import org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT
def startCluster(
- numSlots: Int,
- numTaskManagers: Int,
- timeout: String = DEFAULT_AKKA_ASK_TIMEOUT)
- : ForkableFlinkMiniCluster = {
+ numSlots: Int,
+ numTaskManagers: Int,
+ timeout: String = DEFAULT_AKKA_ASK_TIMEOUT)
+ : ForkableFlinkMiniCluster = {
val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)