Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/27 12:20:56 UTC

flink git commit: [FLINK-5049] [queryable state] Remove TM failure test

Repository: flink
Updated Branches:
  refs/heads/release-1.2 b1ab75f48 -> c8c86e7bb


[FLINK-5049] [queryable state] Remove TM failure test

The test was flaky because the expected behaviour is not actually
possible with the current state of queryable state, which allows
uncommitted reads.
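
In other words, queryable state answers lookups from the live
TaskManager state rather than from the last completed checkpoint.
After a TaskManager failure, the job restores from that checkpoint,
so a count observed by a query before the failure can legitimately be
larger than a count observed shortly after recovery. A minimal sketch
of the timing, with hypothetical values (illustration only, not part
of this commit):

    // Hypothetical timeline showing why uncommitted reads break the
    // removed assertion (result.f1 >= countForKey).
    long checkpointedCount = 5;   // count in the last completed checkpoint
    long liveCount = 9;           // source kept counting after that checkpoint
    long countForKey = liveCount; // query reads the live, uncommitted value
    // TaskManager is killed; state rolls back to the checkpoint.
    long countAfterRestore = checkpointedCount + 1; // e.g. 6, and counting
    // The removed test expected countAfterRestore >= countForKey,
    // i.e. 6 >= 9 here, which is not guaranteed to hold.

The removed test asserted exactly this monotonicity across a
TaskManager failure, which would only hold if queries were restricted
to committed (checkpoint-aligned) state.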


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8c86e7b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8c86e7b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8c86e7b

Branch: refs/heads/release-1.2
Commit: c8c86e7bb254839d9c8ac595b36a8e70cf375933
Parents: b1ab75f
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Jan 27 13:15:24 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Jan 27 13:20:45 2017 +0100

----------------------------------------------------------------------
 .../query/AbstractQueryableStateITCase.java     | 219 -------------------
 1 file changed, 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c8c86e7b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
index 5683c5e..1912c0f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
@@ -18,11 +18,8 @@
 
 package org.apache.flink.test.query;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
 import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
 import akka.dispatch.OnSuccess;
 import akka.dispatch.Recover;
 import akka.pattern.Patterns;
@@ -40,15 +37,11 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.QueryableStateOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.JobFound;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateMessage;
 import org.apache.flink.runtime.query.QueryableStateClient;
 import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
@@ -57,13 +50,10 @@ import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseRunningTasks;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.QueryableStateStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -84,12 +74,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
 
-import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
 import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.JobStatusIs;
 import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
-import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -305,212 +292,6 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 	}
 
 	/**
-	 * Queries a random key and waits for some checkpoints to complete. After
-	 * that the task manager where this key was held is killed. Then query the
-	 * key again and check for the expected Exception. Finally, add another
-	 * task manager and re-query the key (expecting a count >= the previous
-	 * one).
-	 */
-	@Test
-	public void testQueryableStateWithTaskManagerFailure() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
-		final int numKeys = 256;
-
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
-
-		JobID jobId = null;
-
-		try {
-			//
-			// Test program
-			//
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(NUM_SLOTS);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
-			env.getCheckpointConfig().setCheckpointInterval(500);
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestKeyRangeSource(numKeys));
-
-			// Reducing state
-			ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
-					"any-name",
-					new SumReduce(),
-					source.getType());
-
-			final String queryName = "hakuna-matata";
-
-			final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
-					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-						@Override
-						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-							return value.f0;
-						}
-					}).asQueryableState(queryName, reducingState);
-
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			cluster.submitJobDetached(jobGraph);
-
-			//
-			// Start querying
-			//
-			jobId = jobGraph.getJobID();
-
-			final int key = ThreadLocalRandom.current().nextInt(numKeys);
-
-			// Query a random key
-			final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
-					key,
-					queryableState.getKeySerializer(),
-					VoidNamespace.INSTANCE,
-					VoidNamespaceSerializer.INSTANCE);
-
-			long countForKey = 0;
-
-			boolean success = false;
-			while (!success && deadline.hasTimeLeft()) {
-				Future<byte[]> serializedResultFuture = getKvStateWithRetries(
-						client,
-						jobId,
-						queryName,
-						key,
-						serializedKey,
-						QUERY_RETRY_DELAY,
-						false);
-
-				byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft());
-
-				Tuple2<Integer, Long> result = KvStateRequestSerializer.deserializeValue(
-						serializedResult,
-						queryableState.getValueSerializer());
-
-				countForKey = result.f1;
-
-				assertEquals("Key mismatch", key, result.f0.intValue());
-				success = countForKey > 3; // Wait for some progress
-			}
-
-			assertTrue("No progress for count", countForKey > 3);
-
-			long currentCheckpointId = TestKeyRangeSource.LATEST_CHECKPOINT_ID.get();
-			long waitUntilCheckpointId = currentCheckpointId + 2;
-
-			// Wait for some checkpoint after the query result
-			while (deadline.hasTimeLeft() &&
-					TestKeyRangeSource.LATEST_CHECKPOINT_ID.get() < waitUntilCheckpointId) {
-				Thread.sleep(100);
-			}
-
-			assertTrue("Did not complete enough checkpoints to continue",
-					TestKeyRangeSource.LATEST_CHECKPOINT_ID.get() >= waitUntilCheckpointId);
-
-			//
-			// Find out on which TaskManager the KvState is located and kill that TaskManager
-			//
-			// This is the subtask index
-			int keyGroupIndex = MathUtils.murmurHash(key) % NUM_SLOTS;
-
-			// Find out which task manager holds this key
-			Future<ExecutionGraph> egFuture = cluster.getLeaderGateway(deadline.timeLeft())
-					.ask(new RequestExecutionGraph(jobId), deadline.timeLeft())
-					.mapTo(ClassTag$.MODULE$.<ExecutionGraphFound>apply(ExecutionGraphFound.class))
-					.map(new Mapper<ExecutionGraphFound, ExecutionGraph>() {
-						@Override
-						public ExecutionGraph apply(ExecutionGraphFound found) {
-							return (ExecutionGraph) found.executionGraph();
-						}
-					}, TEST_ACTOR_SYSTEM.dispatcher());
-			ExecutionGraph eg = Await.result(egFuture, deadline.timeLeft());
-
-			Future<KvStateLocation> locationFuture = cluster
-					.getLeaderGateway(deadline.timeLeft())
-					.ask(new KvStateMessage.LookupKvStateLocation(jobId, queryName), deadline.timeLeft())
-					.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
-
-			KvStateLocation location = Await.result(locationFuture, deadline.timeLeft());
-
-			ExecutionAttemptID executionId = eg.getJobVertex(location.getJobVertexId())
-					.getTaskVertices()[keyGroupIndex]
-					.getCurrentExecutionAttempt()
-					.getAttemptId();
-
-			List<ActorRef> taskManagers = cluster.getTaskManagersAsJava();
-			ActorRef taskManagerToKill = null;
-			for (ActorRef taskManager : taskManagers) {
-				Future<ResponseRunningTasks> runningFuture = Patterns.ask(
-						taskManager,
-						TestingTaskManagerMessages.getRequestRunningTasksMessage(),
-						deadline.timeLeft().toMillis())
-						.mapTo(ClassTag$.MODULE$.<ResponseRunningTasks>apply(ResponseRunningTasks.class));
-
-				ResponseRunningTasks running = Await.result(runningFuture, deadline.timeLeft());
-
-				if (running.asJava().containsKey(executionId)) {
-					taskManagerToKill = taskManager;
-					break;
-				}
-			}
-
-			assertNotNull("Did not find TaskManager holding the key", taskManagerToKill);
-
-			// Kill the task manager
-			taskManagerToKill.tell(PoisonPill.getInstance(), ActorRef.noSender());
-
-			success = false;
-			for (int i = 0; i < 10 && !success; i++) {
-				try {
-					// Wait for the expected error. We might have to retry if
-					// the query is very fast.
-					Await.result(client.getKvState(jobId, queryName, key, serializedKey), deadline.timeLeft());
-					Thread.sleep(500);
-				} catch (Throwable ignored) {
-					success = true;
-				}
-			}
-
-			assertTrue("Query did not fail", success);
-
-			// Now start another task manager
-			cluster.addTaskManager();
-
-			Future<byte[]> serializedResultFuture = getKvStateWithRetries(
-					client,
-					jobId,
-					queryName,
-					key,
-					serializedKey,
-					QUERY_RETRY_DELAY,
-					false);
-
-			byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft());
-
-			Tuple2<Integer, Long> result = KvStateRequestSerializer.deserializeValue(
-					serializedResult,
-					queryableState.getValueSerializer());
-
-			assertTrue("Count moved backwards", result.f1 >= countForKey);
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				Future<CancellationSuccess> cancellation = cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
-
-				Await.ready(cancellation, deadline.timeLeft());
-			}
-
-			client.shutDown();
-		}
-	}
-
-	/**
 	 * Tests that duplicate query registrations fail the job at the JobManager.
 	 */
 	@Test