You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:58:55 UTC
[04/47] flink git commit: [FLINK-2354] [runtime] Add job graph and
checkpoint recovery
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 1330b66..f6ee5c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -18,14 +18,13 @@
package org.apache.flink.runtime.checkpoint;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
@@ -34,13 +33,25 @@ import org.junit.Test;
import java.util.Iterator;
import java.util.List;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
/**
* Tests for the checkpoint coordinator.
*/
public class CheckpointCoordinatorTest {
-
+
private static final ClassLoader cl = Thread.currentThread().getContextClassLoader();
-
+
@Test
public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
try {
@@ -50,7 +61,7 @@ public class CheckpointCoordinatorTest {
// create some mock Execution vertices that receive the checkpoint trigger messages
ExecutionVertex triggerVertex1 = mock(ExecutionVertex.class);
ExecutionVertex triggerVertex2 = mock(ExecutionVertex.class);
-
+
// create some mock Execution vertices that need to ack the checkpoint
final ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
final ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
@@ -59,10 +70,12 @@ public class CheckpointCoordinatorTest {
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 1, 600000,
+ jid, 600000,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
- new ExecutionVertex[] {}, cl );
+ new ExecutionVertex[] {}, cl,
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
// nothing should be happening
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -103,10 +116,12 @@ public class CheckpointCoordinatorTest {
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 1, 600000,
+ jid, 600000,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
- new ExecutionVertex[] {}, cl );
+ new ExecutionVertex[] {}, cl,
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
// nothing should be happening
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -138,17 +153,18 @@ public class CheckpointCoordinatorTest {
final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);
-
+
// create some mock Execution vertices that receive the checkpoint trigger messages
ExecutionVertex ackVertex1 = mock(ExecutionVertex.class);
ExecutionVertex ackVertex2 = mock(ExecutionVertex.class);
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 1, 600000,
+ jid, 600000,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
- new ExecutionVertex[] {}, cl );
+ new ExecutionVertex[] {}, cl, new StandaloneCheckpointIDCounter(), new
+ StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
// nothing should be happening
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -168,13 +184,13 @@ public class CheckpointCoordinatorTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testTriggerAndConfirmSimpleCheckpoint() {
try {
final JobID jid = new JobID();
final long timestamp = System.currentTimeMillis();
-
+
// create some mock Execution vertices that receive the checkpoint trigger messages
final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
@@ -183,24 +199,26 @@ public class CheckpointCoordinatorTest {
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 1, 600000,
+ jid, 600000,
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
- new ExecutionVertex[] { vertex1, vertex2 }, cl);
-
+ new ExecutionVertex[] { vertex1, vertex2 }, cl,
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
+
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
-
+
// trigger the first checkpoint. this should succeed
assertTrue(coord.triggerCheckpoint(timestamp));
-
+
// validate that we have a pending checkpoint
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
-
+
long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
-
+
assertNotNull(checkpoint);
assertEquals(checkpointId, checkpoint.getCheckpointId());
assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
@@ -210,7 +228,7 @@ public class CheckpointCoordinatorTest {
assertEquals(0, checkpoint.getCollectedStates().size());
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
-
+
// check that the vertices received the trigger checkpoint message
{
TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointId, timestamp);
@@ -218,7 +236,7 @@ public class CheckpointCoordinatorTest {
verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
}
-
+
// acknowledge from one of the tasks
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
@@ -233,15 +251,15 @@ public class CheckpointCoordinatorTest {
// acknowledge the other task.
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
-
+
// the checkpoint is internally converted to a successful checkpoint and the
// pending checkpoint object is disposed
assertTrue(checkpoint.isDiscarded());
-
+
// the now we should have a completed checkpoint
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
-
+
// validate that the relevant tasks got a confirmation message
{
NotifyCheckpointComplete confirmMessage1 = new NotifyCheckpointComplete(jid, attemptID1, checkpointId, timestamp);
@@ -249,13 +267,13 @@ public class CheckpointCoordinatorTest {
verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
}
-
- SuccessfulCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
+
+ CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
assertEquals(jid, success.getJobId());
assertEquals(timestamp, success.getTimestamp());
assertEquals(checkpoint.getCheckpointId(), success.getCheckpointID());
assertTrue(success.getStates().isEmpty());
-
+
// ---------------
// trigger another checkpoint and see that this one replaces the other checkpoint
// ---------------
@@ -265,11 +283,11 @@ public class CheckpointCoordinatorTest {
long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
-
+
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
-
- SuccessfulCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
+
+ CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
assertEquals(jid, successNew.getJobId());
assertEquals(timestampNew, successNew.getTimestamp());
assertEquals(checkpointIdNew, successNew.getCheckpointID());
@@ -295,8 +313,7 @@ public class CheckpointCoordinatorTest {
fail(e.getMessage());
}
}
-
-
+
@Test
public void testMultipleConcurrentCheckpoints() {
try {
@@ -305,7 +322,7 @@ public class CheckpointCoordinatorTest {
final long timestamp2 = timestamp1 + 8617;
// create some mock execution vertices
-
+
final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
@@ -314,23 +331,25 @@ public class CheckpointCoordinatorTest {
final ExecutionAttemptID ackAttemptID3 = new ExecutionAttemptID();
final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
-
+
ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);
ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
ExecutionVertex ackVertex3 = mockExecutionVertex(ackAttemptID3);
-
+
ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
-
+
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 2, 600000,
+ jid, 600000,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
- new ExecutionVertex[] { commitVertex }, cl);
-
+ new ExecutionVertex[] { commitVertex }, cl,
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -339,7 +358,7 @@ public class CheckpointCoordinatorTest {
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
-
+
PendingCheckpoint pending1 = coord.getPendingCheckpoints().values().iterator().next();
long checkpointId1 = pending1.getCheckpointId();
@@ -348,10 +367,10 @@ public class CheckpointCoordinatorTest {
new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId1, timestamp1), triggerAttemptID1);
verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId1, timestamp1), triggerAttemptID2);
-
+
// acknowledge one of the three tasks
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
-
+
// start the second checkpoint
// trigger the first checkpoint. this should succeed
assertTrue(coord.triggerCheckpoint(timestamp2));
@@ -373,23 +392,23 @@ public class CheckpointCoordinatorTest {
new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId2, timestamp2), triggerAttemptID1);
verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId2, timestamp2), triggerAttemptID2);
-
+
// we acknowledge the remaining two tasks from the first
// checkpoint and two tasks from the second checkpoint
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
-
+
// now, the first checkpoint should be confirmed
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
assertTrue(pending1.isDiscarded());
-
+
// the first confirm message should be out
verify(commitVertex, times(1)).sendMessageToCurrentExecution(
new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId1, timestamp1), commitAttemptID);
-
+
// send the last remaining ack for the second checkpoint
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
@@ -401,17 +420,17 @@ public class CheckpointCoordinatorTest {
// the second commit message should be out
verify(commitVertex, times(1)).sendMessageToCurrentExecution(
new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID);
-
+
// validate the committed checkpoints
- List<SuccessfulCheckpoint> scs = coord.getSuccessfulCheckpoints();
-
- SuccessfulCheckpoint sc1 = scs.get(0);
+ List<CompletedCheckpoint> scs = coord.getSuccessfulCheckpoints();
+
+ CompletedCheckpoint sc1 = scs.get(0);
assertEquals(checkpointId1, sc1.getCheckpointID());
assertEquals(timestamp1, sc1.getTimestamp());
assertEquals(jid, sc1.getJobId());
assertTrue(sc1.getStates().isEmpty());
-
- SuccessfulCheckpoint sc2 = scs.get(1);
+
+ CompletedCheckpoint sc2 = scs.get(1);
assertEquals(checkpointId2, sc2.getCheckpointID());
assertEquals(timestamp2, sc2.getTimestamp());
assertEquals(jid, sc2.getJobId());
@@ -453,10 +472,12 @@ public class CheckpointCoordinatorTest {
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 10, 600000,
+ jid, 600000,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
- new ExecutionVertex[] { commitVertex }, cl);
+ new ExecutionVertex[] { commitVertex }, cl,
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(10, cl), RecoveryMode.STANDALONE);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -513,13 +534,13 @@ public class CheckpointCoordinatorTest {
// into a successful checkpoint
assertTrue(pending1.isDiscarded());
assertTrue(pending2.isDiscarded());
-
+
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
// validate the committed checkpoints
- List<SuccessfulCheckpoint> scs = coord.getSuccessfulCheckpoints();
- SuccessfulCheckpoint success = scs.get(0);
+ List<CompletedCheckpoint> scs = coord.getSuccessfulCheckpoints();
+ CompletedCheckpoint success = scs.get(0);
assertEquals(checkpointId2, success.getCheckpointID());
assertEquals(timestamp2, success.getTimestamp());
assertEquals(jid, success.getJobId());
@@ -531,7 +552,7 @@ public class CheckpointCoordinatorTest {
// send the last remaining ack for the first checkpoint. This should not do anything
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
-
+
coord.shutdown();
}
catch (Exception e) {
@@ -539,8 +560,7 @@ public class CheckpointCoordinatorTest {
fail(e.getMessage());
}
}
-
-
+
@Test
public void testCheckpointTimeoutIsolated() {
try {
@@ -565,22 +585,24 @@ public class CheckpointCoordinatorTest {
// set up the coordinator
// the timeout for the checkpoint is a 200 milliseconds
-
+
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 2, 200,
+ jid, 200,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
- new ExecutionVertex[] { commitVertex }, cl);
-
+ new ExecutionVertex[] { commitVertex }, cl,
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+
// trigger a checkpoint, partially acknowledged
assertTrue(coord.triggerCheckpoint(timestamp));
assertEquals(1, coord.getNumberOfPendingCheckpoints());
-
+
PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
assertFalse(checkpoint.isDiscarded());
-
+
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId()));
-
+
// wait until the checkpoint must have expired.
// we check every 250 msecs conservatively for 5 seconds
// to give even slow build servers a very good chance of completing this
@@ -591,7 +613,7 @@ public class CheckpointCoordinatorTest {
while (!checkpoint.isDiscarded() &&
coord.getNumberOfPendingCheckpoints() > 0 &&
System.currentTimeMillis() < deadline);
-
+
assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDiscarded());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -599,7 +621,7 @@ public class CheckpointCoordinatorTest {
// no confirm message must have been sent
verify(commitVertex, times(0))
.sendMessageToCurrentExecution(any(NotifyCheckpointComplete.class), any(ExecutionAttemptID.class));
-
+
coord.shutdown();
}
catch (Exception e) {
@@ -607,7 +629,7 @@ public class CheckpointCoordinatorTest {
fail(e.getMessage());
}
}
-
+
@Test
public void handleMessagesForNonExistingCheckpoints() {
try {
@@ -625,27 +647,28 @@ public class CheckpointCoordinatorTest {
ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
-
+
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 2, 200000,
+ jid, 200000,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
- new ExecutionVertex[] { commitVertex }, cl);
+ new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
+ (), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
assertTrue(coord.triggerCheckpoint(timestamp));
-
+
long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
-
+
// send some messages that do not belong to either the job or the any
// of the vertices that need to be acknowledged.
// non of the messages should throw an exception
-
+
// wrong job id
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId));
-
+
// unknown checkpoint
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, 1L));
-
+
// unknown ack vertex
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId));
@@ -660,15 +683,16 @@ public class CheckpointCoordinatorTest {
private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
return mockExecutionVertex(attemptID, ExecutionState.RUNNING);
}
-
+
private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, ExecutionState state) {
final Execution exec = mock(Execution.class);
when(exec.getAttemptId()).thenReturn(attemptID);
when(exec.getState()).thenReturn(state);
ExecutionVertex vertex = mock(ExecutionVertex.class);
+ when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
-
+
return vertex;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
new file mode 100644
index 0000000..96c4eea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class CheckpointIDCounterTest extends TestLogger {
+
+ protected abstract CheckpointIDCounter createCompletedCheckpoints() throws Exception;
+
+ public static class StandaloneCheckpointIDCounterTest extends CheckpointIDCounterTest {
+
+ @Override
+ protected CheckpointIDCounter createCompletedCheckpoints() throws Exception {
+ return new StandaloneCheckpointIDCounter();
+ }
+ }
+
+ public static class ZooKeeperCheckpointIDCounterITCase extends CheckpointIDCounterTest {
+
+ private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (ZooKeeper != null) {
+ ZooKeeper.shutdown();
+ }
+ }
+
+ @Before
+ public void cleanUp() throws Exception {
+ ZooKeeper.deleteAll();
+ }
+
+ @Override
+ protected CheckpointIDCounter createCompletedCheckpoints() throws Exception {
+ return new ZooKeeperCheckpointIDCounter(ZooKeeper.getClient(),
+ "/checkpoint-id-counter");
+ }
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ /**
+ * Tests serial increment and get calls.
+ */
+ @Test
+ public void testSerialIncrementAndGet() throws Exception {
+ final CheckpointIDCounter counter = createCompletedCheckpoints();
+
+ try {
+ counter.start();
+
+ assertEquals(1, counter.getAndIncrement());
+ assertEquals(2, counter.getAndIncrement());
+ assertEquals(3, counter.getAndIncrement());
+ assertEquals(4, counter.getAndIncrement());
+ }
+ finally {
+ counter.stop();
+ }
+ }
+
+ /**
+ * Tests concurrent increment and get calls from multiple threads and verifies that the
+ * returned counts, once sorted, form a strictly increasing sequence without gaps.
+ */
+ @Test
+ public void testConcurrentGetAndIncrement() throws Exception {
+ // Config
+ final int numThreads = 8;
+
+ // Setup
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CheckpointIDCounter counter = createCompletedCheckpoints();
+ counter.start();
+
+ ExecutorService executor = null;
+ try {
+ executor = Executors.newFixedThreadPool(numThreads);
+
+ List<Future<List<Long>>> resultFutures = new ArrayList<>(numThreads);
+
+ for (int i = 0; i < numThreads; i++) {
+ resultFutures.add(executor.submit(new Incrementer(startLatch, counter)));
+ }
+
+ // Kick off the incrementing
+ startLatch.countDown();
+
+ final int expectedTotal = numThreads * Incrementer.NumIncrements;
+
+ List<Long> all = new ArrayList<>(expectedTotal);
+
+ // Get the counts
+ for (Future<List<Long>> result : resultFutures) {
+ List<Long> counts = result.get();
+
+ for (long val : counts) {
+ all.add(val);
+ }
+ }
+
+ // Verify
+ Collections.sort(all);
+
+ assertEquals(expectedTotal, all.size());
+
+ long current = 0;
+ for (long val : all) {
+ // Incrementing counts
+ assertEquals(++current, val);
+ }
+
+ // The final count
+ assertEquals(expectedTotal + 1, counter.getAndIncrement());
+ }
+ finally {
+ if (executor != null) {
+ executor.shutdown();
+ }
+
+ counter.stop();
+ }
+ }
+
+ /**
+ * Task repeatedly incrementing the {@link CheckpointIDCounter}.
+ */
+ private static class Incrementer implements Callable<List<Long>> {
+
+ /** Number of {@link CheckpointIDCounter#getAndIncrement()} calls made by each incrementer task. */
+ private final static int NumIncrements = 128;
+
+ private final CountDownLatch startLatch;
+
+ private final CheckpointIDCounter counter;
+
+ public Incrementer(CountDownLatch startLatch, CheckpointIDCounter counter) {
+ this.startLatch = startLatch;
+ this.counter = counter;
+ }
+
+ @Override
+ public List<Long> call() throws Exception {
+ final Random rand = new Random();
+ final List<Long> counts = new ArrayList<>();
+
+ // Wait for the main thread to kick off execution
+ this.startLatch.await();
+
+ for (int i = 0; i < NumIncrements; i++) {
+ counts.add(counter.getAndIncrement());
+
+ // To get some "random" interleaving ;)
+ Thread.sleep(rand.nextInt(20));
+ }
+
+ return counts;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 08cb0a3..32c15bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
@@ -79,10 +80,12 @@ public class CheckpointStateRestoreTest {
map.put(statelessId, stateless);
- CheckpointCoordinator coord = new CheckpointCoordinator(jid, 1, 200000L,
+ CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L,
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
- new ExecutionVertex[0], cl);
+ new ExecutionVertex[0], cl,
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
// create ourselves a checkpoint with state
final long timestamp = 34623786L;
@@ -148,10 +151,12 @@ public class CheckpointStateRestoreTest {
map.put(statelessId, stateless);
- CheckpointCoordinator coord = new CheckpointCoordinator(jid, 1, 200000L,
+ CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L,
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
- new ExecutionVertex[0], cl);
+ new ExecutionVertex[0], cl,
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
// create ourselves a checkpoint with state
final long timestamp = 34623786L;
@@ -188,10 +193,12 @@ public class CheckpointStateRestoreTest {
@Test
public void testNoCheckpointAvailable() {
try {
- CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 1, 200000L,
+ CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 200000L,
new ExecutionVertex[] { mock(ExecutionVertex.class) },
new ExecutionVertex[] { mock(ExecutionVertex.class) },
- new ExecutionVertex[0], cl);
+ new ExecutionVertex[0], cl,
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
try {
coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
new file mode 100644
index 0000000..9e3c605
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.CheckpointMessagesTest;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for basic {@link CompletedCheckpointStore} contract.
+ */
+public abstract class CompletedCheckpointStoreTest extends TestLogger {
+
+ /**
+ * Creates the {@link CompletedCheckpointStore} implementation to be tested.
+ *
+ * @param maxNumberOfCheckpointsToRetain maximum number of completed checkpoints the store
+ * should retain; the tests in this class expect creation to fail for <code>0</code>
+ * @param userLoader class loader that the tests expect to be used when the store discards
+ * a checkpoint
+ * @return the store instance under test
+ * @throws Exception if the store cannot be created
+ */
+ protected abstract CompletedCheckpointStore createCompletedCheckpoints(
+ int maxNumberOfCheckpointsToRetain, ClassLoader userLoader) throws Exception;
+
+ // ---------------------------------------------------------------------------------------------
+
+ // Class loader handed to the store under test. The tests verify that checkpoints are
+ // discarded with exactly this class loader.
+ private final ClassLoader userClassLoader = ClassLoader.getSystemClassLoader();
+
+ /**
+  * Verifies that a store cannot be created when it is asked to retain zero checkpoints.
+  */
+ @Test(expected = Exception.class)
+ public void testExceptionOnNoRetainedCheckpoints() throws Exception {
+     final int noRetainedCheckpoints = 0;
+
+     // The creation itself is expected to throw
+     createCompletedCheckpoints(noRetainedCheckpoints, userClassLoader);
+ }
+
+ /**
+  * Adds two checkpoints one after the other and checks that the most recently added one is
+  * always reported as the latest checkpoint.
+  */
+ @Test
+ public void testAddAndGetLatestCheckpoint() throws Exception {
+     CompletedCheckpointStore store = createCompletedCheckpoints(4, userClassLoader);
+
+     // Nothing has been added yet
+     assertEquals(0, store.getNumberOfRetainedCheckpoints());
+     assertEquals(0, store.getAllCheckpoints().size());
+
+     TestCheckpoint[] added = new TestCheckpoint[] {
+         createCheckpoint(0), createCheckpoint(1) };
+
+     // After each addition the retained count grows by one and the new checkpoint is the latest
+     for (int i = 0; i < added.length; i++) {
+         store.addCheckpoint(added[i]);
+         assertEquals(i + 1, store.getNumberOfRetainedCheckpoints());
+         verifyCheckpoint(added[i], store.getLatestCheckpoint());
+     }
+ }
+
+ /**
+  * With a store that retains a single checkpoint, every newly added checkpoint must push out
+  * its predecessor, and the predecessor must be discarded with the user class loader.
+  */
+ @Test
+ public void testAddCheckpointMoreThanMaxRetained() throws Exception {
+     CompletedCheckpointStore store = createCompletedCheckpoints(1, userClassLoader);
+
+     TestCheckpoint[] toAdd = new TestCheckpoint[] {
+         createCheckpoint(0), createCheckpoint(1),
+         createCheckpoint(2), createCheckpoint(3)
+     };
+
+     // The first checkpoint fits into the store without discarding anything
+     TestCheckpoint previous = toAdd[0];
+     store.addCheckpoint(previous);
+     assertEquals(1, store.getNumberOfRetainedCheckpoints());
+
+     for (int i = 1; i < toAdd.length; i++) {
+         TestCheckpoint current = toAdd[i];
+         store.addCheckpoint(current);
+
+         // The ZooKeeper implementation discards asynchronously
+         previous.awaitDiscard();
+         assertTrue(previous.isDiscarded());
+         assertEquals(userClassLoader, previous.getDiscardClassLoader());
+
+         assertEquals(1, store.getNumberOfRetainedCheckpoints());
+
+         previous = current;
+     }
+ }
+
+ /**
+ * Tests that
+ * <ul>
+ * <li>{@link CompletedCheckpointStore#getLatestCheckpoint()} returns <code>null</code>,</li>
+ * <li>{@link CompletedCheckpointStore#getAllCheckpoints()} returns an empty list,</li>
+ * <li>{@link CompletedCheckpointStore#getNumberOfRetainedCheckpoints()} returns 0.</li>
+ * </ul>
+ */
+ @Test
+ public void testEmptyState() throws Exception {
+ CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1, userClassLoader);
+
+ assertNull(checkpoints.getLatestCheckpoint());
+ assertEquals(0, checkpoints.getAllCheckpoints().size());
+ assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints());
+ }
+
+ /**
+  * Adds as many checkpoints as the store retains and checks that all of them are returned
+  * in the order in which they were added.
+  */
+ @Test
+ public void testGetAllCheckpoints() throws Exception {
+     CompletedCheckpointStore store = createCompletedCheckpoints(4, userClassLoader);
+
+     TestCheckpoint[] added = new TestCheckpoint[] {
+         createCheckpoint(0), createCheckpoint(1),
+         createCheckpoint(2), createCheckpoint(3)
+     };
+
+     for (int i = 0; i < added.length; i++) {
+         store.addCheckpoint(added[i]);
+     }
+
+     List<CompletedCheckpoint> retrieved = store.getAllCheckpoints();
+
+     assertEquals(added.length, retrieved.size());
+
+     // Sizes match, so walking the retrieved list visits every expected position once
+     int position = 0;
+     for (CompletedCheckpoint checkpoint : retrieved) {
+         assertEquals(added[position], checkpoint);
+         position++;
+     }
+ }
+
+ /**
+  * After {@link CompletedCheckpointStore#discardAllCheckpoints()} the store must be empty
+  * and every previously added checkpoint must have been discarded with the user class loader.
+  */
+ @Test
+ public void testDiscardAllCheckpoints() throws Exception {
+     CompletedCheckpointStore store = createCompletedCheckpoints(4, userClassLoader);
+
+     TestCheckpoint[] added = new TestCheckpoint[] {
+         createCheckpoint(0), createCheckpoint(1),
+         createCheckpoint(2), createCheckpoint(3)
+     };
+
+     for (int i = 0; i < added.length; i++) {
+         store.addCheckpoint(added[i]);
+     }
+
+     store.discardAllCheckpoints();
+
+     // The store reports the empty state again
+     assertNull(store.getLatestCheckpoint());
+     assertEquals(0, store.getAllCheckpoints().size());
+     assertEquals(0, store.getNumberOfRetainedCheckpoints());
+
+     // Each checkpoint has seen a discard call with the expected class loader
+     for (int i = 0; i < added.length; i++) {
+         TestCheckpoint discarded = added[i];
+
+         // The ZooKeeper implementation discards asynchronously
+         discarded.awaitDiscard();
+         assertTrue(discarded.isDiscarded());
+         assertEquals(userClassLoader, discarded.getDiscardClassLoader());
+     }
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ protected TestCheckpoint createCheckpoint(int id) throws IOException {
+ return createCheckpoint(id, 4);
+ }
+
+ /**
+  * Creates a test checkpoint with the given ID, timestamp 0, a fresh job ID, and
+  * <code>numberOfStates</code> task states that all share one job vertex ID.
+  */
+ protected TestCheckpoint createCheckpoint(int id, int numberOfStates)
+         throws IOException {
+
+     JobVertexID vertexId = new JobVertexID();
+
+     ArrayList<StateForTask> states = new ArrayList<>();
+
+     int index = 0;
+     while (index < numberOfStates) {
+         SerializedValue<StateHandle<?>> handle = new SerializedValue<StateHandle<?>>(
+             new CheckpointMessagesTest.MyHandle());
+
+         states.add(new StateForTask(handle, vertexId, index));
+         index++;
+     }
+
+     JobID jobId = new JobID();
+
+     return new TestCheckpoint(jobId, id, 0, states);
+ }
+
+ private void verifyCheckpoint(CompletedCheckpoint expected, CompletedCheckpoint actual) {
+ assertEquals(expected.getJobId(), actual.getJobId());
+ assertEquals(expected.getCheckpointID(), actual.getCheckpointID());
+ assertEquals(expected.getTimestamp(), actual.getTimestamp());
+
+ List<StateForTask> expectedStates = expected.getStates();
+ List<StateForTask> actualStates = actual.getStates();
+
+ assertEquals(expectedStates.size(), actualStates.size());
+
+ for (int i = 0; i < expectedStates.size(); i++) {
+ assertEquals(expectedStates.get(i), actualStates.get(i));
+ }
+ }
+
+ /**
+ * A test {@link CompletedCheckpoint}. We want to verify that the correct class loader is
+ * used when discarding. Spying on a regular {@link CompletedCheckpoint} instance with
+ * Mockito doesn't work, because it it breaks serializability.
+ */
+ protected static class TestCheckpoint extends CompletedCheckpoint {
+
+ private static final long serialVersionUID = 4211419809665983026L;
+
+ private boolean isDiscarded;
+
+ // Latch for test variants which discard asynchronously
+ private transient final CountDownLatch discardLatch = new CountDownLatch(1);
+
+ private transient ClassLoader discardClassLoader;
+
+ public TestCheckpoint(
+ JobID jobId,
+ long checkpointId,
+ long timestamp,
+ ArrayList<StateForTask> states) {
+
+ super(jobId, checkpointId, timestamp, states);
+ }
+
+ @Override
+ public void discard(ClassLoader userClassLoader) {
+ super.discard(userClassLoader);
+
+ if (!isDiscarded) {
+ this.discardClassLoader = userClassLoader;
+ this.isDiscarded = true;
+
+ if (discardLatch != null) {
+ discardLatch.countDown();
+ }
+ }
+ }
+
+ public boolean isDiscarded() {
+ return isDiscarded;
+ }
+
+ public void awaitDiscard() throws InterruptedException {
+ if (discardLatch != null) {
+ discardLatch.await();
+ }
+ }
+
+ public ClassLoader getDiscardClassLoader() {
+ return discardClassLoader;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TestCheckpoint that = (TestCheckpoint) o;
+
+ return getJobId().equals(that.getJobId())
+ && getCheckpointID() == that.getCheckpointID();
+ }
+
+ @Override
+ public int hashCode() {
+ return getJobId().hashCode() + (int) getCheckpointID();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
new file mode 100644
index 0000000..beccbf8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * Runs the basic {@link CompletedCheckpointStore} contract tests against the
+ * {@link StandaloneCompletedCheckpointStore}.
+ */
+public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointStoreTest {
+
+ @Override
+ protected CompletedCheckpointStore createCompletedCheckpoints(
+         int maxNumberOfCheckpointsToRetain,
+         ClassLoader userClassLoader) throws Exception {
+
+     CompletedCheckpointStore standaloneStore = new StandaloneCompletedCheckpointStore(
+         maxNumberOfCheckpointsToRetain, userClassLoader);
+
+     return standaloneStore;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
new file mode 100644
index 0000000..4c6ddfd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for basic {@link CompletedCheckpointStore} contract and ZooKeeper state handling.
+ */
+public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest {
+
+ private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+
+ private final static String CheckpointsPath = "/checkpoints";
+
+ /**
+  * Shuts down the shared ZooKeeper test environment after all tests of this class have run.
+  */
+ @AfterClass
+ public static void tearDown() throws Exception {
+     // ZooKeeper is a static final field that is initialized when the class is loaded, so
+     // it can never be null here and needs no guard.
+     ZooKeeper.shutdown();
+ }
+
+ /**
+ * Runs before every test and calls <code>deleteAll()</code> on the shared ZooKeeper test
+ * environment, presumably so that each test starts without state left behind by a
+ * previous test.
+ */
+ @Before
+ public void cleanUp() throws Exception {
+ ZooKeeper.deleteAll();
+ }
+
+ /**
+  * Creates a {@link ZooKeeperCompletedCheckpointStore} that uses a new client of the test
+  * environment, the checkpoints path of this test, and local state handles.
+  */
+ @Override
+ protected CompletedCheckpointStore createCompletedCheckpoints(int maxNumberOfCheckpointsToRetain,
+         ClassLoader userLoader) throws Exception {
+
+     LocalStateHandle.LocalStateHandleProvider<CompletedCheckpoint> stateHandleProvider =
+         new LocalStateHandle.LocalStateHandleProvider<CompletedCheckpoint>();
+
+     return new ZooKeeperCompletedCheckpointStore(
+         maxNumberOfCheckpointsToRetain,
+         userLoader,
+         ZooKeeper.createClient(),
+         CheckpointsPath,
+         stateHandleProvider);
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ /**
+  * Adds three checkpoints, recovers, and checks that afterwards only the latest checkpoint
+  * is left in ZooKeeper and is returned as the latest checkpoint.
+  */
+ @Test
+ public void testRecover() throws Exception {
+     ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
+     CompletedCheckpointStore store = createCompletedCheckpoints(3, systemLoader);
+
+     TestCheckpoint[] added = new TestCheckpoint[] {
+         createCheckpoint(0), createCheckpoint(1), createCheckpoint(2)
+     };
+
+     // Add multiple checkpoints
+     for (int i = 0; i < added.length; i++) {
+         store.addCheckpoint(added[i]);
+     }
+
+     // All three should be in ZK
+     assertEquals(3, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size());
+
+     // Recover
+     store.recover();
+
+     // Only the latest one should be in ZK. Removal is asynchronous, therefore poll until
+     // a single child is left or the deadline has passed.
+     Deadline deadline = new FiniteDuration(1, TimeUnit.MINUTES).fromNow();
+
+     while (true) {
+         if (!deadline.hasTimeLeft()) {
+             break;
+         }
+
+         int numberOfChildren = ZooKeeper.getClient()
+             .getChildren().forPath(CheckpointsPath).size();
+
+         if (numberOfChildren == 1) {
+             break;
+         }
+
+         Thread.sleep(Math.min(100, deadline.timeLeft().toMillis()));
+     }
+
+     assertEquals(1, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size());
+     assertEquals(added[2], store.getLatestCheckpoint());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 5a5ef57..fa61acf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -178,6 +178,10 @@ public class BlobLibraryCacheManagerTest {
// un-register them again
libCache.unregisterTask(jid, executionId);
+
+ // Don't fail if called again
+ libCache.unregisterTask(jid, executionId);
+
assertEquals(0, libCache.getNumberOfReferenceHolders(jid));
// library is still cached (but not associated with job any more)
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 56e5bde..ca8810b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -25,7 +25,7 @@ import io.netty.channel.ChannelPromise;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.util.NetUtils;
import org.junit.Ignore;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
new file mode 100644
index 0000000..ac250bd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.testkit.TestActorRef;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.JobStatusResponse;
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.util.TestLogger;
+import org.apache.zookeeper.data.Stat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests recovery of {@link SubmittedJobGraph} instances.
+ */
+public class JobManagerSubmittedJobGraphsRecoveryITCase extends TestLogger {
+
+ private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+
+ private final static FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
+
+ // Temporary directory that is passed to the ZooKeeper recovery mode configuration of every
+ // test in this class. It is created once in the static initializer below.
+ private static final File FileStateBackendBasePath;
+
+ static {
+ try {
+ FileStateBackendBasePath = CommonTestUtils.createTempDirectory();
+ }
+ catch (IOException e) {
+ // Without the directory none of the tests can run, so fail class initialization
+ throw new RuntimeException("Error in test setup. Could not create directory.", e);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ ZooKeeper.shutdown();
+
+ if (FileStateBackendBasePath != null) {
+ FileUtils.deleteDirectory(FileStateBackendBasePath);
+ }
+ }
+
+ /**
+  * Runs before every test: empties the file state backend directory and calls
+  * <code>deleteAll()</code> on the ZooKeeper test environment.
+  */
+ @Before
+ public void cleanUp() throws Exception {
+     final File stateBackendDirectory = FileStateBackendBasePath;
+
+     if (stateBackendDirectory != null) {
+         FileUtils.cleanDirectory(stateBackendDirectory);
+     }
+
+     ZooKeeper.deleteAll();
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ /**
+  * Submits a blocking job, waits until it is running, shuts the cluster down, and checks
+  * that no recovery state is left behind.
+  */
+ @Test
+ public void testJobManagerCleanUp() throws Exception {
+     Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+         ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
+
+     // One JobManager and one TaskManager
+     config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
+     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+
+     TestingCluster cluster = new TestingCluster(config, false, false, StreamingMode.STREAMING);
+
+     try {
+         final Deadline deadline = TestTimeOut.fromNow();
+
+         // Start the JobManager and TaskManager
+         cluster.start(true);
+
+         JobGraph blockingJob = createBlockingJobGraph();
+
+         ActorGateway leader = cluster.getLeaderGateway(deadline.timeLeft());
+
+         // Submit the job
+         SubmitJob submission = new SubmitJob(blockingJob, ListeningBehaviour.DETACHED);
+         leader.tell(submission);
+
+         // Wait for the job to start
+         JobManagerActorTestUtils.waitForJobStatus(
+             blockingJob.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
+     }
+     finally {
+         cluster.shutdown();
+     }
+
+     // Verify that everything is clean
+     verifyCleanRecoveryState(config);
+ }
+
+ /**
+ * Tests that submissions to non-leaders are handled.
+ */
+ @Test
+ public void testSubmitJobToNonLeader() throws Exception {
+ Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+ ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
+
+ // Configure the cluster
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2);
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+
+ TestingCluster flink = new TestingCluster(config, false, false, StreamingMode.STREAMING);
+
+ try {
+ final Deadline deadline = TestTimeOut.fromNow();
+
+ // Start the JobManager and TaskManager
+ flink.start(true);
+
+ JobGraph jobGraph = createBlockingJobGraph();
+
+ List<ActorRef> bothJobManagers = flink.getJobManagersAsJava();
+
+ ActorGateway leadingJobManager = flink.getLeaderGateway(deadline.timeLeft());
+
+ ActorGateway nonLeadingJobManager;
+ if (bothJobManagers.get(0).equals(leadingJobManager.actor())) {
+ nonLeadingJobManager = new AkkaActorGateway(bothJobManagers.get(1), null);
+ }
+ else {
+ nonLeadingJobManager = new AkkaActorGateway(bothJobManagers.get(0), null);
+ }
+
+ // Submit the job
+ nonLeadingJobManager.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
+
+ // Wait for the job to start. We are asking the *leading** JM here although we've
+ // submitted the job to the non-leading JM. This is the behaviour under test.
+ JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
+ leadingJobManager, deadline.timeLeft());
+
+ // Make sure that the **non-leading** JM has actually removed the job graph from her
+ // local state.
+ boolean success = false;
+ while (!success && deadline.hasTimeLeft()) {
+ JobStatusResponse jobStatusResponse = JobManagerActorTestUtils.requestJobStatus(
+ jobGraph.getJobID(), nonLeadingJobManager, deadline.timeLeft());
+
+ if (jobStatusResponse instanceof JobManagerMessages.JobNotFound) {
+ success = true;
+ }
+ else {
+ Thread.sleep(100);
+ }
+ }
+
+ if (!success) {
+ fail("Non-leading JM was still holding reference to the job graph.");
+ }
+ }
+ finally {
+ flink.shutdown();
+ }
+
+ // Verify that everything is clean
+ verifyCleanRecoveryState(config);
+ }
+
+ /**
+  * Tests that clients receive updates after recovery by a new leader.
+  *
+  * <p>The test starts two JobManager processes and one TaskManager, submits a blocking job
+  * in non-detached mode, kills the leading JobManager process, waits until the job is
+  * running on the new leader, cancels it, and finally counts the submission acknowledgements
+  * that the recording client has received.
+  */
+ @Test
+ public void testClientNonDetachedListeningBehaviour() throws Exception {
+     Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+         ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
+
+     // Test actor system
+     ActorSystem testSystem = null;
+
+     // JobManager setup. Start the job managers as separate processes in order to not run the
+     // actors postStop, which cleans up all running jobs.
+     JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
+
+     LeaderRetrievalService leaderRetrievalService = null;
+
+     ActorSystem taskManagerSystem = null;
+
+     try {
+         final Deadline deadline = TestTimeOut.fromNow();
+
+         // Test actor system
+         testSystem = AkkaUtils.createActorSystem(new Configuration(),
+             new Some<>(new Tuple2<String, Object>("localhost", 0)));
+
+         // The job managers
+         jobManagerProcess[0] = new JobManagerProcess(0, config);
+         jobManagerProcess[1] = new JobManagerProcess(1, config);
+
+         jobManagerProcess[0].createAndStart();
+         jobManagerProcess[1].createAndStart();
+
+         // Leader listener
+         TestingListener leaderListener = new TestingListener();
+         leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
+         leaderRetrievalService.start(leaderListener);
+
+         // The task manager
+         taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+         TaskManager.startTaskManagerComponentsAndActor(
+             config, taskManagerSystem, "localhost",
+             Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
+             false, StreamingMode.STREAMING, TaskManager.class);
+
+         // Client test actor
+         TestActorRef<RecordingTestClient> clientRef = TestActorRef.create(
+             testSystem, Props.create(RecordingTestClient.class));
+
+         JobGraph jobGraph = createBlockingJobGraph();
+
+         {
+             // Initial submission
+             leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+             String leaderAddress = leaderListener.getAddress();
+             UUID leaderId = leaderListener.getLeaderSessionID();
+
+             // The client
+             AkkaActorGateway client = new AkkaActorGateway(clientRef, leaderId);
+
+             // Get the leader ref
+             ActorRef leaderRef = AkkaUtils.getActorRef(
+                 leaderAddress, testSystem, deadline.timeLeft());
+             ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
+
+             // Submit the job in non-detached mode
+             leader.tell(new SubmitJob(jobGraph,
+                 ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), client);
+
+             JobManagerActorTestUtils.waitForJobStatus(
+                 jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
+         }
+
+         // Who's the boss?
+         JobManagerProcess leadingJobManagerProcess;
+         if (jobManagerProcess[0].getJobManagerAkkaURL().equals(leaderListener.getAddress())) {
+             leadingJobManagerProcess = jobManagerProcess[0];
+         }
+         else {
+             leadingJobManagerProcess = jobManagerProcess[1];
+         }
+
+         // Kill the leading job manager process
+         leadingJobManagerProcess.destroy();
+
+         {
+             // Recovery by the standby JobManager
+             leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+             String leaderAddress = leaderListener.getAddress();
+             UUID leaderId = leaderListener.getLeaderSessionID();
+
+             ActorRef leaderRef = AkkaUtils.getActorRef(
+                 leaderAddress, testSystem, deadline.timeLeft());
+             ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
+
+             JobManagerActorTestUtils.waitForJobStatus(
+                 jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
+
+             // Cancel the job
+             leader.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
+         }
+
+         // Wait for the execution result
+         clientRef.underlyingActor().awaitJobResult(deadline.timeLeft().toMillis());
+
+         int jobSubmitSuccessMessages = 0;
+         for (Object msg : clientRef.underlyingActor().getMessages()) {
+             if (msg instanceof JobManagerMessages.JobSubmitSuccess) {
+                 jobSubmitSuccessMessages++;
+             }
+         }
+
+         // Two submissions should be ack-ed (initial and recovery). This is quite
+         // conservative, but it is still possible that these messages are overtaken by the
+         // final message.
+         assertEquals(2, jobSubmitSuccessMessages);
+     }
+     catch (Throwable t) {
+         // In case of an error, print the job manager process logs.
+         if (jobManagerProcess[0] != null) {
+             jobManagerProcess[0].printProcessLog();
+         }
+
+         if (jobManagerProcess[1] != null) {
+             jobManagerProcess[1].printProcessLog();
+         }
+
+         // Rethrow after printing the logs. Only printing the stack trace would swallow
+         // assertion errors and let the test pass although it has failed.
+         throw t;
+     }
+     finally {
+         if (jobManagerProcess[0] != null) {
+             jobManagerProcess[0].destroy();
+         }
+
+         if (jobManagerProcess[1] != null) {
+             jobManagerProcess[1].destroy();
+         }
+
+         if (leaderRetrievalService != null) {
+             leaderRetrievalService.stop();
+         }
+
+         if (taskManagerSystem != null) {
+             taskManagerSystem.shutdown();
+         }
+
+         if (testSystem != null) {
+             testSystem.shutdown();
+         }
+     }
+ }
+
+ /**
+  * Simple recording client. Every received message is stored (leader session messages are
+  * unwrapped first), and the test can block until a job result has arrived.
+  */
+ private static class RecordingTestClient extends UntypedActor {
+
+     private final Queue<Object> messages = new ConcurrentLinkedQueue<>();
+
+     // Released as soon as a job result (success or failure) has been received
+     private final CountDownLatch jobResultLatch = new CountDownLatch(1);
+
+     @Override
+     public void onReceive(Object message) throws Exception {
+         if (message instanceof LeaderSessionMessage) {
+             message = ((LeaderSessionMessage) message).message();
+         }
+
+         messages.add(message);
+
+         // Check for job result
+         if (message instanceof JobManagerMessages.JobResultFailure ||
+                 message instanceof JobManagerMessages.JobResultSuccess) {
+
+             jobResultLatch.countDown();
+         }
+     }
+
+     /**
+      * Returns all messages recorded so far, in the order in which they were received.
+      */
+     public Queue<Object> getMessages() {
+         return messages;
+     }
+
+     /**
+      * Waits until a job result message has been received.
+      *
+      * @param timeout maximum time to wait in milliseconds
+      * @throws InterruptedException if the waiting thread is interrupted
+      * @throws IllegalStateException if no job result arrives within the timeout
+      */
+     public void awaitJobResult(long timeout) throws InterruptedException {
+         // await returns false on timeout. Ignoring it would let the caller continue as if
+         // the job result had been received.
+         if (!jobResultLatch.await(timeout, TimeUnit.MILLISECONDS)) {
+             throw new IllegalStateException(
+                 "Did not receive a job result within " + timeout + " ms.");
+         }
+     }
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ /**
+  * Builds a job graph named "Blocking program" that consists of a single vertex running
+  * {@link Tasks.BlockingNoOpInvokable}.
+  */
+ private static JobGraph createBlockingJobGraph() {
+     JobVertex blockingVertex = new JobVertex("Blocking Vertex");
+     blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+
+     JobGraph blockingJob = new JobGraph("Blocking program");
+     blockingJob.addVertex(blockingVertex);
+
+     return blockingJob;
+ }
+
+ /**
+  * Fails the test if the recovery state (file state backend and ZooKeeper) is not clean.
+  *
+  * @param config configuration from which the ZooKeeper job graphs path is read
+  * @throws Exception if listing the files or querying ZooKeeper fails
+  */
+ private static void verifyCleanRecoveryState(Configuration config) throws Exception {
+     // File state backend empty
+     Collection<File> stateHandles = FileUtils.listFiles(
+         FileStateBackendBasePath, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
+
+     if (!stateHandles.isEmpty()) {
+         fail("File state backend is not clean: " + stateHandles);
+     }
+
+     // ZooKeeper
+     String currentJobsPath = config.getString(
+         ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
+         ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+
+     Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
+
+     if (stat == null) {
+         // checkExists returns null for a missing node. Report that explicitly instead of
+         // running into a NullPointerException below.
+         fail("ZooKeeper path '" + currentJobsPath + "' does not exist. Nothing has been " +
+             "written to it during this test.");
+     }
+
+     if (stat.getCversion() == 0) {
+         // Sanity check: verify that some changes have been performed
+         fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
+             "this test. What are you testing?");
+     }
+
+     if (stat.getNumChildren() != 0) {
+         // Is everything clean again?
+         fail("ZooKeeper path '" + currentJobsPath + "' is not clean: " +
+             ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
new file mode 100644
index 0000000..753e7be
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class StandaloneSubmittedJobGraphStoreTest {
+
+    /**
+     * Runs every store operation once and checks after each step that the store still
+     * reports no recoverable job graphs.
+     */
+    @Test
+    public void testNoOps() throws Exception {
+        final StandaloneSubmittedJobGraphStore store = new StandaloneSubmittedJobGraphStore();
+
+        final JobGraph graph = new JobGraph("testNoOps");
+        final JobInfo info = new JobInfo(
+                ActorRef.noSender(), ListeningBehaviour.DETACHED, 0, Integer.MAX_VALUE);
+        final SubmittedJobGraph submitted = new SubmittedJobGraph(graph, info);
+
+        // Initially nothing to recover
+        assertNothingToRecover(store);
+
+        // Adding a job graph must not make it recoverable
+        store.putJobGraph(submitted);
+        assertNothingToRecover(store);
+
+        // Removing it again must not change anything either
+        final JobID submittedId = submitted.getJobGraph().getJobID();
+        store.removeJobGraph(submittedId);
+        assertNothingToRecover(store);
+
+        // Looking up an arbitrary job ID yields an empty result
+        final JobID unknownId = new JobID();
+        assertTrue(store.recoverJobGraph(unknownId).isEmpty());
+    }
+
+    /** Asserts that the given store returns zero job graphs on recovery. */
+    private static void assertNothingToRecover(StandaloneSubmittedJobGraphStore store)
+            throws Exception {
+
+        assertEquals(0, store.recoverJobGraphs().size());
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
new file mode 100644
index 0000000..861a713
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
+import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider;
+import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for basic {@link SubmittedJobGraphStore} contract.
+ *
+ * <p>All tests share one {@link ZooKeeperTestEnvironment}; {@link #cleanUp()} calls
+ * {@code deleteAll()} on it before every test, and each test works under its own root path.
+ * Every store that a test creates is stopped again in a {@code finally} block.
+ */
+public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
+
+    private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+
+    private final static LocalStateHandleProvider<SubmittedJobGraph> StateHandleProvider =
+            new LocalStateHandleProvider<>();
+
+    /** Shuts down the shared ZooKeeper test environment after all tests have run. */
+    @AfterClass
+    public static void tearDown() throws Exception {
+        if (ZooKeeper != null) {
+            ZooKeeper.shutdown();
+        }
+    }
+
+    /** Deletes all ZooKeeper state of the shared test environment before each test. */
+    @Before
+    public void cleanUp() throws Exception {
+        ZooKeeper.deleteAll();
+    }
+
+    /**
+     * Puts a job graph, replaces it under the same job ID, removes it, and verifies the
+     * recoverable state after each step. A second removal must not fail.
+     */
+    @Test
+    public void testPutAndRemoveJobGraph() throws Exception {
+        ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
+                ZooKeeper.createClient(), "/testPutAndRemoveJobGraph",
+                StateHandleProvider);
+
+        try {
+            SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
+
+            jobGraphs.start(listener);
+
+            SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
+
+            // Empty state
+            assertEquals(0, jobGraphs.recoverJobGraphs().size());
+
+            // Add initial
+            jobGraphs.putJobGraph(jobGraph);
+
+            // Verify initial job graph
+            List<SubmittedJobGraph> actual = jobGraphs.recoverJobGraphs();
+            assertEquals(1, actual.size());
+
+            verifyJobGraphs(jobGraph, actual.get(0));
+
+            // Update (same ID)
+            jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), 1);
+            jobGraphs.putJobGraph(jobGraph);
+
+            // Verify updated
+            actual = jobGraphs.recoverJobGraphs();
+            assertEquals(1, actual.size());
+
+            verifyJobGraphs(jobGraph, actual.get(0));
+
+            // Remove
+            jobGraphs.removeJobGraph(jobGraph.getJobId());
+
+            // Empty state
+            assertEquals(0, jobGraphs.recoverJobGraphs().size());
+
+            // At most one add notification is tolerated; no removal may have been notified
+            verify(listener, atMost(1)).onAddedJobGraph(any(JobID.class));
+            verify(listener, never()).onRemovedJobGraph(any(JobID.class));
+
+            // Don't fail if called again
+            jobGraphs.removeJobGraph(jobGraph.getJobId());
+        }
+        finally {
+            stopAll(jobGraphs);
+        }
+    }
+
+    /**
+     * Puts three job graphs and verifies that exactly those are recovered, then removes them
+     * one by one and verifies that nothing is left.
+     */
+    @Test
+    public void testRecoverJobGraphs() throws Exception {
+        ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
+                ZooKeeper.createClient(), "/testRecoverJobGraphs", StateHandleProvider);
+
+        try {
+            SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
+
+            jobGraphs.start(listener);
+
+            HashMap<JobID, SubmittedJobGraph> expected = new HashMap<>();
+            JobID[] jobIds = new JobID[] { new JobID(), new JobID(), new JobID() };
+
+            expected.put(jobIds[0], createSubmittedJobGraph(jobIds[0], 0));
+            expected.put(jobIds[1], createSubmittedJobGraph(jobIds[1], 1));
+            expected.put(jobIds[2], createSubmittedJobGraph(jobIds[2], 2));
+
+            // Add all
+            for (SubmittedJobGraph jobGraph : expected.values()) {
+                jobGraphs.putJobGraph(jobGraph);
+            }
+
+            List<SubmittedJobGraph> actual = jobGraphs.recoverJobGraphs();
+
+            assertEquals(expected.size(), actual.size());
+
+            for (SubmittedJobGraph jobGraph : actual) {
+                assertTrue(expected.containsKey(jobGraph.getJobId()));
+
+                verifyJobGraphs(expected.get(jobGraph.getJobId()), jobGraph);
+
+                jobGraphs.removeJobGraph(jobGraph.getJobId());
+            }
+
+            // Empty state
+            assertEquals(0, jobGraphs.recoverJobGraphs().size());
+
+            // At most one add notification per put is tolerated; no removal may have
+            // been notified
+            verify(listener, atMost(expected.size())).onAddedJobGraph(any(JobID.class));
+            verify(listener, never()).onRemovedJobGraph(any(JobID.class));
+        }
+        finally {
+            stopAll(jobGraphs);
+        }
+    }
+
+    /**
+     * Starts two stores on the same ZooKeeper path and verifies that a job graph put through
+     * the second store is reported to the first store's listener exactly once, with the
+     * matching job ID.
+     */
+    @Test
+    public void testConcurrentAddJobGraph() throws Exception {
+        ZooKeeperSubmittedJobGraphStore jobGraphs = null;
+        ZooKeeperSubmittedJobGraphStore otherJobGraphs = null;
+
+        try {
+            jobGraphs = new ZooKeeperSubmittedJobGraphStore(
+                    ZooKeeper.createClient(), "/testConcurrentAddJobGraph", StateHandleProvider);
+
+            otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
+                    ZooKeeper.createClient(), "/testConcurrentAddJobGraph", StateHandleProvider);
+
+            SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
+            SubmittedJobGraph otherJobGraph = createSubmittedJobGraph(new JobID(), 0);
+
+            SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
+
+            // Single-element array so that the anonymous Answer can hand the ID back
+            final JobID[] actualOtherJobId = new JobID[1];
+            final CountDownLatch sync = new CountDownLatch(1);
+
+            doAnswer(new Answer<Void>() {
+                @Override
+                public Void answer(InvocationOnMock invocation) throws Throwable {
+                    actualOtherJobId[0] = (JobID) invocation.getArguments()[0];
+                    sync.countDown();
+
+                    return null;
+                }
+            }).when(listener).onAddedJobGraph(any(JobID.class));
+
+            // Test
+            jobGraphs.start(listener);
+            otherJobGraphs.start(null);
+
+            jobGraphs.putJobGraph(jobGraph);
+
+            // Everything is cool... not much happening ;)
+            verify(listener, never()).onAddedJobGraph(any(JobID.class));
+            verify(listener, never()).onRemovedJobGraph(any(JobID.class));
+
+            // This bad boy adds the other job graph
+            otherJobGraphs.putJobGraph(otherJobGraph);
+
+            // Wait for the cache to call back
+            sync.await();
+
+            verify(listener, times(1)).onAddedJobGraph(any(JobID.class));
+            verify(listener, never()).onRemovedJobGraph(any(JobID.class));
+
+            assertEquals(otherJobGraph.getJobId(), actualOtherJobId[0]);
+        }
+        finally {
+            stopAll(jobGraphs, otherJobGraphs);
+        }
+    }
+
+    /**
+     * Puts the same job graph through two stores on the same ZooKeeper path; the second put
+     * is expected to throw an {@link IllegalStateException}.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
+        ZooKeeperSubmittedJobGraphStore jobGraphs = null;
+        ZooKeeperSubmittedJobGraphStore otherJobGraphs = null;
+
+        try {
+            jobGraphs = new ZooKeeperSubmittedJobGraphStore(
+                    ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd",
+                    StateHandleProvider);
+
+            otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
+                    ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd",
+                    StateHandleProvider);
+
+            jobGraphs.start(null);
+            otherJobGraphs.start(null);
+
+            SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
+
+            jobGraphs.putJobGraph(jobGraph);
+
+            otherJobGraphs.putJobGraph(jobGraph);
+        }
+        finally {
+            // Stop the stores even though the test ends with the expected exception, so
+            // that they do not stay alive during the following tests.
+            stopAll(jobGraphs, otherJobGraphs);
+        }
+    }
+
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Stops every non-null store. All stores are attempted even if one of them fails; the
+     * first failure is rethrown afterwards.
+     *
+     * @param stores stores to stop; individual entries may be {@code null}
+     * @throws Exception the first exception thrown by a {@code stop()} call, if any
+     */
+    private static void stopAll(ZooKeeperSubmittedJobGraphStore... stores) throws Exception {
+        Exception firstFailure = null;
+
+        for (ZooKeeperSubmittedJobGraphStore store : stores) {
+            if (store == null) {
+                continue;
+            }
+
+            try {
+                store.stop();
+            }
+            catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+    /**
+     * Creates a submitted job graph named "Test JobGraph" with a single vertex of
+     * parallelism 1.
+     *
+     * @param jobId ID of the created job graph
+     * @param start start value passed to the {@link JobInfo}
+     * @return the job graph together with its job info
+     */
+    private SubmittedJobGraph createSubmittedJobGraph(JobID jobId, long start) {
+        final JobGraph jobGraph = new JobGraph(jobId, "Test JobGraph");
+
+        final JobVertex jobVertex = new JobVertex("Test JobVertex");
+        jobVertex.setParallelism(1);
+
+        jobGraph.addVertex(jobVertex);
+
+        final JobInfo jobInfo = new JobInfo(
+                ActorRef.noSender(), ListeningBehaviour.DETACHED, start, Integer.MAX_VALUE);
+
+        return new SubmittedJobGraph(jobGraph, jobInfo);
+    }
+
+    /**
+     * Asserts that two submitted job graphs agree in job name, job ID, listening behaviour
+     * and start value.
+     *
+     * @param expected the job graph that was put into the store
+     * @param actual   the job graph that was recovered from the store
+     */
+    protected void verifyJobGraphs(SubmittedJobGraph expected, SubmittedJobGraph actual)
+            throws Exception {
+
+        JobGraph expectedJobGraph = expected.getJobGraph();
+        JobGraph actualJobGraph = actual.getJobGraph();
+
+        assertEquals(expectedJobGraph.getName(), actualJobGraph.getName());
+        assertEquals(expectedJobGraph.getJobID(), actualJobGraph.getJobID());
+
+        JobInfo expectedJobInfo = expected.getJobInfo();
+        JobInfo actualJobInfo = actual.getJobInfo();
+
+        assertEquals(expectedJobInfo.listeningBehaviour(), actualJobInfo.listeningBehaviour());
+        assertEquals(expectedJobInfo.start(), actualJobInfo.start());
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 753bbab..bbd8fad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -25,19 +25,25 @@ import akka.actor.Props;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.LeaderElectionUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -151,8 +157,19 @@ public class JobManagerLeaderElectionTest extends TestLogger {
}
private Props createJobManagerProps(Configuration configuration) throws Exception {
- LeaderElectionService leaderElectionService = LeaderElectionUtils.
- createLeaderElectionService(configuration);
+ LeaderElectionService leaderElectionService;
+ if (RecoveryMode.fromConfig(configuration) == RecoveryMode.STANDALONE) {
+ leaderElectionService = new StandaloneLeaderElectionService();
+ }
+ else {
+ CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+ leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client,
+ configuration);
+ }
+
+ // We don't need recovery in this test
+ SubmittedJobGraphStore submittedJobGraphStore = new StandaloneSubmittedJobGraphStore();
+ CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
return Props.create(
TestingJobManager.class,
@@ -166,7 +183,9 @@ public class JobManagerLeaderElectionTest extends TestLogger {
1L,
AkkaUtils.getDefaultTimeout(),
StreamingMode.BATCH_ONLY,
- leaderElectionService
+ leaderElectionService,
+ submittedJobGraphStore,
+ checkpointRecoveryFactory
);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
index c4fccd7..ea058f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
@@ -27,6 +27,8 @@ import java.util.UUID;
*/
public class TestingLeaderElectionService implements LeaderElectionService, Serializable {
+ private static final long serialVersionUID = -8007939683948014574L;
+
private LeaderContender contender;
private boolean hasLeadership = false;
@@ -51,10 +53,12 @@ public class TestingLeaderElectionService implements LeaderElectionService, Seri
}
public void isLeader(UUID leaderSessionID) {
+ hasLeadership = true; // record the grant locally before notifying the contender
contender.grantLeadership(leaderSessionID); // hand the session ID to the contender
}
public void notLeader() {
+ hasLeadership = false; // record the loss locally before notifying the contender
contender.revokeLeadership(); // tell the contender that leadership was revoked
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index bb60415..aae1840 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -18,13 +18,14 @@
package org.apache.flink.runtime.leaderelection;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.util.LeaderElectionUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
@@ -41,7 +42,7 @@ import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
public class ZooKeeperLeaderRetrievalTest extends TestLogger{
@@ -92,7 +93,11 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
Thread thread;
+ CuratorFramework[] client = new CuratorFramework[2];
+
try {
+ client[0] = ZooKeeperUtils.startCuratorFramework(config);
+ client[1] = ZooKeeperUtils.startCuratorFramework(config);
InetSocketAddress wrongInetSocketAddress = new InetSocketAddress(InetAddress.getByName("1.1.1.1"), 1234);
@@ -116,7 +121,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
String correctAddress = JobManager.getRemoteJobManagerAkkaURL(correctInetSocketAddress, Option.<String>empty());
- faultyLeaderElectionService = LeaderElectionUtils.createLeaderElectionService(config);
+ faultyLeaderElectionService = ZooKeeperUtils.createLeaderElectionService(client[0], config);
TestingContender wrongLeaderAddressContender = new TestingContender(wrongAddress, faultyLeaderElectionService);
faultyLeaderElectionService.start(wrongLeaderAddressContender);
@@ -127,7 +132,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
thread.start();
- leaderElectionService = LeaderElectionUtils.createLeaderElectionService(config);
+ leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client[1], config);
TestingContender correctLeaderAddressContender = new TestingContender(correctAddress, leaderElectionService);
Thread.sleep(sleepingTime);
@@ -155,6 +160,14 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
if (leaderElectionService != null) {
leaderElectionService.stop();
}
+
+ if (client[0] != null) {
+ client[0].close();
+ }
+
+ if (client[1] != null) {
+ client[1].close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 68575e5..087e0fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -78,7 +78,7 @@ public class CheckpointMessagesTest {
assertNotNull(copy.toString());
}
- private static class MyHandle implements StateHandle<Serializable> {
+ public static class MyHandle implements StateHandle<Serializable> {
private static final long serialVersionUID = 8128146204128728332L;