You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/27 12:19:39 UTC
[2/2] flink git commit: [FLINK-5049] [queryable state] Remove TM failure test
[FLINK-5049] [queryable state] Remove TM failure test
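The test was flaky because the behaviour it expected is not actually
guaranteed by the current implementation of queryable state, which
allows uncommitted reads (a count read before the TaskManager failure
may be larger than the value restored from the last checkpoint).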
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c89a6224
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c89a6224
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c89a6224
Branch: refs/heads/master
Commit: c89a622449cf0f08fa1b8944c67c5cabe7aa1cc6
Parents: 24db045
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Jan 27 13:15:24 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Jan 27 13:15:24 2017 +0100
----------------------------------------------------------------------
.../query/AbstractQueryableStateITCase.java | 219 -------------------
1 file changed, 219 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c89a6224/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
index 5683c5e..1912c0f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
@@ -18,11 +18,8 @@
package org.apache.flink.test.query;
-import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
import akka.dispatch.OnSuccess;
import akka.dispatch.Recover;
import akka.pattern.Patterns;
@@ -40,15 +37,11 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
import org.apache.flink.runtime.messages.JobManagerMessages.JobFound;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.runtime.query.QueryableStateClient;
import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
@@ -57,13 +50,10 @@ import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseRunningTasks;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.QueryableStateStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
@@ -84,12 +74,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
-import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.JobStatusIs;
import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
-import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -305,212 +292,6 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
}
/**
- * Queries a random key and waits for some checkpoints to complete. After
- * that the task manager where this key was held is killed. Then query the
- * key again and check for the expected Exception. Finally, add another
- * task manager and re-query the key (expecting a count >= the previous
- * one).
- */
- @Test
- public void testQueryableStateWithTaskManagerFailure() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
- final int numKeys = 256;
-
- final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
-
- JobID jobId = null;
-
- try {
- //
- // Test program
- //
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(NUM_SLOTS);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
- env.getCheckpointConfig().setCheckpointInterval(500);
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestKeyRangeSource(numKeys));
-
- // Reducing state
- ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
- "any-name",
- new SumReduce(),
- source.getType());
-
- final String queryName = "hakuna-matata";
-
- final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState(queryName, reducingState);
-
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- cluster.submitJobDetached(jobGraph);
-
- //
- // Start querying
- //
- jobId = jobGraph.getJobID();
-
- final int key = ThreadLocalRandom.current().nextInt(numKeys);
-
- // Query a random key
- final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
- key,
- queryableState.getKeySerializer(),
- VoidNamespace.INSTANCE,
- VoidNamespaceSerializer.INSTANCE);
-
- long countForKey = 0;
-
- boolean success = false;
- while (!success && deadline.hasTimeLeft()) {
- Future<byte[]> serializedResultFuture = getKvStateWithRetries(
- client,
- jobId,
- queryName,
- key,
- serializedKey,
- QUERY_RETRY_DELAY,
- false);
-
- byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft());
-
- Tuple2<Integer, Long> result = KvStateRequestSerializer.deserializeValue(
- serializedResult,
- queryableState.getValueSerializer());
-
- countForKey = result.f1;
-
- assertEquals("Key mismatch", key, result.f0.intValue());
- success = countForKey > 3; // Wait for some progress
- }
-
- assertTrue("No progress for count", countForKey > 3);
-
- long currentCheckpointId = TestKeyRangeSource.LATEST_CHECKPOINT_ID.get();
- long waitUntilCheckpointId = currentCheckpointId + 2;
-
- // Wait for some checkpoint after the query result
- while (deadline.hasTimeLeft() &&
- TestKeyRangeSource.LATEST_CHECKPOINT_ID.get() < waitUntilCheckpointId) {
- Thread.sleep(100);
- }
-
- assertTrue("Did not complete enough checkpoints to continue",
- TestKeyRangeSource.LATEST_CHECKPOINT_ID.get() >= waitUntilCheckpointId);
-
- //
- // Find out on which TaskManager the KvState is located and kill that TaskManager
- //
- // This is the subtask index
- int keyGroupIndex = MathUtils.murmurHash(key) % NUM_SLOTS;
-
- // Find out which task manager holds this key
- Future<ExecutionGraph> egFuture = cluster.getLeaderGateway(deadline.timeLeft())
- .ask(new RequestExecutionGraph(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<ExecutionGraphFound>apply(ExecutionGraphFound.class))
- .map(new Mapper<ExecutionGraphFound, ExecutionGraph>() {
- @Override
- public ExecutionGraph apply(ExecutionGraphFound found) {
- return (ExecutionGraph) found.executionGraph();
- }
- }, TEST_ACTOR_SYSTEM.dispatcher());
- ExecutionGraph eg = Await.result(egFuture, deadline.timeLeft());
-
- Future<KvStateLocation> locationFuture = cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new KvStateMessage.LookupKvStateLocation(jobId, queryName), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
-
- KvStateLocation location = Await.result(locationFuture, deadline.timeLeft());
-
- ExecutionAttemptID executionId = eg.getJobVertex(location.getJobVertexId())
- .getTaskVertices()[keyGroupIndex]
- .getCurrentExecutionAttempt()
- .getAttemptId();
-
- List<ActorRef> taskManagers = cluster.getTaskManagersAsJava();
- ActorRef taskManagerToKill = null;
- for (ActorRef taskManager : taskManagers) {
- Future<ResponseRunningTasks> runningFuture = Patterns.ask(
- taskManager,
- TestingTaskManagerMessages.getRequestRunningTasksMessage(),
- deadline.timeLeft().toMillis())
- .mapTo(ClassTag$.MODULE$.<ResponseRunningTasks>apply(ResponseRunningTasks.class));
-
- ResponseRunningTasks running = Await.result(runningFuture, deadline.timeLeft());
-
- if (running.asJava().containsKey(executionId)) {
- taskManagerToKill = taskManager;
- break;
- }
- }
-
- assertNotNull("Did not find TaskManager holding the key", taskManagerToKill);
-
- // Kill the task manager
- taskManagerToKill.tell(PoisonPill.getInstance(), ActorRef.noSender());
-
- success = false;
- for (int i = 0; i < 10 && !success; i++) {
- try {
- // Wait for the expected error. We might have to retry if
- // the query is very fast.
- Await.result(client.getKvState(jobId, queryName, key, serializedKey), deadline.timeLeft());
- Thread.sleep(500);
- } catch (Throwable ignored) {
- success = true;
- }
- }
-
- assertTrue("Query did not fail", success);
-
- // Now start another task manager
- cluster.addTaskManager();
-
- Future<byte[]> serializedResultFuture = getKvStateWithRetries(
- client,
- jobId,
- queryName,
- key,
- serializedKey,
- QUERY_RETRY_DELAY,
- false);
-
- byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft());
-
- Tuple2<Integer, Long> result = KvStateRequestSerializer.deserializeValue(
- serializedResult,
- queryableState.getValueSerializer());
-
- assertTrue("Count moved backwards", result.f1 >= countForKey);
- } finally {
- // Free cluster resources
- if (jobId != null) {
- Future<CancellationSuccess> cancellation = cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
-
- Await.ready(cancellation, deadline.timeLeft());
- }
-
- client.shutDown();
- }
- }
-
- /**
* Tests that duplicate query registrations fail the job at the JobManager.
*/
@Test