You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/11 15:46:11 UTC
[11/14] flink git commit: [FLINK-7770][QS] Hide the queryable state
behind a proxy.
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
index a7f65f3..7ff4ec6 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -34,8 +35,12 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException;
import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -53,25 +58,27 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.OnSuccess;
-import akka.dispatch.Recover;
-import akka.pattern.Patterns;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.function.Supplier;
import scala.concurrent.Await;
-import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;
@@ -84,10 +91,12 @@ import static org.junit.Assert.assertTrue;
*/
public abstract class AbstractQueryableStateITCase extends TestLogger {
- protected static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000, TimeUnit.SECONDS);
- private static final FiniteDuration QUERY_RETRY_DELAY = new FiniteDuration(100, TimeUnit.MILLISECONDS);
+ private static final int NO_OF_RETRIES = 100;
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS);
+ private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L);
- protected static ActorSystem testActorSystem;
+ private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
+ private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
/**
* State backend to use.
@@ -136,7 +145,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
final Deadline deadline = TEST_TIMEOUT.fromNow();
final int numKeys = 256;
- final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+ final QueryableStateClient client = new QueryableStateClient(
+ "localhost",
+ cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
JobID jobId = null;
@@ -150,7 +161,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
// Very important, because cluster is shared between tests and we
// don't explicitly check that all slots are available before
// submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
DataStream<Tuple2<Integer, Long>> source = env
.addSource(new TestKeyRangeSource(numKeys));
@@ -163,15 +174,14 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
final String queryName = "hakuna-matata";
- final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 7143749578983540352L;
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 7143749578983540352L;
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState(queryName, reducingState);
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+ return value.f0;
+ }
+ }).asQueryableState(queryName, reducingState);
// Submit the job graph
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -188,19 +198,19 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
while (!allNonZero && deadline.hasTimeLeft()) {
allNonZero = true;
- final List<Future<Tuple2<Integer, Long>>> futures = new ArrayList<>(numKeys);
+ final List<CompletableFuture<Tuple2<Integer, Long>>> futures = new ArrayList<>(numKeys);
for (int i = 0; i < numKeys; i++) {
final int key = i;
- if (counts.get(key) > 0) {
+ if (counts.get(key) > 0L) {
// Skip this one
continue;
} else {
allNonZero = false;
}
- Future<Tuple2<Integer, Long>> result = getKvStateWithRetries(
+ CompletableFuture<Tuple2<Integer, Long>> result = getKvStateWithRetries(
client,
jobId,
queryName,
@@ -208,24 +218,21 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
BasicTypeInfo.INT_TYPE_INFO,
reducingState,
QUERY_RETRY_DELAY,
- false);
+ false,
+ executor);
- result.onSuccess(new OnSuccess<Tuple2<Integer, Long>>() {
- @Override
- public void onSuccess(Tuple2<Integer, Long> result) throws Throwable {
- counts.set(key, result.f1);
- assertEquals("Key mismatch", key, result.f0.intValue());
- }
- }, testActorSystem.dispatcher());
+ result.thenAccept(res -> {
+ counts.set(key, res.f1);
+ assertEquals("Key mismatch", key, res.f0.intValue());
+ });
futures.add(result);
}
- Future<Iterable<Tuple2<Integer, Long>>> futureSequence = Futures.sequence(
- futures,
- testActorSystem.dispatcher());
-
- Await.ready(futureSequence, deadline.timeLeft());
+ // wait for all the futures to complete
+ CompletableFuture
+ .allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
+ .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
}
assertTrue("Not all keys are non-zero", allNonZero);
@@ -238,15 +245,15 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
} finally {
// Free cluster resources
if (jobId != null) {
- Future<CancellationSuccess> cancellation = cluster
+ CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
.getLeaderGateway(deadline.timeLeft())
.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
- Await.ready(cancellation, deadline.timeLeft());
+ cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
}
- client.shutDown();
+ client.shutdown();
}
}
@@ -274,7 +281,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
// Very important, because cluster is shared between tests and we
// don't explicitly check that all slots are available before
// submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
DataStream<Tuple2<Integer, Long>> source = env
.addSource(new TestKeyRangeSource(numKeys));
@@ -311,22 +318,23 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
jobId = jobGraph.getJobID();
- Future<TestingJobManagerMessages.JobStatusIs> failedFuture = cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class));
+ CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture = FutureUtils.toJava(
+ cluster.getLeaderGateway(deadline.timeLeft())
+ .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
cluster.submitJobDetached(jobGraph);
- TestingJobManagerMessages.JobStatusIs jobStatus = Await.result(failedFuture, deadline.timeLeft());
+ TestingJobManagerMessages.JobStatusIs jobStatus =
+ failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
assertEquals(JobStatus.FAILED, jobStatus.state());
// Get the job and check the cause
- JobManagerMessages.JobFound jobFound = Await.result(
+ JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
cluster.getLeaderGateway(deadline.timeLeft())
.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)),
- deadline.timeLeft());
+ .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
+ .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
@@ -338,10 +346,10 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
} finally {
// Free cluster resources
if (jobId != null) {
- Future<CancellationSuccess> cancellation = cluster
+ scala.concurrent.Future<CancellationSuccess> cancellation = cluster
.getLeaderGateway(deadline.timeLeft())
.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+ .mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class));
Await.ready(cancellation, deadline.timeLeft());
}
@@ -359,9 +367,11 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
// Config
final Deadline deadline = TEST_TIMEOUT.fromNow();
- final int numElements = 1024;
+ final long numElements = 1024L;
- final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+ final QueryableStateClient client = new QueryableStateClient(
+ "localhost",
+ cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
JobID jobId = null;
try {
@@ -371,7 +381,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
// Very important, because cluster is shared between tests and we
// don't explicitly check that all slots are available before
// submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
DataStream<Tuple2<Integer, Long>> source = env
.addSource(new TestAscendingValueSource(numElements));
@@ -381,15 +391,14 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
"any",
source.getType());
- QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 7662520075515707428L;
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 7662520075515707428L;
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState("hakuna", valueState);
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+ return value.f0;
+ }
+ }).asQueryableState("hakuna", valueState);
// Submit the job graph
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -397,22 +406,19 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
cluster.submitJobDetached(jobGraph);
- // Now query
- long expected = numElements;
-
- executeQuery(deadline, client, jobId, "hakuna", valueState, expected);
+ executeQuery(deadline, client, jobId, "hakuna", valueState, numElements);
} finally {
// Free cluster resources
if (jobId != null) {
- Future<CancellationSuccess> cancellation = cluster
+ CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
.getLeaderGateway(deadline.timeLeft())
.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
- Await.ready(cancellation, deadline.timeLeft());
+ cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
}
- client.shutDown();
+ client.shutdown();
}
}
@@ -425,9 +431,11 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
// Config
final Deadline deadline = TEST_TIMEOUT.fromNow();
- final int numElements = 1024;
+ final long numElements = 1024L;
- final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+ final QueryableStateClient client = new QueryableStateClient(
+ "localhost",
+ cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
JobID jobId = null;
try {
@@ -437,7 +445,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
// Very important, because cluster is shared between tests and we
// don't explicitly check that all slots are available before
// submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
DataStream<Tuple2<Integer, Long>> source = env
.addSource(new TestAscendingValueSource(numElements));
@@ -481,15 +489,15 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
} finally {
// Free cluster resources
if (jobId != null) {
- Future<CancellationSuccess> cancellation = cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+ CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+ .getLeaderGateway(deadline.timeLeft())
+ .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
- Await.ready(cancellation, deadline.timeLeft());
+ cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
}
- client.shutDown();
+ client.shutdown();
}
}
@@ -508,23 +516,25 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
- Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client,
- jobId,
- queryableStateName,
- key,
- BasicTypeInfo.INT_TYPE_INFO,
- stateDescriptor,
- QUERY_RETRY_DELAY,
- false);
+ CompletableFuture<Tuple2<Integer, Long>> future = getKvStateWithRetries(
+ client,
+ jobId,
+ queryableStateName,
+ key,
+ BasicTypeInfo.INT_TYPE_INFO,
+ stateDescriptor,
+ QUERY_RETRY_DELAY,
+ false,
+ executor);
- Tuple2<Integer, Long> value = Await.result(future, deadline.timeLeft());
+ Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
assertEquals("Key mismatch", key, value.f0.intValue());
if (expected == value.f1) {
success = true;
} else {
// Retry
- Thread.sleep(50);
+ Thread.sleep(50L);
}
}
@@ -554,16 +564,17 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
BasicTypeInfo.INT_TYPE_INFO,
valueSerializer,
QUERY_RETRY_DELAY,
- false);
+ false,
+ executor);
- Tuple2<Integer, Long> value = Await.result(future, deadline.timeLeft());
+ Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
assertEquals("Key mismatch", key, value.f0.intValue());
if (expected == value.f1) {
success = true;
} else {
// Retry
- Thread.sleep(50);
+ Thread.sleep(50L);
}
}
@@ -575,20 +586,21 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
* Tests simple value state queryable state instance with a default value
* set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements)
* tuples, the key is mapped to 1 but key 0 is queried which should throw
- * a {@link UnknownKeyOrNamespace} exception.
+ * an {@link UnknownKeyOrNamespaceException}.
*
- * @throws UnknownKeyOrNamespace thrown due querying a non-existent key
+ * @throws UnknownKeyOrNamespaceException thrown due to querying a non-existent key
*/
- @Test(expected = UnknownKeyOrNamespace.class)
- public void testValueStateDefault() throws
- Exception, UnknownKeyOrNamespace {
+ @Test(expected = UnknownKeyOrNamespaceException.class)
+ public void testValueStateDefault() throws Throwable {
// Config
final Deadline deadline = TEST_TIMEOUT.fromNow();
- final int numElements = 1024;
+ final long numElements = 1024L;
- final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+ final QueryableStateClient client = new QueryableStateClient(
+ "localhost",
+ cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
JobID jobId = null;
try {
@@ -600,7 +612,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
// don't explicitly check that all slots are available before
// submitting.
env.setRestartStrategy(RestartStrategies
- .fixedDelayRestart(Integer.MAX_VALUE, 1000));
+ .fixedDelayRestart(Integer.MAX_VALUE, 1000L));
DataStream<Tuple2<Integer, Long>> source = env
.addSource(new TestAscendingValueSource(numElements));
@@ -635,30 +647,37 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
// Now query
int key = 0;
- Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client,
- jobId,
- queryableState.getQueryableStateName(),
- key,
- BasicTypeInfo.INT_TYPE_INFO,
- valueState,
- QUERY_RETRY_DELAY,
- true);
-
- Await.result(future, deadline.timeLeft());
+ CompletableFuture<Tuple2<Integer, Long>> future = getKvStateWithRetries(
+ client,
+ jobId,
+ queryableState.getQueryableStateName(),
+ key,
+ BasicTypeInfo.INT_TYPE_INFO,
+ valueState,
+ QUERY_RETRY_DELAY,
+ true,
+ executor);
+
+ try {
+ future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ } catch (ExecutionException | CompletionException e) {
+ // get() on an exceptionally completed future wraps the
+ // original failure in an ExecutionException.
+ throw e.getCause();
+ }
} finally {
+
// Free cluster resources
if (jobId != null) {
- Future<CancellationSuccess> cancellation = cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId),
- deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(
- CancellationSuccess.class));
+ CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+ .getLeaderGateway(deadline.timeLeft())
+ .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
- Await.ready(cancellation, deadline.timeLeft());
+ cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
}
- client.shutDown();
+ client.shutdown();
}
}
@@ -675,9 +694,11 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
// Config
final Deadline deadline = TEST_TIMEOUT.fromNow();
- final int numElements = 1024;
+ final long numElements = 1024L;
- final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+ final QueryableStateClient client = new QueryableStateClient(
+ "localhost",
+ cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
JobID jobId = null;
try {
@@ -687,7 +708,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
// Very important, because cluster is shared between tests and we
// don't explicitly check that all slots are available before
// submitting.
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
DataStream<Tuple2<Integer, Long>> source = env
.addSource(new TestAscendingValueSource(numElements));
@@ -709,23 +730,21 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
cluster.submitJobDetached(jobGraph);
- // Now query
- long expected = numElements;
-
executeQuery(deadline, client, jobId, "matata",
- queryableState.getValueSerializer(), expected);
+ queryableState.getValueSerializer(), numElements);
} finally {
+
// Free cluster resources
if (jobId != null) {
- Future<CancellationSuccess> cancellation = cluster
- .getLeaderGateway(deadline.timeLeft())
- .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+ CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(
+ cluster.getLeaderGateway(deadline.timeLeft())
+ .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
- Await.ready(cancellation, deadline.timeLeft());
+ cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
}
- client.shutDown();
+ client.shutdown();
}
}
@@ -743,7 +762,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
final int numElements = 1024;
- final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+ final QueryableStateClient client = new QueryableStateClient(
+ "localhost",
+ cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
JobID jobId = null;
try {
@@ -788,21 +809,23 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
while (deadline.hasTimeLeft() && !success) {
- Future<String> future = getKvStateWithRetries(client,
+ CompletableFuture<String> future = getKvStateWithRetries(
+ client,
jobId,
queryableState.getQueryableStateName(),
key,
BasicTypeInfo.INT_TYPE_INFO,
foldingState,
QUERY_RETRY_DELAY,
- false);
+ false,
+ executor);
- String value = Await.result(future, deadline.timeLeft());
+ String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
if (expected.equals(value)) {
success = true;
} else {
// Retry
- Thread.sleep(50);
+ Thread.sleep(50L);
}
}
@@ -811,15 +834,15 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
} finally {
// Free cluster resources
if (jobId != null) {
- Future<CancellationSuccess> cancellation = cluster
+ CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
.getLeaderGateway(deadline.timeLeft())
.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
- Await.ready(cancellation, deadline.timeLeft());
+ cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
}
- client.shutDown();
+ client.shutdown();
}
}
@@ -834,9 +857,11 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
// Config
final Deadline deadline = TEST_TIMEOUT.fromNow();
- final int numElements = 1024;
+ final long numElements = 1024L;
- final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+ final QueryableStateClient client = new QueryableStateClient(
+ "localhost",
+ cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
JobID jobId = null;
try {
@@ -858,15 +883,14 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
new SumReduce(),
source.getType());
- QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
- source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long serialVersionUID = 8470749712274833552L;
+ source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+ private static final long serialVersionUID = 8470749712274833552L;
- @Override
- public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState("jungle", reducingState);
+ @Override
+ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+ return value.f0;
+ }
+ }).asQueryableState("jungle", reducingState);
// Submit the job graph
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -877,117 +901,24 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
// Wait until job is running
// Now query
- long expected = numElements * (numElements + 1) / 2;
+ long expected = numElements * (numElements + 1L) / 2L;
executeQuery(deadline, client, jobId, "jungle", reducingState, expected);
} finally {
// Free cluster resources
if (jobId != null) {
- Future<CancellationSuccess> cancellation = cluster
+ CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
.getLeaderGateway(deadline.timeLeft())
.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
- .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+ .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
- Await.ready(cancellation, deadline.timeLeft());
+ cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
}
- client.shutDown();
+ client.shutdown();
}
}
- private static <K, V> Future<V> getKvStateWithRetries(
- final QueryableStateClient client,
- final JobID jobId,
- final String queryName,
- final K key,
- final TypeInformation<K> keyTypeInfo,
- final TypeSerializer<V> valueTypeSerializer,
- final FiniteDuration retryDelay,
- final boolean failForUnknownKeyOrNamespace) {
-
- return client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, valueTypeSerializer)
- .recoverWith(new Recover<Future<V>>() {
- @Override
- public Future<V> recover(Throwable failure) throws Throwable {
- if (failure instanceof AssertionError) {
- return Futures.failed(failure);
- } else if (failForUnknownKeyOrNamespace &&
- (failure instanceof UnknownKeyOrNamespace)) {
- return Futures.failed(failure);
- } else {
- // At startup some failures are expected
- // due to races. Make sure that they don't
- // fail this test.
- return Patterns.after(
- retryDelay,
- testActorSystem.scheduler(),
- testActorSystem.dispatcher(),
- new Callable<Future<V>>() {
- @Override
- public Future<V> call() throws Exception {
- return getKvStateWithRetries(
- client,
- jobId,
- queryName,
- key,
- keyTypeInfo,
- valueTypeSerializer,
- retryDelay,
- failForUnknownKeyOrNamespace);
- }
- });
- }
- }
- }, testActorSystem.dispatcher());
-
- }
-
- private static <K, V> Future<V> getKvStateWithRetries(
- final QueryableStateClient client,
- final JobID jobId,
- final String queryName,
- final K key,
- final TypeInformation<K> keyTypeInfo,
- final StateDescriptor<?, V> stateDescriptor,
- final FiniteDuration retryDelay,
- final boolean failForUnknownKeyOrNamespace) {
-
- return client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor)
- .recoverWith(new Recover<Future<V>>() {
- @Override
- public Future<V> recover(Throwable failure) throws Throwable {
- if (failure instanceof AssertionError) {
- return Futures.failed(failure);
- } else if (failForUnknownKeyOrNamespace &&
- (failure instanceof UnknownKeyOrNamespace)) {
- return Futures.failed(failure);
- } else {
- // At startup some failures are expected
- // due to races. Make sure that they don't
- // fail this test.
- return Patterns.after(
- retryDelay,
- testActorSystem.scheduler(),
- testActorSystem.dispatcher(),
- new Callable<Future<V>>() {
- @Override
- public Future<V> call() throws Exception {
- return getKvStateWithRetries(
- client,
- jobId,
- queryName,
- key,
- keyTypeInfo,
- stateDescriptor,
- retryDelay,
- failForUnknownKeyOrNamespace);
- }
- });
- }
- }
- }, testActorSystem.dispatcher());
- }
-
/**
* Test source producing (key, 0)..(key, maxValue) with key being the sub
* task index.
@@ -1030,7 +961,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
while (isRunning) {
synchronized (this) {
- this.wait();
+ wait();
}
}
}
@@ -1040,7 +971,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
isRunning = false;
synchronized (this) {
- this.notifyAll();
+ notifyAll();
}
}
@@ -1125,4 +1056,105 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
}
}
+ ///// General Utility Methods //////
+
+ /**
+ * Queries the state registered under {@code queryName} for {@code key}, retrying failed attempts.
+ *
+ * <p>Each attempt calls {@code client.getKvState(...)} with {@link VoidNamespace#INSTANCE} and the
+ * given value serializer. After a retriable failure up to {@code NO_OF_RETRIES} further attempts
+ * are made, spaced by {@code retryDelay}.
+ *
+ * <p>The return type is {@link CompletableFuture}, matching the descriptor-based overload and the
+ * type actually produced by {@code retryWithDelay}.
+ *
+ * @param client client that issues the state request
+ * @param jobId job id handed to the client
+ * @param queryName name under which the state is looked up
+ * @param key key whose state is requested
+ * @param keyTypeInfo type information of the key
+ * @param valueTypeSerializer value serializer handed to the client
+ * @param retryDelay delay before a failed attempt is repeated
+ * @param failForUnknownKeyOrNamespace if {@code true}, an {@link UnknownKeyOrNamespaceException} ends the retries
+ * @param executor executor that handles the attempt results and schedules the retries
+ * @return future completed with the queried value or with the failure that ended the retries
+ */
+ private static <K, V> CompletableFuture<V> getKvStateWithRetries(
+ final QueryableStateClient client,
+ final JobID jobId,
+ final String queryName,
+ final K key,
+ final TypeInformation<K> keyTypeInfo,
+ final TypeSerializer<V> valueTypeSerializer,
+ final Time retryDelay,
+ final boolean failForUnknownKeyOrNamespace,
+ final ScheduledExecutor executor) {
+
+ return retryWithDelay(
+ () -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, valueTypeSerializer),
+ NO_OF_RETRIES,
+ retryDelay,
+ executor,
+ failForUnknownKeyOrNamespace);
+ }
+
+ /**
+ * Queries the state registered under {@code queryName} for {@code key}, describing the state by
+ * a {@link StateDescriptor}, and retries failed attempts.
+ *
+ * <p>Each attempt is issued against {@link VoidNamespace#INSTANCE}. After a retriable failure up
+ * to {@code NO_OF_RETRIES} further attempts are made, spaced by {@code retryDelay}.
+ *
+ * @param client client that issues the state request
+ * @param jobId job id handed to the client
+ * @param queryName name under which the state is looked up
+ * @param key key whose state is requested
+ * @param keyTypeInfo type information of the key
+ * @param stateDescriptor descriptor of the queried state, handed to the client
+ * @param retryDelay delay before a failed attempt is repeated
+ * @param failForUnknownKeyOrNamespace if {@code true}, an {@link UnknownKeyOrNamespaceException} ends the retries
+ * @param executor executor that handles the attempt results and schedules the retries
+ * @return future completed with the queried value or with the failure that ended the retries
+ */
+ private static <K, V> CompletableFuture<V> getKvStateWithRetries(
+ final QueryableStateClient client,
+ final JobID jobId,
+ final String queryName,
+ final K key,
+ final TypeInformation<K> keyTypeInfo,
+ final StateDescriptor<?, V> stateDescriptor,
+ final Time retryDelay,
+ final boolean failForUnknownKeyOrNamespace,
+ final ScheduledExecutor executor) {
+
+ // One query attempt; the supplier is evaluated again for every retry.
+ final Supplier<CompletableFuture<V>> singleAttempt = () -> client.getKvState(
+ jobId,
+ queryName,
+ key,
+ VoidNamespace.INSTANCE,
+ keyTypeInfo,
+ VoidNamespaceTypeInfo.INSTANCE,
+ stateDescriptor);
+
+ return retryWithDelay(singleAttempt, NO_OF_RETRIES, retryDelay, executor, failForUnknownKeyOrNamespace);
+ }
+
+ /**
+ * Starts {@code operation} with retry support and returns the future that will carry its
+ * final outcome.
+ *
+ * <p>A fresh future is created and handed to the recursive {@code retryWithDelay} variant,
+ * which completes it.
+ *
+ * @param operation supplier producing one attempt of the operation
+ * @param retries number of retries allowed after the first attempt
+ * @param retryDelay delay before a failed attempt is repeated
+ * @param scheduledExecutor executor that handles the attempt results and schedules the retries
+ * @param failIfUnknownKeyOrNamespace if {@code true}, an {@link UnknownKeyOrNamespaceException} ends the retries
+ * @return future completed with the operation result or with the failure that ended the retries
+ */
+ private static <T> CompletableFuture<T> retryWithDelay(
+ final Supplier<CompletableFuture<T>> operation,
+ final int retries,
+ final Time retryDelay,
+ final ScheduledExecutor scheduledExecutor,
+ final boolean failIfUnknownKeyOrNamespace) {
+
+ final CompletableFuture<T> outcome = new CompletableFuture<>();
+ retryWithDelay(outcome, operation, retries, retryDelay, scheduledExecutor, failIfUnknownKeyOrNamespace);
+ return outcome;
+ }
+
+ /**
+ * Runs {@code operation} once and completes {@code resultFuture} with its outcome, scheduling
+ * another run after {@code retryDelay} when the attempt fails with a retriable error.
+ *
+ * <p>Nothing happens if {@code resultFuture} is already done. An {@link AssertionError} and, if
+ * {@code failIfUnknownKeyOrNamespace} is set, an {@link UnknownKeyOrNamespaceException} are
+ * forwarded to {@code resultFuture} without retrying. A cancelled attempt, or a failure with no
+ * retries left, completes {@code resultFuture} with a {@code FutureUtils.RetryException}.
+ * Once {@code resultFuture} completes, the pending attempt and any scheduled retry are cancelled.
+ *
+ * @param resultFuture future that receives the final outcome
+ * @param operation supplier producing one attempt of the operation
+ * @param retries number of retries still allowed after this attempt
+ * @param retryDelay delay before a failed attempt is repeated
+ * @param scheduledExecutor executor that handles the attempt results and schedules the retries
+ * @param failIfUnknownKeyOrNamespace if {@code true}, an {@link UnknownKeyOrNamespaceException} ends the retries
+ */
+ public static <T> void retryWithDelay(
+ final CompletableFuture<T> resultFuture,
+ final Supplier<CompletableFuture<T>> operation,
+ final int retries,
+ final Time retryDelay,
+ final ScheduledExecutor scheduledExecutor,
+ final boolean failIfUnknownKeyOrNamespace) {
+
+ if (!resultFuture.isDone()) {
+ final CompletableFuture<T> operationResultFuture = operation.get();
+ operationResultFuture.whenCompleteAsync(
+ (t, throwable) -> {
+ if (throwable != null) {
+ // A failure coming from an upstream stage arrives wrapped, whereas a future that was
+ // failed or cancelled directly hands over the bare exception with a null cause.
+ // Fall back to the throwable itself so that both forms are classified correctly.
+ final Throwable failure = throwable.getCause() != null ? throwable.getCause() : throwable;
+
+ if (failure instanceof CancellationException) {
+ resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", failure));
+ } else if (failure instanceof AssertionError ||
+ (failIfUnknownKeyOrNamespace && failure instanceof UnknownKeyOrNamespaceException)) {
+ // Non-retriable: report the underlying failure as is.
+ resultFuture.completeExceptionally(failure);
+ } else if (retries > 0) {
+ final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
+ () -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor, failIfUnknownKeyOrNamespace),
+ retryDelay.toMilliseconds(),
+ TimeUnit.MILLISECONDS);
+
+ // Drop the pending retry as soon as the overall result is known.
+ resultFuture.whenComplete(
+ (innerT, innerThrowable) -> scheduledFuture.cancel(false));
+ } else {
+ resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not complete the operation. Number of retries " +
+ "has been exhausted.", throwable));
+ }
+ } else {
+ resultFuture.complete(t);
+ }
+ },
+ scheduledExecutor);
+
+ // Stop the in-flight attempt once the overall result is known.
+ resultFuture.whenComplete(
+ (t, throwable) -> operationResultFuture.cancel(false));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
index 15a5ff6..a2a9678 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.testingUtils.TestingCluster;
@@ -40,7 +39,7 @@ import static org.junit.Assert.fail;
public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableStateITCase {
private static final int NUM_JMS = 2;
- private static final int NUM_TMS = 4;
+ private static final int NUM_TMS = 1;
private static final int NUM_SLOTS_PER_TM = 4;
private static TestingServer zkServer;
@@ -67,8 +66,6 @@ public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableSt
cluster = new TestingCluster(config, false);
cluster.start();
- testActorSystem = AkkaUtils.createDefaultActorSystem();
-
// verify that we are in HA mode
Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.ZOOKEEPER);
@@ -85,9 +82,6 @@ public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableSt
cluster.awaitTermination();
}
- testActorSystem.shutdown();
- testActorSystem.awaitTermination();
-
try {
zkServer.stop();
zkServer.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
index c52acc8..1173d0d 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.testingUtils.TestingCluster;
@@ -37,7 +36,7 @@ import static org.junit.Assert.fail;
*/
public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryableStateITCase {
- private static final int NUM_TMS = 2;
+ private static final int NUM_TMS = 1;
private static final int NUM_SLOTS_PER_TM = 4;
@BeforeClass
@@ -48,14 +47,13 @@ public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryabl
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
+ config.setInteger(QueryableStateOptions.SERVER_PORT, 9069);
config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
cluster = new TestingCluster(config, false);
cluster.start(true);
- testActorSystem = AkkaUtils.createDefaultActorSystem();
-
// verify that we are not in HA mode
Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.NONE);
@@ -73,9 +71,5 @@ public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryabl
e.printStackTrace();
fail(e.getMessage());
}
-
- if (testActorSystem != null) {
- testActorSystem.shutdown();
- }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
deleted file mode 100644
index d9a41a1..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.queryablestate.UnknownJobManager;
-import org.apache.flink.queryablestate.client.AkkaKvStateLocationLookupService;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link AkkaKvStateLocationLookupService}.
- */
-public class AkkaKvStateLocationLookupServiceTest extends TestLogger {
-
- /** The default timeout. */
- private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
-
- /** Test actor system shared between the tests. */
- private static ActorSystem testActorSystem;
-
- @BeforeClass
- public static void setUp() throws Exception {
- testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- if (testActorSystem != null) {
- testActorSystem.shutdown();
- }
- }
-
- /**
- * Tests responses if no leader notification has been reported or leadership
- * has been lost (leaderAddress = <code>null</code>).
- */
- @Test
- public void testNoJobManagerRegistered() throws Exception {
- TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
- null,
- null);
- Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
-
- AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
- leaderRetrievalService,
- testActorSystem,
- TIMEOUT,
- new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
-
- lookupService.start();
-
- //
- // No leader registered initially => fail with UnknownJobManager
- //
- try {
- JobID jobId = new JobID();
- String name = "coffee";
-
- Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(jobId, name);
-
- Await.result(locationFuture, TIMEOUT);
- fail("Did not throw expected Exception");
- } catch (UnknownJobManager ignored) {
- // Expected
- }
-
- assertEquals("Received unexpected lookup", 0, received.size());
-
- //
- // Leader registration => communicate with new leader
- //
- UUID leaderSessionId = HighAvailabilityServices.DEFAULT_LEADER_ID;
- KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "tea");
-
- ActorRef testActor = LookupResponseActor.create(received, leaderSessionId, expected);
-
- String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
-
- // Notify the service about a leader
- leaderRetrievalService.notifyListener(testActorAddress, leaderSessionId);
-
- JobID jobId = new JobID();
- String name = "tea";
-
- // Verify that the leader response is handled
- KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, name), TIMEOUT);
- assertEquals(expected, location);
-
- // Verify that the correct message was sent to the leader
- assertEquals(1, received.size());
-
- verifyLookupMsg(received.poll(), jobId, name);
-
- //
- // Leader loss => fail with UnknownJobManager
- //
- leaderRetrievalService.notifyListener(null, null);
-
- try {
- Future<KvStateLocation> locationFuture = lookupService
- .getKvStateLookupInfo(new JobID(), "coffee");
-
- Await.result(locationFuture, TIMEOUT);
- fail("Did not throw expected Exception");
- } catch (UnknownJobManager ignored) {
- // Expected
- }
-
- // No new messages received
- assertEquals(0, received.size());
- }
-
- /**
- * Tests that messages are properly decorated with the leader session ID.
- */
- @Test
- public void testLeaderSessionIdChange() throws Exception {
- TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
- "localhost",
- HighAvailabilityServices.DEFAULT_LEADER_ID);
- Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
-
- AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
- leaderRetrievalService,
- testActorSystem,
- TIMEOUT,
- new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
-
- lookupService.start();
-
- // Create test actors with random leader session IDs
- KvStateLocation expected1 = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "salt");
- UUID leaderSessionId1 = UUID.randomUUID();
- ActorRef testActor1 = LookupResponseActor.create(received, leaderSessionId1, expected1);
- String testActorAddress1 = AkkaUtils.getAkkaURL(testActorSystem, testActor1);
-
- KvStateLocation expected2 = new KvStateLocation(new JobID(), new JobVertexID(), 22321, "pepper");
- UUID leaderSessionId2 = UUID.randomUUID();
- ActorRef testActor2 = LookupResponseActor.create(received, leaderSessionId1, expected2);
- String testActorAddress2 = AkkaUtils.getAkkaURL(testActorSystem, testActor2);
-
- JobID jobId = new JobID();
-
- //
- // Notify about first leader
- //
- leaderRetrievalService.notifyListener(testActorAddress1, leaderSessionId1);
-
- KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, "rock"), TIMEOUT);
- assertEquals(expected1, location);
-
- assertEquals(1, received.size());
- verifyLookupMsg(received.poll(), jobId, "rock");
-
- //
- // Notify about second leader
- //
- leaderRetrievalService.notifyListener(testActorAddress2, leaderSessionId2);
-
- location = Await.result(lookupService.getKvStateLookupInfo(jobId, "roll"), TIMEOUT);
- assertEquals(expected2, location);
-
- assertEquals(1, received.size());
- verifyLookupMsg(received.poll(), jobId, "roll");
- }
-
- /**
- * Tests that lookups are retried when no leader notification is available.
- */
- @Test
- public void testRetryOnUnknownJobManager() throws Exception {
- final Queue<AkkaKvStateLocationLookupService.LookupRetryStrategy> retryStrategies = new LinkedBlockingQueue<>();
-
- AkkaKvStateLocationLookupService.LookupRetryStrategyFactory retryStrategy =
- new AkkaKvStateLocationLookupService.LookupRetryStrategyFactory() {
- @Override
- public AkkaKvStateLocationLookupService.LookupRetryStrategy createRetryStrategy() {
- return retryStrategies.poll();
- }
- };
-
- final TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
- null,
- null);
-
- AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
- leaderRetrievalService,
- testActorSystem,
- TIMEOUT,
- retryStrategy);
-
- lookupService.start();
-
- //
- // Test call to retry
- //
- final AtomicBoolean hasRetried = new AtomicBoolean();
- retryStrategies.add(
- new AkkaKvStateLocationLookupService.LookupRetryStrategy() {
- @Override
- public FiniteDuration getRetryDelay() {
- return FiniteDuration.Zero();
- }
-
- @Override
- public boolean tryRetry() {
- if (hasRetried.compareAndSet(false, true)) {
- return true;
- }
- return false;
- }
- });
-
- Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(new JobID(), "yessir");
-
- Await.ready(locationFuture, TIMEOUT);
- assertTrue("Did not retry ", hasRetried.get());
-
- //
- // Test leader notification after retry
- //
- Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
-
- KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 12122, "garlic");
- ActorRef testActor = LookupResponseActor.create(received, null, expected);
- final String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
-
- retryStrategies.add(new AkkaKvStateLocationLookupService.LookupRetryStrategy() {
- @Override
- public FiniteDuration getRetryDelay() {
- return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public boolean tryRetry() {
- leaderRetrievalService.notifyListener(testActorAddress, HighAvailabilityServices.DEFAULT_LEADER_ID);
- return true;
- }
- });
-
- KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(new JobID(), "yessir"), TIMEOUT);
- assertEquals(expected, location);
- }
-
- @Test
- public void testUnexpectedResponseType() throws Exception {
- TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
- "localhost",
- HighAvailabilityServices.DEFAULT_LEADER_ID);
- Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
-
- AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
- leaderRetrievalService,
- testActorSystem,
- TIMEOUT,
- new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
-
- lookupService.start();
-
- // Create test actors with random leader session IDs
- String expected = "unexpected-response-type";
- ActorRef testActor = LookupResponseActor.create(received, null, expected);
- String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
-
- leaderRetrievalService.notifyListener(testActorAddress, null);
-
- try {
- Await.result(lookupService.getKvStateLookupInfo(new JobID(), "spicy"), TIMEOUT);
- fail("Did not throw expected Exception");
- } catch (Throwable ignored) {
- // Expected
- }
- }
-
- private static final class LookupResponseActor extends FlinkUntypedActor {
-
- /** Received lookup messages. */
- private final Queue<LookupKvStateLocation> receivedLookups;
-
- /** Responses on KvStateMessage.LookupKvStateLocation messages. */
- private final Queue<Object> lookupResponses;
-
- /** The leader session ID. */
- private UUID leaderSessionId;
-
- public LookupResponseActor(
- Queue<LookupKvStateLocation> receivedLookups,
- UUID leaderSessionId, Object... lookupResponses) {
-
- this.receivedLookups = Preconditions.checkNotNull(receivedLookups, "Received lookups");
- this.leaderSessionId = leaderSessionId;
- this.lookupResponses = new ArrayDeque<>();
-
- if (lookupResponses != null) {
- for (Object resp : lookupResponses) {
- this.lookupResponses.add(resp);
- }
- }
- }
-
- @Override
- public void handleMessage(Object message) throws Exception {
- if (message instanceof LookupKvStateLocation) {
- // Add to received lookups queue
- receivedLookups.add((LookupKvStateLocation) message);
-
- Object msg = lookupResponses.poll();
- if (msg != null) {
- if (msg instanceof Throwable) {
- sender().tell(new Status.Failure((Throwable) msg), self());
- } else {
- sender().tell(new Status.Success(msg), self());
- }
- }
- } else if (message instanceof UUID) {
- this.leaderSessionId = (UUID) message;
- } else {
- LOG.debug("Received unhandled message: {}", message);
- }
- }
-
- @Override
- protected UUID getLeaderSessionID() {
- return leaderSessionId;
- }
-
- private static ActorRef create(
- Queue<LookupKvStateLocation> receivedLookups,
- UUID leaderSessionId,
- Object... lookupResponses) {
-
- return testActorSystem.actorOf(Props.create(
- LookupResponseActor.class,
- receivedLookups,
- leaderSessionId,
- lookupResponses));
- }
- }
-
- private static void verifyLookupMsg(
- LookupKvStateLocation lookUpMsg,
- JobID expectedJobId,
- String expectedName) {
-
- assertNotNull(lookUpMsg);
- assertEquals(expectedJobId, lookUpMsg.getJobId());
- assertEquals(expectedName, lookUpMsg.getRegistrationName());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
new file mode 100644
index 0000000..b6f855e
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -0,0 +1,784 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link Client}.
+ */
+public class ClientTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class);
+
+ // Thread pool for client bootstrap (shared between tests); shut down in tearDown()
+ private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
+
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS); // tests derive their Deadline from this via fromNow()
+
+ /**
+ * Initiates the graceful shutdown of the event loop group shared by the tests of this class.
+ */
+ @AfterClass
+ public static void tearDown() throws Exception {
+ // NIO_GROUP is a static final field initialised at its declaration, so it is never null here.
+ NIO_GROUP.shutdownGracefully();
+ }
+
+ /**
+ * Tests simple queries, of which half succeed and half fail.
+ *
+ * <p>The test sends 1024 requests to a local server channel whose handler only records the
+ * received buffers. Each recorded request is then answered on the accepted channel: for an
+ * even loop index with a {@link KvStateResponse} carrying random bytes, for an odd loop index
+ * with a request failure. Finally the futures and the request statistics are checked against
+ * that split.
+ *
+ * <p>NOTE(review): the verification pairs the i-th future with the i-th received request,
+ * i.e. it relies on the requests arriving in the order they were sent.
+ */
+ @Test
+ public void testSimpleRequests() throws Exception {
+ Deadline deadline = TEST_TIMEOUT.fromNow();
+ AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+ MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ Client<KvStateInternalRequest, KvStateResponse> client = null;
+ Channel serverChannel = null;
+
+ try {
+ client = new Client<>("Test Client", 1, serializer, stats);
+
+ // Random result
+ final byte[] expected = new byte[1024];
+ ThreadLocalRandom.current().nextBytes(expected);
+
+ final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>();
+ final AtomicReference<Channel> channel = new AtomicReference<>();
+
+ serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ channel.set(ctx.channel()); // remembered so the test thread can write the responses
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ received.add((ByteBuf) msg); // requests are only queued here; they are answered below
+ }
+ });
+
+ KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+ long numQueries = 1024L;
+
+ List<CompletableFuture<KvStateResponse>> futures = new ArrayList<>();
+ for (long i = 0L; i < numQueries; i++) {
+ KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+ futures.add(client.sendRequest(serverAddress, request));
+ }
+
+ // Respond to messages
+ Exception testException = new RuntimeException("Expected test Exception");
+
+ for (long i = 0L; i < numQueries; i++) {
+ ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ assertNotNull("Receive timed out", buf);
+
+ Channel ch = channel.get();
+ assertNotNull("Channel not active", ch);
+
+ assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+ long requestId = MessageSerializer.getRequestId(buf);
+ KvStateInternalRequest deserRequest = serializer.deserializeRequest(buf); // result unused; NOTE(review): presumably only checks that the request deserializes
+
+ buf.release();
+
+ if (i % 2L == 0L) {
+ ByteBuf response = MessageSerializer.serializeResponse(
+ serverChannel.alloc(),
+ requestId,
+ new KvStateResponse(expected));
+
+ ch.writeAndFlush(response);
+ } else {
+ ByteBuf response = MessageSerializer.serializeRequestFailure(
+ serverChannel.alloc(),
+ requestId,
+ testException);
+
+ ch.writeAndFlush(response);
+ }
+ }
+
+ for (long i = 0L; i < numQueries; i++) {
+
+ if (i % 2L == 0L) {
+ KvStateResponse serializedResult = futures.get((int) i).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ assertArrayEquals(expected, serializedResult.getContent());
+ } else {
+ try {
+ futures.get((int) i).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ fail("Did not throw expected Exception");
+ } catch (ExecutionException e) {
+
+ if (!(e.getCause() instanceof RuntimeException)) {
+ fail("Did not throw expected Exception");
+ }
+ // else expected
+ }
+ }
+ }
+
+ assertEquals(numQueries, stats.getNumRequests());
+ long expectedRequests = numQueries / 2L;
+
+ // Counts can take some time to propagate
+ while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != expectedRequests ||
+ stats.getNumFailed() != expectedRequests)) {
+ Thread.sleep(100L);
+ }
+
+ assertEquals(expectedRequests, stats.getNumSuccessful());
+ assertEquals(expectedRequests, stats.getNumFailed());
+ } finally {
+ if (client != null) {
+ client.shutdown();
+ }
+
+ if (serverChannel != null) {
+ serverChannel.close();
+ }
+
+ assertEquals("Channel leak", 0L, stats.getNumConnections()); // evaluated after the client shutdown above
+ }
+ }
+
+ /**
+ * Tests that a request to an unavailable host is failed with ConnectException.
+ *
+ * <p>The target address combines the local host with a port obtained from
+ * {@code NetUtils.getAvailablePort()}; this test does not start a server on it. The returned
+ * future must fail with an {@link ExecutionException} whose cause is a
+ * {@link ConnectException}, and no connection may remain registered in the stats afterwards.
+ */
+ @Test
+ public void testRequestUnavailableHost() throws Exception {
+ Deadline deadline = TEST_TIMEOUT.fromNow();
+ AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+ MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ Client<KvStateInternalRequest, KvStateResponse> client = null;
+
+ try {
+ client = new Client<>("Test Client", 1, serializer, stats);
+
+ int availablePort = NetUtils.getAvailablePort(); // NOTE(review): presumably a currently unused port; nothing is bound to it in this test
+
+ KvStateServerAddress serverAddress = new KvStateServerAddress(
+ InetAddress.getLocalHost(),
+ availablePort);
+
+ KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+ CompletableFuture<KvStateResponse> future = client.sendRequest(serverAddress, request);
+
+ try {
+ future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ fail("Did not throw expected ConnectException");
+ } catch (ExecutionException e) {
+ if (!(e.getCause() instanceof ConnectException)) {
+ fail("Did not throw expected ConnectException");
+ }
+ // else expected
+ }
+ } finally {
+ if (client != null) {
+ client.shutdown();
+ }
+
+ assertEquals("Channel leak", 0L, stats.getNumConnections()); // evaluated after the client shutdown above
+ }
+ }
+
+ /**
+  * Multiple threads concurrently fire queries through one shared client.
+  *
+  * <p>Four tasks send 1024 requests each to a test server that answers every
+  * request with the same random 1024-byte payload. Every response must carry
+  * that payload, and afterwards the client stats must report all requests as
+  * successful and no remaining connection.
+  */
+ @Test
+ public void testConcurrentQueries() throws Exception {
+     Deadline deadline = TEST_TIMEOUT.fromNow();
+     AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+     final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+             new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+     ExecutorService executor = null;
+     Client<KvStateInternalRequest, KvStateResponse> client = null;
+     Channel serverChannel = null;
+
+     final byte[] serializedResult = new byte[1024];
+     ThreadLocalRandom.current().nextBytes(serializedResult);
+
+     try {
+         int numQueryTasks = 4;
+         final int numQueriesPerTask = 1024;
+
+         executor = Executors.newFixedThreadPool(numQueryTasks);
+
+         client = new Client<>("Test Client", 1, serializer, stats);
+
+         serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+             @Override
+             public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+                 ByteBuf buf = (ByteBuf) msg;
+                 final long requestId;
+                 try {
+                     assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+                     requestId = MessageSerializer.getRequestId(buf);
+                     // The request content is irrelevant for the reply; the call only
+                     // checks that the remaining payload can be deserialized.
+                     serializer.deserializeRequest(buf);
+                 } finally {
+                     // Release on every path so a failed check does not leak the buffer.
+                     buf.release();
+                 }
+
+                 KvStateResponse response = new KvStateResponse(serializedResult);
+                 ByteBuf serResponse = MessageSerializer.serializeResponse(
+                         ctx.alloc(),
+                         requestId,
+                         response);
+
+                 ctx.channel().writeAndFlush(serResponse);
+             }
+         });
+
+         final KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+         final Client<KvStateInternalRequest, KvStateResponse> finalClient = client;
+         Callable<List<CompletableFuture<KvStateResponse>>> queryTask = () -> {
+             List<CompletableFuture<KvStateResponse>> results = new ArrayList<>(numQueriesPerTask);
+
+             for (int i = 0; i < numQueriesPerTask; i++) {
+                 KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+                 results.add(finalClient.sendRequest(serverAddress, request));
+             }
+
+             return results;
+         };
+
+         // Submit query tasks
+         List<Future<List<CompletableFuture<KvStateResponse>>>> futures = new ArrayList<>();
+         for (int i = 0; i < numQueryTasks; i++) {
+             futures.add(executor.submit(queryTask));
+         }
+
+         // Verify results
+         for (Future<List<CompletableFuture<KvStateResponse>>> future : futures) {
+             List<CompletableFuture<KvStateResponse>> results = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+             for (CompletableFuture<KvStateResponse> result : results) {
+                 KvStateResponse actual = result.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                 assertArrayEquals(serializedResult, actual.getContent());
+             }
+         }
+
+         int totalQueries = numQueryTasks * numQueriesPerTask;
+
+         // Counts can take some time to propagate
+         while (deadline.hasTimeLeft() && stats.getNumSuccessful() != totalQueries) {
+             Thread.sleep(100L);
+         }
+
+         assertEquals(totalQueries, stats.getNumRequests());
+         assertEquals(totalQueries, stats.getNumSuccessful());
+     } finally {
+         if (executor != null) {
+             executor.shutdown();
+         }
+
+         if (serverChannel != null) {
+             serverChannel.close();
+         }
+
+         if (client != null) {
+             client.shutdown();
+         }
+
+         assertEquals("Channel leak", 0L, stats.getNumConnections());
+     }
+ }
+
+ /**
+  * Verifies that a server failure message fails all pending requests of the
+  * connection, closes it and removes it from the established connections.
+  */
+ @Test
+ public void testFailureClosesChannel() throws Exception {
+     final Deadline testDeadline = TEST_TIMEOUT.fromNow();
+     final AtomicKvStateRequestStats requestStats = new AtomicKvStateRequestStats();
+
+     final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+             new MessageSerializer<>(
+                     new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
+                     new KvStateResponse.KvStateResponseDeserializer());
+
+     Client<KvStateInternalRequest, KvStateResponse> client = null;
+     Channel serverChannel = null;
+
+     try {
+         client = new Client<>("Test Client", 1, serializer, requestStats);
+
+         final LinkedBlockingQueue<ByteBuf> receivedFrames = new LinkedBlockingQueue<>();
+         final AtomicReference<Channel> acceptedChannel = new AtomicReference<>();
+
+         // The server only records the accepted channel and the raw request frames.
+         serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+             @Override
+             public void channelActive(ChannelHandlerContext ctx) throws Exception {
+                 acceptedChannel.set(ctx.channel());
+             }
+
+             @Override
+             public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+                 receivedFrames.add((ByteBuf) msg);
+             }
+         });
+
+         final KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+         // Requests
+         final int numRequests = 2;
+         final List<Future<KvStateResponse>> pendingResponses = new ArrayList<>();
+         final KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+
+         for (int i = 0; i < numRequests; i++) {
+             pendingResponses.add(client.sendRequest(serverAddress, request));
+         }
+
+         // Wait until both requests have arrived at the server
+         for (int i = 0; i < numRequests; i++) {
+             final ByteBuf frame = receivedFrames.poll(testDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+             assertNotNull("Receive timed out", frame);
+             frame.release();
+         }
+
+         assertEquals(1L, requestStats.getNumConnections());
+
+         final Channel ch = acceptedChannel.get();
+         assertNotNull("Channel not active", ch);
+
+         // Respond with failure
+         ch.writeAndFlush(MessageSerializer.serializeServerFailure(
+                 serverChannel.alloc(),
+                 new RuntimeException("Expected test server failure")));
+
+         // A single server failure has to fail every pending request
+         for (Future<KvStateResponse> pending : pendingResponses) {
+             try {
+                 pending.get(testDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                 fail("Did not throw expected server failure");
+             } catch (ExecutionException e) {
+                 assertTrue("Did not throw expected Exception", e.getCause() instanceof RuntimeException);
+             }
+         }
+
+         assertEquals(0L, requestStats.getNumConnections());
+
+         // Counts can take some time to propagate
+         while (testDeadline.hasTimeLeft()
+                 && !(requestStats.getNumSuccessful() == 0L && requestStats.getNumFailed() == 2L)) {
+             Thread.sleep(100L);
+         }
+
+         assertEquals(2L, requestStats.getNumRequests());
+         assertEquals(0L, requestStats.getNumSuccessful());
+         assertEquals(2L, requestStats.getNumFailed());
+     } finally {
+         if (client != null) {
+             client.shutdown();
+         }
+
+         if (serverChannel != null) {
+             serverChannel.close();
+         }
+
+         assertEquals("Channel leak", 0L, requestStats.getNumConnections());
+     }
+ }
+
+ /**
+  * Tests that a server channel close, closes the connection and removes it
+  * from the established connections.
+  *
+  * <p>The pending request is expected to fail with a
+  * {@link ClosedChannelException} and to be counted as failed in the stats.
+  */
+ @Test
+ public void testServerClosesChannel() throws Exception {
+     Deadline deadline = TEST_TIMEOUT.fromNow();
+     AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+     final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+             new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+     Client<KvStateInternalRequest, KvStateResponse> client = null;
+     Channel serverChannel = null;
+
+     try {
+         client = new Client<>("Test Client", 1, serializer, stats);
+
+         final AtomicBoolean received = new AtomicBoolean();
+         final AtomicReference<Channel> channel = new AtomicReference<>();
+
+         serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+             @Override
+             public void channelActive(ChannelHandlerContext ctx) throws Exception {
+                 channel.set(ctx.channel());
+             }
+
+             @Override
+             public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+                 received.set(true);
+                 // The frame is neither forwarded nor answered, so this handler has to
+                 // release the reference-counted buffer itself to avoid leaking it.
+                 ((ByteBuf) msg).release();
+             }
+         });
+
+         KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+         // Requests
+         KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+         Future<KvStateResponse> future = client.sendRequest(serverAddress, request);
+
+         // Wait until the request has reached the server
+         while (!received.get() && deadline.hasTimeLeft()) {
+             Thread.sleep(50L);
+         }
+         assertTrue("Receive timed out", received.get());
+
+         assertEquals(1, stats.getNumConnections());
+
+         // Close the accepted connection from the server side
+         channel.get().close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+         try {
+             future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+             fail("Did not throw expected server failure");
+         } catch (ExecutionException e) {
+             if (!(e.getCause() instanceof ClosedChannelException)) {
+                 fail("Did not throw expected Exception");
+             }
+             // Expected
+         }
+
+         assertEquals(0L, stats.getNumConnections());
+
+         // Counts can take some time to propagate
+         while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 1L)) {
+             Thread.sleep(100L);
+         }
+
+         assertEquals(1L, stats.getNumRequests());
+         assertEquals(0L, stats.getNumSuccessful());
+         assertEquals(1L, stats.getNumFailed());
+     } finally {
+         if (client != null) {
+             client.shutdown();
+         }
+
+         if (serverChannel != null) {
+             serverChannel.close();
+         }
+
+         assertEquals("Channel leak", 0L, stats.getNumConnections());
+     }
+ }
+
+ /**
+  * Tests multiple client tasks querying multiple servers through one shared
+  * client until 100k requests have been counted by the client stats. At this
+  * point, the client is shut down and it is verified that all query tasks
+  * fail with an exception caused by the shut down and that neither the client
+  * nor the servers report leaked connections.
+  */
+ @Test
+ public void testClientServerIntegration() throws Exception {
+     // Config
+     final int numServers = 2;
+     final int numServerEventLoopThreads = 2;
+     final int numServerQueryThreads = 2;
+
+     final int numClientEventLoopThreads = 4;
+     final int numClientsTasks = 8;
+
+     final int batchSize = 16;
+
+     final int numKeyGroups = 1;
+
+     AbstractStateBackend abstractBackend = new MemoryStateBackend();
+     KvStateRegistry dummyRegistry = new KvStateRegistry();
+     DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+     dummyEnv.setKvStateRegistry(dummyRegistry);
+
+     AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+             dummyEnv,
+             new JobID(),
+             "test_op",
+             IntSerializer.INSTANCE,
+             numKeyGroups,
+             new KeyGroupRange(0, 0),
+             dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID()));
+
+     final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+
+     AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
+
+     final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+             new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+     Client<KvStateInternalRequest, KvStateResponse> client = null;
+     ExecutorService clientTaskExecutor = null;
+     final KvStateServerImpl[] server = new KvStateServerImpl[numServers];
+
+     try {
+         client = new Client<>("Test Client", numClientEventLoopThreads, serializer, clientStats);
+         clientTaskExecutor = Executors.newFixedThreadPool(numClientsTasks);
+
+         // Create state
+         ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+         desc.setQueryable("any");
+
+         // Create servers
+         KvStateRegistry[] registry = new KvStateRegistry[numServers];
+         AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers];
+         final KvStateID[] ids = new KvStateID[numServers];
+
+         for (int i = 0; i < numServers; i++) {
+             registry[i] = new KvStateRegistry();
+             serverStats[i] = new AtomicKvStateRequestStats();
+             server[i] = new KvStateServerImpl(
+                     InetAddress.getLocalHost(),
+                     0,
+                     numServerEventLoopThreads,
+                     numServerQueryThreads,
+                     registry[i],
+                     serverStats[i]);
+
+             server[i].start();
+
+             // Server i serves key (1010 + i) ...
+             backend.setCurrentKey(1010 + i);
+
+             // Value per server
+             ValueState<Integer> state = backend.getPartitionedState(VoidNamespace.INSTANCE,
+                     VoidNamespaceSerializer.INSTANCE,
+                     desc);
+
+             // ... with value (201 + i)
+             state.update(201 + i);
+
+             // we know it must be a KvState but this is not exposed to the user via State
+             InternalKvState<?> kvState = (InternalKvState<?>) state;
+
+             // Register KvState (one state instance for all servers)
+             ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
+         }
+
+         final Client<KvStateInternalRequest, KvStateResponse> finalClient = client;
+         Callable<Void> queryTask = () -> {
+             // Runs until a request fails (e.g. after client shut down) or the thread is interrupted
+             while (true) {
+                 if (Thread.interrupted()) {
+                     throw new InterruptedException();
+                 }
+
+                 // Random server permutation
+                 List<Integer> random = new ArrayList<>();
+                 for (int j = 0; j < batchSize; j++) {
+                     random.add(j);
+                 }
+                 Collections.shuffle(random);
+
+                 // Dispatch queries
+                 List<Future<KvStateResponse>> futures = new ArrayList<>(batchSize);
+
+                 for (int j = 0; j < batchSize; j++) {
+                     int targetServer = random.get(j) % numServers;
+
+                     byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+                             1010 + targetServer,
+                             IntSerializer.INSTANCE,
+                             VoidNamespace.INSTANCE,
+                             VoidNamespaceSerializer.INSTANCE);
+
+                     KvStateInternalRequest request = new KvStateInternalRequest(ids[targetServer], serializedKeyAndNamespace);
+                     futures.add(finalClient.sendRequest(server[targetServer].getServerAddress(), request));
+                 }
+
+                 // Verify results
+                 for (int j = 0; j < batchSize; j++) {
+                     int targetServer = random.get(j) % numServers;
+
+                     Future<KvStateResponse> future = futures.get(j);
+                     byte[] buf = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS).getContent();
+                     int value = KvStateSerializer.deserializeValue(buf, IntSerializer.INSTANCE);
+                     assertEquals(201L + targetServer, value);
+                 }
+             }
+         };
+
+         // Submit tasks
+         List<Future<Void>> taskFutures = new ArrayList<>();
+         for (int i = 0; i < numClientsTasks; i++) {
+             taskFutures.add(clientTaskExecutor.submit(queryTask));
+         }
+
+         long numRequests;
+         while ((numRequests = clientStats.getNumRequests()) < 100_000L) {
+             Thread.sleep(100L);
+             LOG.info("Number of requests {}/100_000", numRequests);
+         }
+
+         // Shut down
+         client.shutdown();
+
+         for (Future<Void> future : taskFutures) {
+             try {
+                 future.get();
+                 fail("Did not throw expected Exception after shut down");
+             } catch (ExecutionException t) {
+                 // The expected exception type is looked up two levels below the
+                 // ExecutionException thrown by the task future.
+                 Throwable taskFailure = t.getCause();
+                 Throwable rootCause = taskFailure != null ? taskFailure.getCause() : null;
+
+                 if (!(rootCause instanceof ClosedChannelException ||
+                         rootCause instanceof IllegalStateException)) {
+                     // Name the exception that was actually found and keep the full
+                     // chain as cause instead of only printing it.
+                     Throwable unexpected = rootCause != null
+                             ? rootCause
+                             : (taskFailure != null ? taskFailure : t);
+                     throw new AssertionError(
+                             "Failed with unexpected Exception type: " + unexpected.getClass().getName(),
+                             t);
+                 }
+                 // else expected
+             }
+         }
+
+         assertEquals("Connection leak (client)", 0L, clientStats.getNumConnections());
+         for (int i = 0; i < numServers; i++) {
+             // The check is retried up to 10 times with growing back-off before giving up
+             boolean success = false;
+             int numRetries = 0;
+             while (!success) {
+                 try {
+                     assertEquals("Connection leak (server)", 0L, serverStats[i].getNumConnections());
+                     success = true;
+                 } catch (Throwable t) {
+                     if (numRetries < 10) {
+                         LOG.info("Retrying connection leak check (server)");
+                         Thread.sleep((numRetries + 1) * 50L);
+                         numRetries++;
+                     } else {
+                         throw t;
+                     }
+                 }
+             }
+         }
+     } finally {
+         if (client != null) {
+             client.shutdown();
+         }
+
+         for (int i = 0; i < numServers; i++) {
+             if (server[i] != null) {
+                 server[i].shutdown();
+             }
+         }
+
+         if (clientTaskExecutor != null) {
+             // Interrupt query tasks that are still looping (they poll the interrupt flag),
+             // e.g. when the test is left early because of a failure.
+             clientTaskExecutor.shutdownNow();
+         }
+     }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private Channel createServerChannel(final ChannelHandler... handlers) throws UnknownHostException, InterruptedException {
+ ServerBootstrap bootstrap = new ServerBootstrap()
+ // Bind address and port
+ .localAddress(InetAddress.getLocalHost(), 0)
+ // NIO server channels
+ .group(NIO_GROUP)
+ .channel(NioServerSocketChannel.class)
+ // See initializer for pipeline details
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline()
+ .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
+ .addLast(handlers);
+ }
+ });
+
+ return bootstrap.bind().sync().channel();
+ }
+
+ private KvStateServerAddress getKvStateServerAddress(Channel serverChannel) {
+ InetSocketAddress localAddress = (InetSocketAddress) serverChannel.localAddress();
+
+ return new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
index 0b97bda..cb490aa 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.queryablestate.network;
-import org.apache.flink.queryablestate.client.KvStateClientHandler;
-import org.apache.flink.queryablestate.client.KvStateClientHandlerCallback;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
@@ -37,7 +37,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
- * Tests for {@link KvStateClientHandler}.
+ * Tests for {@link ClientHandler}.
*/
public class KvStateClientHandlerTest {
@@ -47,28 +47,30 @@ public class KvStateClientHandlerTest {
*/
@Test
public void testReadCallbacksAndBufferRecycling() throws Exception {
- KvStateClientHandlerCallback callback = mock(KvStateClientHandlerCallback.class);
+ final ClientHandlerCallback<KvStateResponse> callback = mock(ClientHandlerCallback.class);
- EmbeddedChannel channel = new EmbeddedChannel(new KvStateClientHandler(callback));
+ final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+ final EmbeddedChannel channel = new EmbeddedChannel(new ClientHandler<>("Test Client", serializer, callback));
+
+ final byte[] content = new byte[0];
+ final KvStateResponse response = new KvStateResponse(content);
//
// Request success
//
- ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
- channel.alloc(),
- 1222112277,
- new byte[0]);
+ ByteBuf buf = MessageSerializer.serializeResponse(channel.alloc(), 1222112277L, response);
buf.skipBytes(4); // skip frame length
// Verify callback
channel.writeInbound(buf);
- verify(callback, times(1)).onRequestResult(eq(1222112277L), any(byte[].class));
+ verify(callback, times(1)).onRequestResult(eq(1222112277L), any(KvStateResponse.class));
assertEquals("Buffer not recycled", 0, buf.refCnt());
//
// Request failure
//
- buf = MessageSerializer.serializeKvStateRequestFailure(
+ buf = MessageSerializer.serializeRequestFailure(
channel.alloc(),
1222112278,
new RuntimeException("Expected test Exception"));