You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/12/06 15:11:50 UTC
[3/3] flink git commit: [FLINK-7880][QS] Wait for proper resource
cleanup after each ITCase.
[FLINK-7880][QS] Wait for proper resource cleanup after each ITCase.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3fd548e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3fd548e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3fd548e
Branch: refs/heads/master
Commit: a3fd548e9c76c67609bbf159d5fb743d756450b1
Parents: 74d052b
Author: kkloudas <kk...@gmail.com>
Authored: Wed Dec 6 14:32:46 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Wed Dec 6 14:33:28 2017 +0100
----------------------------------------------------------------------
.../itcases/AbstractQueryableStateTestBase.java | 1073 ++++++++----------
.../HAAbstractQueryableStateTestBase.java | 22 +-
.../HAQueryableStateRocksDBBackendITCase.java | 2 -
.../NonHAAbstractQueryableStateTestBase.java | 11 +-
...NonHAQueryableStateRocksDBBackendITCase.java | 2 -
5 files changed, 476 insertions(+), 634 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 65e9bb5..5a28367 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -73,6 +73,7 @@ import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
@@ -92,7 +93,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
-import scala.concurrent.Await;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;
@@ -159,52 +159,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
@Test
@SuppressWarnings("unchecked")
public void testQueryableState() throws Exception {
- // Config
+
final Deadline deadline = TEST_TIMEOUT.fromNow();
final int numKeys = 256;
- JobID jobId = null;
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- try {
- //
- // Test program
- //
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestKeyRangeSource(numKeys));
-
- // Reducing state
- ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
- "any-name",
- new SumReduce(),
- source.getType());
-
- final String queryName = "hakuna-matata";
-
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 7143749578983540352L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState(queryName, reducingState);
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestKeyRangeSource(numKeys));
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- cluster.submitJobDetached(jobGraph);
+ ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+ "any-name", new SumReduce(), source.getType());
+
+ final String queryName = "hakuna-matata";
+
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 7143749578983540352L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState(queryName, reducingState);
- //
- // Start querying
- //
- jobId = jobGraph.getJobID();
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+ cluster.submitJobDetached(jobGraph);
final AtomicLongArray counts = new AtomicLongArray(numKeys);
@@ -261,16 +249,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
long count = counts.get(i);
assertTrue("Count at position " + i + " is " + count, count > 0);
}
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@@ -282,91 +260,94 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
final Deadline deadline = TEST_TIMEOUT.fromNow();
final int numKeys = 256;
- JobID jobId = null;
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- try {
- //
- // Test program
- //
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestKeyRangeSource(numKeys));
-
- // Reducing state
- ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
- "any-name",
- new SumReduce(),
- source.getType());
-
- final String queryName = "duplicate-me";
-
- final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = -4126824763829132959L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState(queryName, reducingState);
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestKeyRangeSource(numKeys));
- final QueryableStateStream<Integer, Tuple2<Integer, Long>> duplicate =
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = -6265024000462809436L;
+ // Reducing state
+ ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+ "any-name",
+ new SumReduce(),
+ source.getType());
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState(queryName);
+ final String queryName = "duplicate-me";
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = -4126824763829132959L;
- CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture = FutureUtils.toJava(
- cluster.getLeaderGateway(deadline.timeLeft())
- .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState(queryName, reducingState);
- cluster.submitJobDetached(jobGraph);
+ final QueryableStateStream<Integer, Tuple2<Integer, Long>> duplicate =
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = -6265024000462809436L;
- TestingJobManagerMessages.JobStatusIs jobStatus =
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState(queryName);
+
+ // Submit the job graph
+ final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ final CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture =
+ notifyWhenJobStatusIs(jobId, JobStatus.FAILED, deadline);
+
+ final CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture =
+ notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline);
+
+ cluster.submitJobDetached(jobGraph);
+
+ try {
+ final TestingJobManagerMessages.JobStatusIs jobStatus =
failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
assertEquals(JobStatus.FAILED, jobStatus.state());
+ } catch (Exception e) {
+
+ // if the assertion fails, it means that the job was (falsely) not cancelled.
+ // in this case, and given that the mini-cluster is shared with other tests,
+ // we cancel the job and wait for the cancellation so that the resources are freed.
- // Get the job and check the cause
- JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
- cluster.getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
- .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
- String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
-
- assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
- int causedByIndex = failureCause.indexOf("Caused by: ");
- String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length());
- assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException"));
- assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName));
- } finally {
- // Free cluster resources
if (jobId != null) {
- scala.concurrent.Future<CancellationSuccess> cancellation = cluster
- .getLeaderGateway(deadline.timeLeft())
+ cluster.getLeaderGateway(deadline.timeLeft())
.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class));
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
- Await.ready(cancellation, deadline.timeLeft());
+ cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
}
+
+ // and we re-throw the exception.
+ throw e;
}
+
+ // Get the job and check the cause
+ JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
+ cluster.getLeaderGateway(deadline.timeLeft())
+ .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
+ .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+ String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
+
+ assertEquals(JobStatus.FAILED, jobFound.executionGraph().getState());
+ assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
+ int causedByIndex = failureCause.indexOf("Caused by: ");
+ String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length());
+ assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException"));
+ assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName));
}
/**
@@ -377,55 +358,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*/
@Test
public void testValueState() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- // Value state
- ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
- "any",
- source.getType());
-
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 7662520075515707428L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState("hakuna", valueState);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
- cluster.submitJobDetached(jobGraph);
+ // Value state
+ ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>("any", source.getType());
- executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements);
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 7662520075515707428L;
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
}
+ }).asQueryableState("hakuna", valueState);
+
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+ cluster.submitJobDetached(jobGraph);
+
+ executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements);
}
}
@@ -434,48 +400,36 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
* contains a wrong jobId or wrong queryable state name.
*/
@Test
+ @Ignore
public void testWrongJobIdAndWrongQueryableStateName() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- // Value state
- ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
- new ValueStateDescriptor<>("any", source.getType());
-
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 7662520075515707428L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState("hakuna", valueState);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+ ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>("any", source.getType());
- CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture = FutureUtils.toJava(
- cluster.getLeaderGateway(deadline.timeLeft())
- .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 7662520075515707428L;
- cluster.submitJobDetached(jobGraph);
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState("hakuna", valueState);
+
+ try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(cluster, env, deadline)) {
+
+ // register to be notified when the job is running.
+ CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture =
+ notifyWhenJobStatusIs(closableJobGraph.getJobId(), JobStatus.RUNNING, deadline);
+
+ cluster.submitJobDetached(closableJobGraph.getJobGraph());
// expect for the job to be running
TestingJobManagerMessages.JobStatusIs jobStatus =
@@ -486,49 +440,38 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownJobFuture = client.getKvState(
wrongJobId, // this is the wrong job id
- "hankuna",
+ "hakuna",
0,
BasicTypeInfo.INT_TYPE_INFO,
valueState);
try {
- unknownJobFuture.get();
- fail(); // by now the job must have failed.
+ unknownJobFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ fail(); // by now the request must have failed.
} catch (ExecutionException e) {
- Assert.assertTrue(e.getCause() instanceof RuntimeException);
- Assert.assertTrue(e.getCause().getMessage().contains(
+ Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause() instanceof RuntimeException);
+ Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause().getMessage().contains(
"FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")"));
- } catch (Exception ignored) {
- fail("Unexpected type of exception.");
+ } catch (Exception f) {
+ fail("Unexpected type of exception: " + f.getMessage());
}
CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownQSName = client.getKvState(
- jobId,
- "wrong-hankuna", // this is the wrong name.
+ closableJobGraph.getJobId(),
+ "wrong-hakuna", // this is the wrong name.
0,
BasicTypeInfo.INT_TYPE_INFO,
valueState);
try {
- unknownQSName.get();
- fail(); // by now the job must have failed.
+ unknownQSName.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ fail(); // by now the request must have failed.
} catch (ExecutionException e) {
- Assert.assertTrue(e.getCause() instanceof RuntimeException);
- Assert.assertTrue(e.getCause().getMessage().contains(
- "UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hankuna'."));
- } catch (Exception ignored) {
- fail("Unexpected type of exception.");
- }
-
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause() instanceof RuntimeException);
+ Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause().getMessage().contains(
+ "UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hakuna'."));
+ } catch (Exception f) {
+ fail("Unexpected type of exception: " + f.getMessage());
}
}
}
@@ -539,50 +482,44 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*/
@Test
public void testQueryNonStartedJobState() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- // Value state
- ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
- "any",
- source.getType(),
- null);
-
- QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+
+ ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
+ "any", source.getType(), null);
+
+ QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+
private static final long serialVersionUID = 7480503339992214681L;
@Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+ public Integer getKey(Tuple2<Integer, Long> value) {
return value.f0;
}
}).asQueryableState("hakuna", valueState);
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
- // Now query
long expected = numElements;
// query once
client.getKvState(
- jobId,
+ autoCancellableJob.getJobId(),
queryableState.getQueryableStateName(),
0,
BasicTypeInfo.INT_TYPE_INFO,
@@ -591,16 +528,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
cluster.submitJobDetached(jobGraph);
executeValueQuery(deadline, client, jobId, "hakuna", valueState, expected);
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@@ -615,51 +542,37 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
@Test(expected = UnknownKeyOrNamespaceException.class)
public void testValueStateDefault() throws Throwable {
- // Config
final Deadline deadline = TEST_TIMEOUT.fromNow();
-
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies
- .fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- // Value state
- ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
- new ValueStateDescriptor<>(
- "any",
- source.getType(),
- Tuple2.of(0, 1337L));
-
- // only expose key "1"
- QueryableStateStream<Integer, Tuple2<Integer, Long>>
- queryableState =
- source.keyBy(
- new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 4509274556892655887L;
-
- @Override
- public Integer getKey(
- Tuple2<Integer, Long> value) throws
- Exception {
- return 1;
- }
- }).asQueryableState("hakuna", valueState);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+
+ ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
+ "any", source.getType(), Tuple2.of(0, 1337L));
+
+ // only expose key "1"
+ QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = source.keyBy(
+ new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 4509274556892655887L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return 1;
+ }
+ }).asQueryableState("hakuna", valueState);
+
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
cluster.submitJobDetached(jobGraph);
@@ -683,17 +596,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
// exception in an ExecutionException.
throw e.getCause();
}
- } finally {
-
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@@ -707,55 +609,41 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*/
@Test
public void testValueStateShortcut() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- // Value state shortcut
- QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 9168901838808830068L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState("matata");
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
- cluster.submitJobDetached(jobGraph);
+ // Value state shortcut
+ final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 9168901838808830068L;
- final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc =
- (ValueStateDescriptor<Tuple2<Integer, Long>>) queryableState.getStateDescriptor();
- executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements);
- } finally {
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState("matata");
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(
- cluster.getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+ final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc =
+ (ValueStateDescriptor<Tuple2<Integer, Long>>) queryableState.getStateDescriptor();
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+ cluster.submitJobDetached(jobGraph);
+ executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements);
}
}
@@ -768,50 +656,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*/
@Test
public void testFoldingState() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final int numElements = 1024;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- // Folding state
- FoldingStateDescriptor<Tuple2<Integer, Long>, String> foldingState =
- new FoldingStateDescriptor<>(
- "any",
- "0",
- new SumFold(),
- StringSerializer.INSTANCE);
-
- QueryableStateStream<Integer, String> queryableState =
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = -842809958106747539L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState("pumba", foldingState);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+
+ FoldingStateDescriptor<Tuple2<Integer, Long>, String> foldingState = new FoldingStateDescriptor<>(
+ "any", "0", new SumFold(), StringSerializer.INSTANCE);
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = -842809958106747539L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState("pumba", foldingState);
+
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
cluster.submitJobDetached(jobGraph);
- // Now query
- String expected = Integer.toString(numElements * (numElements + 1) / 2);
+ final String expected = Integer.toString(numElements * (numElements + 1) / 2);
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
@@ -840,16 +718,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
assertTrue("Did not succeed query", success);
}
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@@ -861,48 +729,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*/
@Test
public void testReducingState() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- // Reducing state
- ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState =
- new ReducingStateDescriptor<>(
- "any",
- new SumReduce(),
- source.getType());
-
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 8470749712274833552L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState("jungle", reducingState);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+ "any", new SumReduce(), source.getType());
+
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 8470749712274833552L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState("jungle", reducingState);
+
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
cluster.submitJobDetached(jobGraph);
- // Now query
- long expected = numElements * (numElements + 1L) / 2L;
+ final long expected = numElements * (numElements + 1L) / 2L;
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
@@ -931,16 +791,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
assertTrue("Did not succeed query", success);
}
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@@ -952,66 +802,60 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*/
@Test
public void testMapState() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- final MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor = new MapStateDescriptor<>(
- "timon",
- BasicTypeInfo.INT_TYPE_INFO,
- source.getType());
- mapStateDescriptor.setQueryable("timon-queryable");
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 8470749712274833552L;
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
- private static final long serialVersionUID = -805125545438296619L;
+ final MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor = new MapStateDescriptor<>(
+ "timon", BasicTypeInfo.INT_TYPE_INFO, source.getType());
+ mapStateDescriptor.setQueryable("timon-queryable");
- private transient MapState<Integer, Tuple2<Integer, Long>> mapState;
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 8470749712274833552L;
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- mapState = getRuntimeContext().getMapState(mapStateDescriptor);
- }
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
+ private static final long serialVersionUID = -805125545438296619L;
- @Override
- public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
- Tuple2<Integer, Long> v = mapState.get(value.f0);
- if (v == null) {
- v = new Tuple2<>(value.f0, 0L);
- }
- mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
+ private transient MapState<Integer, Tuple2<Integer, Long>> mapState;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ mapState = getRuntimeContext().getMapState(mapStateDescriptor);
+ }
+
+ @Override
+ public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
+ Tuple2<Integer, Long> v = mapState.get(value.f0);
+ if (v == null) {
+ v = new Tuple2<>(value.f0, 0L);
}
- });
+ mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
+ }
+ });
+
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
cluster.submitJobDetached(jobGraph);
- // Now query
- long expected = numElements * (numElements + 1L) / 2L;
+ final long expected = numElements * (numElements + 1L) / 2L;
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
@@ -1039,16 +883,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
assertTrue("Did not succeed query", success);
}
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@@ -1061,62 +895,56 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
*/
@Test
public void testListState() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- final ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<Long>(
- "list",
- BasicTypeInfo.LONG_TYPE_INFO);
- listStateDescriptor.setQueryable("list-queryable");
-
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 8470749712274833552L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
- private static final long serialVersionUID = -805125545438296619L;
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- private transient ListState<Long> listState;
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- listState = getRuntimeContext().getListState(listStateDescriptor);
- }
+ final ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<Long>(
+ "list", BasicTypeInfo.LONG_TYPE_INFO);
+ listStateDescriptor.setQueryable("list-queryable");
- @Override
- public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
- listState.add(value.f1);
- }
- });
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 8470749712274833552L;
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
+ private static final long serialVersionUID = -805125545438296619L;
- cluster.submitJobDetached(jobGraph);
+ private transient ListState<Long> listState;
- // Now query
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ listState = getRuntimeContext().getListState(listStateDescriptor);
+ }
- Map<Integer, Set<Long>> results = new HashMap<>();
+ @Override
+ public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
+ listState.add(value.f1);
+ }
+ });
+
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+ cluster.submitJobDetached(jobGraph);
+
+ final Map<Integer, Set<Long>> results = new HashMap<>();
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
@@ -1159,66 +987,48 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
}
}
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@Test
public void testAggregatingState() throws Exception {
- // Config
- final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
final long numElements = 1024L;
- JobID jobId = null;
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between tests and we
- // don't explicitly check that all slots are available before
- // submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new TestAscendingValueSource(numElements));
-
- final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> aggrStateDescriptor =
- new AggregatingStateDescriptor<>(
- "aggregates",
- new SumAggr(),
- String.class);
- aggrStateDescriptor.setQueryable("aggr-queryable");
-
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 8470749712274833552L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).transform(
- "TestAggregatingOperator",
- BasicTypeInfo.STRING_TYPE_INFO,
- new AggregatingTestOperator(aggrStateDescriptor)
- );
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and we
+ // don't explicitly check that all slots are available before
+ // submitting.
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
- cluster.submitJobDetached(jobGraph);
+ final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> aggrStateDescriptor =
+ new AggregatingStateDescriptor<>("aggregates", new SumAggr(), String.class);
+ aggrStateDescriptor.setQueryable("aggr-queryable");
- // Now query
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 8470749712274833552L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) {
+ return value.f0;
+ }
+ }).transform(
+ "TestAggregatingOperator",
+ BasicTypeInfo.STRING_TYPE_INFO,
+ new AggregatingTestOperator(aggrStateDescriptor)
+ );
+
+ try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+ final JobID jobId = autoCancellableJob.getJobId();
+ final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+ cluster.submitJobDetached(jobGraph);
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
@@ -1246,16 +1056,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
assertTrue("Did not succeed query", success);
}
- } finally {
- // Free cluster resources
- if (jobId != null) {
- CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
- cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
}
}
@@ -1316,7 +1116,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
notifyAll();
}
}
-
}
/**
@@ -1465,6 +1264,60 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
///// General Utility Methods //////
+ /**
+ * A wrapper of the job graph that makes sure to cancel the job and wait for
+ * termination after the execution of every test.
+ */
+ private static class AutoCancellableJob implements AutoCloseable {
+
+ private final FlinkMiniCluster cluster;
+ private final Deadline deadline;
+ private final JobGraph jobGraph;
+
+ private final JobID jobId;
+ private final CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture;
+
+ AutoCancellableJob(final FlinkMiniCluster cluster, final StreamExecutionEnvironment env, final Deadline deadline) {
+ Preconditions.checkNotNull(env);
+
+ this.cluster = Preconditions.checkNotNull(cluster);
+ this.jobGraph = env.getStreamGraph().getJobGraph();
+ this.deadline = Preconditions.checkNotNull(deadline);
+
+ this.jobId = jobGraph.getJobID();
+ this.cancellationFuture = notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline);
+ }
+
+ JobGraph getJobGraph() {
+ return jobGraph;
+ }
+
+ JobID getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public void close() throws Exception {
+ // Free cluster resources
+ if (jobId != null) {
+ cluster.getLeaderGateway(deadline.timeLeft())
+ .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+
+ cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ private static CompletableFuture<TestingJobManagerMessages.JobStatusIs> notifyWhenJobStatusIs(
+ final JobID jobId, final JobStatus status, final Deadline deadline) {
+
+ return FutureUtils.toJava(
+ cluster.getLeaderGateway(deadline.timeLeft())
+ .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, status), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+ }
+
private static <K, S extends State, V> CompletableFuture<S> getKvState(
final Deadline deadline,
final QueryableStateClient client,
http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index b9ce7c2..8767b52 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -31,6 +31,8 @@ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.rules.TemporaryFolder;
+import java.io.IOException;
+
import static org.junit.Assert.fail;
/**
@@ -79,19 +81,13 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable
}
@AfterClass
- public static void tearDown() {
- if (cluster != null) {
- cluster.stop();
- cluster.awaitTermination();
- }
+ public static void tearDown() throws IOException {
+ client.shutdownAndWait();
- try {
- zkServer.stop();
- zkServer.close();
- client.shutdownAndWait();
- } catch (Throwable e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ cluster.stop();
+ cluster.awaitTermination();
+
+ zkServer.stop();
+ zkServer.close();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 18b167f..cae02e2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -22,14 +22,12 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
/**
* Several integration tests for queryable state using the {@link RocksDBStateBackend}.
*/
-@Ignore
public class HAQueryableStateRocksDBBackendITCase extends HAAbstractQueryableStateTestBase {
@Rule
http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
index a5e24b2..2686a29 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
@@ -67,12 +67,9 @@ public abstract class NonHAAbstractQueryableStateTestBase extends AbstractQuerya
@AfterClass
public static void tearDown() {
- try {
- cluster.stop();
- client.shutdownAndWait();
- } catch (Throwable e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ client.shutdownAndWait();
+
+ cluster.stop();
+ cluster.awaitTermination();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index 39fbe9e..7778a94 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -22,14 +22,12 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
/**
* Several integration tests for queryable state using the {@link RocksDBStateBackend}.
*/
-@Ignore
public class NonHAQueryableStateRocksDBBackendITCase extends NonHAAbstractQueryableStateTestBase {
@Rule