You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2020/05/17 13:47:50 UTC
[flink] branch master updated: [FLINK-17735][streaming] Add an
iterator to collect sink results through coordination rest api
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8650e40 [FLINK-17735][streaming] Add an iterator to collect sink results through coordination rest api
8650e40 is described below
commit 8650e409763a9b4eba92c77cc02ca22edc0230af
Author: TsReaper <ts...@gmail.com>
AuthorDate: Sun May 17 21:47:31 2020 +0800
[FLINK-17735][streaming] Add an iterator to collect sink results through coordination rest api
This closes #12073
---
.../operators/collect/CollectResultFetcher.java | 334 +++++++++++++++++++++
.../operators/collect/CollectResultIterator.java | 95 ++++++
.../api/operators/collect/CollectSinkFunction.java | 33 +-
.../collect/CollectSinkOperatorCoordinator.java | 22 +-
.../collect/CollectSinkOperatorFactory.java | 21 +-
.../collect/CollectResultIteratorTest.java | 132 ++++++++
.../operators/collect/CollectSinkFunctionTest.java | 206 ++++++++-----
.../CollectSinkOperatorCoordinatorTest.java | 6 +-
.../collect/utils/CollectRequestSender.java | 31 --
.../operators/collect/utils/TestCollectClient.java | 141 ---------
.../utils/TestCoordinationRequestHandler.java | 213 +++++++++++++
.../api/operators/collect/utils/TestJobClient.java | 133 ++++++++
12 files changed, 1095 insertions(+), 272 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
new file mode 100644
index 0000000..ec7286b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A fetcher which fetches query results from sink and provides exactly-once semantics.
+ */
+public class CollectResultFetcher<T> {
+
+ private static final int DEFAULT_RETRY_MILLIS = 100;
+ private static final long DEFAULT_ACCUMULATOR_GET_MILLIS = 10000;
+
+ private static final Logger LOG = LoggerFactory.getLogger(CollectResultFetcher.class);
+
+ private final CompletableFuture<OperatorID> operatorIdFuture;
+ private final String accumulatorName;
+ private final int retryMillis;
+
+ private ResultBuffer buffer;
+
+ @Nullable
+ private JobClient jobClient;
+ @Nullable
+ private CoordinationRequestGateway gateway;
+
+ private boolean jobTerminated;
+ private boolean closed;
+
+ /**
+  * Creates a fetcher which waits {@code DEFAULT_RETRY_MILLIS} milliseconds between two fetch attempts.
+  *
+  * @param operatorIdFuture future holding the ID of the operator the coordination requests are sent to
+  * @param serializer serializer handed to the result buffer for reading results out of responses
+  * @param accumulatorName name of the accumulator which is read once the job has terminated
+  */
+ public CollectResultFetcher(
+         CompletableFuture<OperatorID> operatorIdFuture,
+         TypeSerializer<T> serializer,
+         String accumulatorName) {
+     this(operatorIdFuture, serializer, accumulatorName, DEFAULT_RETRY_MILLIS);
+ }
+
+ /**
+  * Creates a fetcher with a custom retry interval.
+  *
+  * @param operatorIdFuture future holding the ID of the operator the coordination requests are sent to
+  * @param serializer serializer handed to the result buffer for reading results out of responses
+  * @param accumulatorName name of the accumulator which is read once the job has terminated
+  * @param retryMillis milliseconds to sleep before a retry; a value {@code <= 0} disables sleeping
+  */
+ CollectResultFetcher(
+         CompletableFuture<OperatorID> operatorIdFuture,
+         TypeSerializer<T> serializer,
+         String accumulatorName,
+         int retryMillis) {
+     // nothing has been fetched yet and the job is assumed to be running
+     this.closed = false;
+     this.jobTerminated = false;
+     this.buffer = new ResultBuffer(serializer);
+
+     this.retryMillis = retryMillis;
+     this.accumulatorName = accumulatorName;
+     this.operatorIdFuture = operatorIdFuture;
+ }
+
+ public void setJobClient(JobClient jobClient) {
+ Preconditions.checkArgument(
+ jobClient instanceof CoordinationRequestGateway,
+ "Job client must be a CoordinationRequestGateway. This is a bug.");
+ this.jobClient = jobClient;
+ this.gateway = (CoordinationRequestGateway) jobClient;
+ }
+
+ /**
+  * Returns the next user-visible result, polling the sink until one becomes available.
+  *
+  * <p>While the job is running, results are requested through the coordination gateway; a failed
+  * request is logged and retried. Once the job is found to be terminated, the remaining results
+  * are read from the accumulator and everything buffered becomes visible.
+  *
+  * @return the next result, or {@code null} if this fetcher is closed or the job has terminated
+  *         and no result is left
+  * @throws IOException if the accumulator results cannot be fetched or a response cannot be processed
+  */
+ public T next() throws IOException {
+     if (closed) {
+         return null;
+     }
+
+     // the very first attempt must not be delayed by a sleep
+     boolean firstAttempt = true;
+     while (true) {
+         T result = buffer.next();
+         if (result != null) {
+             // a user-visible result is already buffered, hand it out
+             return result;
+         }
+         if (jobTerminated) {
+             // nothing is buffered and nothing more will arrive
+             return null;
+         }
+         if (!firstAttempt) {
+             // the previous attempt produced nothing visible, wait a bit before asking again
+             sleepBeforeRetry();
+         }
+         firstAttempt = false;
+
+         if (isJobTerminated()) {
+             // the sink is gone, the remaining results live in the accumulator
+             jobTerminated = true;
+             Tuple2<Long, CollectCoordinationResponse<T>> finalResults = getAccumulatorResults();
+             buffer.dealWithResponse(finalResults.f1, finalResults.f0);
+             buffer.complete();
+             continue;
+         }
+
+         // the job is still running, ask the sink for results starting at our current offset
+         long requestOffset = buffer.offset;
+         CollectCoordinationResponse<T> response;
+         try {
+             response = sendRequest(buffer.version, requestOffset);
+         } catch (Exception e) {
+             LOG.warn("An exception occurs when fetching query results", e);
+             continue;
+         }
+         // data in the response (if any) starts exactly at the requested offset
+         buffer.dealWithResponse(response, requestOffset);
+     }
+ }
+
+ /**
+  * Closes this fetcher. The first call cancels the job if it has not terminated yet; subsequent
+  * calls do nothing.
+  */
+ public void close() {
+     if (!closed) {
+         cancelJob();
+         closed = true;
+     }
+ }
+
+ /**
+  * Sends a collect request to the sink operator through the coordination gateway and waits for
+  * the response.
+  *
+  * @param version version put into the request
+  * @param offset offset put into the request
+  * @return the response received for the request
+  * @throws InterruptedException if the thread is interrupted while waiting for the response
+  * @throws ExecutionException if the request completes exceptionally
+  */
+ @SuppressWarnings("unchecked")
+ private CollectCoordinationResponse<T> sendRequest(
+         String version,
+         long offset) throws InterruptedException, ExecutionException {
+     checkJobClientConfigured();
+
+     // the operator ID must already be known at this point, we never wait for it
+     OperatorID targetOperator = operatorIdFuture.getNow(null);
+     Preconditions.checkNotNull(targetOperator, "Unknown operator ID. This is a bug.");
+
+     Object rawResponse = gateway
+         .sendCoordinationRequest(targetOperator, new CollectCoordinationRequest(version, offset))
+         .get();
+     return (CollectCoordinationResponse<T>) rawResponse;
+ }
+
+ /**
+  * Reads the final results which the sink stored in the accumulator named {@code accumulatorName}.
+  *
+  * @return the offset stored alongside the results (f0) and the final response (f1)
+  * @throws IOException if the job execution result cannot be fetched in time, the accumulator is
+  *         missing or empty, or its content cannot be deserialized
+  */
+ private Tuple2<Long, CollectCoordinationResponse<T>> getAccumulatorResults() throws IOException {
+     checkJobClientConfigured();
+
+     JobExecutionResult executionResult;
+     try {
+         // this timeout is sort of a hack, see comments in isJobTerminated for explanation
+         executionResult = jobClient.getJobExecutionResult(getClass().getClassLoader()).get(
+             DEFAULT_ACCUMULATOR_GET_MILLIS, TimeUnit.MILLISECONDS);
+     } catch (InterruptedException e) {
+         // keep the interruption visible to the caller instead of silently consuming it
+         Thread.currentThread().interrupt();
+         throw new IOException("Failed to fetch job execution result", e);
+     } catch (ExecutionException | TimeoutException e) {
+         throw new IOException("Failed to fetch job execution result", e);
+     }
+
+     ArrayList<byte[]> accResults = executionResult.getAccumulatorResult(accumulatorName);
+     if (accResults == null) {
+         // job terminates abnormally
+         throw new IOException("Job terminated abnormally, no job execution result can be fetched");
+     }
+
+     try {
+         List<byte[]> serializedResults =
+             SerializedListAccumulator.deserializeList(accResults, BytePrimitiveArraySerializer.INSTANCE);
+         if (serializedResults.isEmpty()) {
+             // report a checked failure instead of letting get(0) throw an unchecked exception
+             throw new IOException("Accumulator '" + accumulatorName + "' does not contain any result");
+         }
+         // only the first entry of the accumulator is read
+         byte[] serializedResult = serializedResults.get(0);
+         return CollectSinkFunction.deserializeAccumulatorResult(serializedResult);
+     } catch (ClassNotFoundException | IOException e) {
+         throw new IOException("Failed to deserialize accumulator results", e);
+     }
+ }
+
+ /**
+  * Checks whether the job has reached a globally terminal state.
+  *
+  * @return {@code true} if the reported job status is globally terminal, or if the status cannot
+  *         be determined at all (see the comment in the catch block)
+  */
+ private boolean isJobTerminated() {
+     checkJobClientConfigured();
+
+     try {
+         return jobClient.getJobStatus().get().isGloballyTerminalState();
+     } catch (Exception e) {
+         // TODO
+         // This is sort of a hack.
+         // Currently different execution environments behave differently
+         // when the status of a finished job is fetched.
+         // For example, a standalone session cluster will return a normal FINISHED,
+         // while a mini cluster will throw IllegalStateException,
+         // and yarn per job will throw ApplicationNotFoundException.
+         // We have to assume that the job has finished in this case.
+         // Change this when these behaviors are unified.
+         LOG.warn("Failed to get job status so we assume that the job has terminated. Some data might be lost.", e);
+         return true;
+     }
+ }
+
+ /**
+  * Cancels the job through the job client unless the job is already considered terminated.
+  */
+ private void cancelJob() {
+     checkJobClientConfigured();
+
+     if (isJobTerminated()) {
+         // nothing left to cancel
+         return;
+     }
+     jobClient.cancel();
+ }
+
+ /**
+  * Sleeps for {@code retryMillis} milliseconds before the next fetch attempt. Does nothing if
+  * {@code retryMillis} is not positive.
+  */
+ private void sleepBeforeRetry() {
+     if (retryMillis > 0) {
+         try {
+             // TODO a more proper retry strategy?
+             Thread.sleep(retryMillis);
+         } catch (InterruptedException e) {
+             // NOTE(review): the interruption is only logged here, the interrupt status of the
+             // thread is not restored and the caller keeps polling
+             LOG.warn("Interrupted when sleeping before a retry", e);
+         }
+     }
+ }
+
+ /**
+  * Verifies that both {@code jobClient} and {@code gateway} have been set (they are assigned
+  * together in {@code setJobClient}) before they are used.
+  */
+ private void checkJobClientConfigured() {
+ Preconditions.checkNotNull(jobClient, "Job client must be configured before first use.");
+ Preconditions.checkNotNull(gateway, "Coordination request gateway must be configured before first use.");
+ }
+
+ /**
+ * A buffer which encapsulates the logic of dealing with the response from the {@link CollectSinkFunction}.
+ * See Java doc of {@link CollectSinkFunction} for explanation of this communication protocol.
+ */
+ private class ResultBuffer {
+
+ // version used before any response has been received from the sink
+ private static final String INIT_VERSION = "";
+
+ // results received from the sink which have not been handed to the user yet
+ private final LinkedList<T> buffer;
+ private final TypeSerializer<T> serializer;
+
+ // for detailed explanation of the following 3 variables, see Java doc of CollectSinkFunction
+ // `version` is to check if the sink restarts
+ private String version;
+ // `offset` is the offset of the next result we want to fetch
+ private long offset;
+
+ // userVisibleHead <= user visible results offset < userVisibleTail
+ private long userVisibleHead;
+ private long userVisibleTail;
+
+ // creates an empty buffer: initial version, offset 0 and no user-visible results
+ private ResultBuffer(TypeSerializer<T> serializer) {
+ this.buffer = new LinkedList<>();
+ this.serializer = serializer;
+
+ this.version = INIT_VERSION;
+ this.offset = 0;
+
+ this.userVisibleHead = 0;
+ this.userVisibleTail = 0;
+ }
+
+ /**
+  * Removes and returns the oldest buffered result if it is user-visible.
+  *
+  * @return the next user-visible result, or null if the user-visible range is empty
+  */
+ private T next() {
+ if (userVisibleHead == userVisibleTail) {
+ return null;
+ }
+ T ret = buffer.removeFirst();
+ userVisibleHead++;
+
+ sanityCheck();
+ return ret;
+ }
+
+ /**
+  * Updates the buffer with a response from the sink.
+  *
+  * @param response the response to process
+  * @param responseOffset offset of the first result contained in the response
+  * @throws IOException declared for reading the results out of the response with the serializer
+  */
+ private void dealWithResponse(CollectCoordinationResponse<T> response, long responseOffset) throws IOException {
+ String responseVersion = response.getVersion();
+ long responseLastCheckpointedOffset = response.getLastCheckpointedOffset();
+ List<T> results = response.getResults(serializer);
+
+ // we first check version in the response to decide whether we should throw away dirty results
+ if (!version.equals(responseVersion)) {
+ // sink restarted, we revert back to where the sink tells us
+ // (drops the newest `offset - responseLastCheckpointedOffset` buffered results)
+ for (long i = 0; i < offset - responseLastCheckpointedOffset; i++) {
+ buffer.removeLast();
+ }
+ version = responseVersion;
+ offset = responseLastCheckpointedOffset;
+ }
+
+ // we now check if more results can be seen by the user
+ if (responseLastCheckpointedOffset > userVisibleTail) {
+ // lastCheckpointedOffset increases, this means that more results have been
+ // checkpointed, and we can give these results to the user
+ userVisibleTail = responseLastCheckpointedOffset;
+ }
+
+ if (!results.isEmpty()) {
+ // response contains some data, add them to buffer
+ // skip the leading results which lie before our current offset
+ // NOTE(review): subList requires 0 <= addStart <= results.size() and throws otherwise;
+ // confirm that a non-empty response can never start after `offset`
+ int addStart = (int) (offset - responseOffset);
+ List<T> addedResults = results.subList(addStart, results.size());
+ buffer.addAll(addedResults);
+ offset += addedResults.size();
+ }
+
+ sanityCheck();
+ }
+
+ // makes every buffered result user-visible by moving the visible tail up to `offset`
+ private void complete() {
+ userVisibleTail = offset;
+ }
+
+ // checks the invariant userVisibleHead <= userVisibleTail <= offset
+ private void sanityCheck() {
+ Preconditions.checkState(
+ userVisibleHead <= userVisibleTail,
+ "userVisibleHead should not be larger than userVisibleTail. This is a bug.");
+ Preconditions.checkState(
+ userVisibleTail <= offset,
+ "userVisibleTail should not be larger than offset. This is a bug.");
+ }
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
new file mode 100644
index 0000000..bbf41e1
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An iterator which iterates through the results of a query job.
+ *
+ * <p>NOTE: After using this iterator, the close method MUST be called in order to release job related resources.
+ */
+public class CollectResultIterator<T> implements Iterator<T>, AutoCloseable {
+
+ private final CollectResultFetcher<T> fetcher;
+ // result fetched ahead by hasNext() which has not been returned by next() yet, null if there is none
+ private T bufferedResult;
+
+ /**
+  * Creates an iterator whose fetcher uses its default retry interval.
+  *
+  * @param operatorIdFuture future holding the ID of the operator the results are fetched from
+  * @param serializer serializer for the results
+  * @param accumulatorName name of the accumulator passed on to the fetcher
+  */
+ public CollectResultIterator(
+         CompletableFuture<OperatorID> operatorIdFuture,
+         TypeSerializer<T> serializer,
+         String accumulatorName) {
+     this.fetcher = new CollectResultFetcher<>(operatorIdFuture, serializer, accumulatorName);
+     this.bufferedResult = null;
+ }
+
+ /**
+  * Creates an iterator whose fetcher uses the given retry interval.
+  *
+  * @param operatorIdFuture future holding the ID of the operator the results are fetched from
+  * @param serializer serializer for the results
+  * @param accumulatorName name of the accumulator passed on to the fetcher
+  * @param retryMillis retry interval in milliseconds passed on to the fetcher
+  */
+ @VisibleForTesting
+ public CollectResultIterator(
+         CompletableFuture<OperatorID> operatorIdFuture,
+         TypeSerializer<T> serializer,
+         String accumulatorName,
+         int retryMillis) {
+     this.fetcher = new CollectResultFetcher<>(operatorIdFuture, serializer, accumulatorName, retryMillis);
+     this.bufferedResult = null;
+ }
+
+ /**
+  * Checks whether another result exists, fetching it from the fetcher if none is buffered yet.
+  *
+  * @return {@code true} if a result is available for {@link #next()}
+  */
+ @Override
+ public boolean hasNext() {
+     // we have to make sure that the next result exists
+     // it is possible that there is no more result but the job is still running
+     if (bufferedResult == null) {
+         bufferedResult = nextResultFromFetcher();
+     }
+     return bufferedResult != null;
+ }
+
+ /**
+  * Returns the next result.
+  *
+  * @return the next result, never {@code null}
+  * @throws java.util.NoSuchElementException if there is no more result, as required by the
+  *         contract of {@link Iterator#next()}
+  */
+ @Override
+ public T next() {
+     if (!hasNext()) {
+         // fully qualified so that no additional import is needed
+         throw new java.util.NoSuchElementException("No more results to iterate through");
+     }
+     T ret = bufferedResult;
+     bufferedResult = null;
+     return ret;
+ }
+
+ /**
+  * Closes the underlying fetcher.
+  */
+ @Override
+ public void close() throws Exception {
+     fetcher.close();
+ }
+
+ /**
+  * Passes the client of the job on to the underlying fetcher.
+  *
+  * @param jobClient client of the job whose results are iterated
+  */
+ public void setJobClient(JobClient jobClient) {
+     fetcher.setJobClient(jobClient);
+ }
+
+ /**
+  * Asks the fetcher for the next result. If the fetcher fails with an {@link IOException}, it is
+  * closed and the failure is rethrown unchecked.
+  */
+ private T nextResultFromFetcher() {
+     try {
+         return fetcher.next();
+     } catch (IOException e) {
+         fetcher.close();
+         throw new RuntimeException("Failed to fetch next result", e);
+     }
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
index 21e889b..64a3ffe 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators.collect;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
@@ -78,7 +79,9 @@ import java.util.concurrent.locks.ReentrantLock;
* before this offset. Sink can safely throw these results away.</li>
* <li><strong>lastCheckpointedOffset</strong>:
* This is the value of <code>offset</code> when the checkpoint happens. This value will be
- * restored from the checkpoint and set back to <code>offset</code> when the sink restarts.</li>
+ * restored from the checkpoint and set back to <code>offset</code> when the sink restarts.
+ * Clients who need exactly-once semantics need to rely on this value for the position to
+ * revert when a failover happens.</li>
* </ol>
*
* <p>Client will put <code>version</code> and <code>offset</code> into the request, indicating that
@@ -97,9 +100,9 @@ import java.util.concurrent.locks.ReentrantLock;
* <ol>
* <li>If the version mismatches, client knows that sink has restarted. It will throw away all uncheckpointed
* results after <code>lastCheckpointedOffset</code>.</li>
- * <li>Otherwise the version matches. If <code>lastCheckpointedOffset</code> increases, client knows that
- * a checkpoint happens. It can now move all results before this offset to a user-visible buffer. If
- * the response also contains new results, client will now move these new results into uncheckpointed
+ * <li>If <code>lastCheckpointedOffset</code> increases, client knows that
+ * a checkpoint happens. It can now move all results before this offset to a user-visible buffer.</li>
+ * <li>If the response also contains new results, client will now move these new results into uncheckpointed
* buffer.</li>
* </ol>
*
@@ -264,7 +267,9 @@ public class CollectSinkFunction<IN> extends RichSinkFunction<IN> implements Che
// put results not consumed by the client into the accumulator
// so that we do not block the closing procedure while not throwing results away
SerializedListAccumulator<byte[]> accumulator = new SerializedListAccumulator<>();
- accumulator.add(serializeAccumulatorResult(), BytePrimitiveArraySerializer.INSTANCE);
+ accumulator.add(
+ serializeAccumulatorResult(offset, version, lastCheckpointedOffset, bufferedResults, serializer),
+ BytePrimitiveArraySerializer.INSTANCE);
getRuntimeContext().addAccumulator(accumulatorName, accumulator);
} finally {
bufferedResultsLock.unlock();
@@ -286,22 +291,28 @@ public class CollectSinkFunction<IN> extends RichSinkFunction<IN> implements Che
this.eventGateway = eventGateway;
}
- private byte[] serializeAccumulatorResult() throws IOException {
+ @VisibleForTesting
+ public static <T> byte[] serializeAccumulatorResult(
+ long offset,
+ String version,
+ long lastCheckpointedOffset,
+ List<T> bufferedResults,
+ TypeSerializer<T> serializer) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
wrapper.writeLong(offset);
- CollectCoordinationResponse<IN> finalResponse =
+ CollectCoordinationResponse<T> finalResponse =
new CollectCoordinationResponse<>(version, lastCheckpointedOffset, bufferedResults, serializer);
finalResponse.serialize(wrapper);
return baos.toByteArray();
}
- public static Tuple2<Long, CollectCoordinationResponse> deserializeAccumulatorResult(
+ public static <T> Tuple2<Long, CollectCoordinationResponse<T>> deserializeAccumulatorResult(
byte[] serializedAccResults) throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(serializedAccResults);
DataInputViewStreamWrapper wrapper = new DataInputViewStreamWrapper(bais);
long token = wrapper.readLong();
- CollectCoordinationResponse finalResponse = new CollectCoordinationResponse(wrapper);
+ CollectCoordinationResponse<T> finalResponse = new CollectCoordinationResponse<>(wrapper);
return Tuple2.of(token, finalResponse);
}
@@ -395,8 +406,8 @@ public class CollectSinkFunction<IN> extends RichSinkFunction<IN> implements Che
private void close() {
running = false;
+ closeServerSocket();
closeCurrentConnection();
- closeServer();
}
private InetSocketAddress getServerSocketAddress() {
@@ -444,7 +455,7 @@ public class CollectSinkFunction<IN> extends RichSinkFunction<IN> implements Che
}
}
- private void closeServer() {
+ private void closeServerSocket() {
try {
serverSocket.close();
} catch (Exception e) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
index dea9bf3..6c84266 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
@@ -56,6 +56,8 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor
private static final Logger LOG = LoggerFactory.getLogger(CollectSinkOperatorCoordinator.class);
+ private final int socketTimeout;
+
private InetSocketAddress address;
private Socket socket;
private DataInputViewStreamWrapper inStream;
@@ -63,6 +65,10 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor
private ExecutorService executorService;
+ public CollectSinkOperatorCoordinator(int socketTimeout) {
+ this.socketTimeout = socketTimeout;
+ }
+
@Override
public void start() throws Exception {
this.executorService =
@@ -107,12 +113,20 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor
CollectCoordinationRequest request,
CompletableFuture<CoordinationResponse> responseFuture,
InetSocketAddress sinkAddress) {
+ if (sinkAddress == null) {
+ closeConnection();
+ completeWithEmptyResponse(request, responseFuture);
+ return;
+ }
+
try {
if (socket == null) {
- socket = new Socket(sinkAddress.getAddress(), sinkAddress.getPort());
+ socket = new Socket();
+ socket.setSoTimeout(socketTimeout);
socket.setKeepAlive(true);
socket.setTcpNoDelay(true);
+ socket.connect(sinkAddress);
inStream = new DataInputViewStreamWrapper(socket.getInputStream());
outStream = new DataOutputViewStreamWrapper(socket.getOutputStream());
LOG.info("Sink connection established");
@@ -200,9 +214,11 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor
public static class Provider implements OperatorCoordinator.Provider {
private final OperatorID operatorId;
+ private final int socketTimeout;
- public Provider(OperatorID operatorId) {
+ public Provider(OperatorID operatorId, int socketTimeout) {
this.operatorId = operatorId;
+ this.socketTimeout = socketTimeout;
}
@Override
@@ -213,7 +229,7 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor
@Override
public OperatorCoordinator create(Context context) {
// we do not send operator event so we don't need a context
- return new CollectSinkOperatorCoordinator();
+ return new CollectSinkOperatorCoordinator(socketTimeout);
}
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
index 8fa9843..e177f79 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.api.operators.collect;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
@@ -28,19 +29,25 @@ import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
/**
* The Factory class for {@link CollectSinkOperator}.
*/
-@SuppressWarnings("unchecked")
-public class CollectSinkOperatorFactory extends SimpleUdfStreamOperatorFactory<Object> implements CoordinatedOperatorFactory<Object> {
+public class CollectSinkOperatorFactory<IN> extends SimpleUdfStreamOperatorFactory<Object> implements CoordinatedOperatorFactory<Object> {
private static final long serialVersionUID = 1L;
- private final CollectSinkOperator<?> operator;
+ private final CollectSinkOperator<IN> operator;
+ private final int socketTimeout;
- public CollectSinkOperatorFactory(CollectSinkOperator<?> operator) {
- super(operator);
- this.operator = operator;
+ public CollectSinkOperatorFactory(
+ TypeSerializer<IN> serializer,
+ int maxResultsPerBatch,
+ String accumulatorName,
+ int socketTimeout) {
+ super(new CollectSinkOperator<>(serializer, maxResultsPerBatch, accumulatorName));
+ this.operator = (CollectSinkOperator<IN>) getOperator();
+ this.socketTimeout = socketTimeout;
}
@Override
+ @SuppressWarnings("unchecked")
public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorParameters<Object> parameters) {
final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher();
@@ -55,6 +62,6 @@ public class CollectSinkOperatorFactory extends SimpleUdfStreamOperatorFactory<O
@Override
public OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID) {
operator.getOperatorIdFuture().complete(operatorID);
- return new CollectSinkOperatorCoordinator.Provider(operatorID);
+ return new CollectSinkOperatorCoordinator.Provider(operatorID, socketTimeout);
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectResultIteratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectResultIteratorTest.java
new file mode 100644
index 0000000..021689f
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectResultIteratorTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.collect.utils.TestCoordinationRequestHandler;
+import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Tests for {@link CollectResultIterator}.
+ */
+public class CollectResultIteratorTest extends TestLogger {
+
+ private static final OperatorID TEST_OPERATOR_ID = new OperatorID();
+ private static final JobID TEST_JOB_ID = new JobID();
+ private static final String ACCUMULATOR_NAME = "accumulatorName";
+
+ /**
+  * Iterates through all results repeatedly and checks that every expected value is returned
+  * exactly once.
+  */
+ @Test
+ public void testIteratorWithCheckpointAndFailure() throws Exception {
+     // run this random test multiple times
+     for (int testCount = 1000; testCount > 0; testCount--) {
+         List<Integer> expected = new ArrayList<>();
+         for (int i = 0; i < 200; i++) {
+             expected.add(i);
+         }
+
+         List<Integer> actual = new ArrayList<>();
+         // try-with-resources closes the iterator even if iterating fails
+         try (CollectResultIterator<Integer> iterator =
+                 createIteratorAndJobClient(expected, IntSerializer.INSTANCE).f0) {
+             while (iterator.hasNext()) {
+                 actual.add(iterator.next());
+             }
+         }
+         Assert.assertEquals(expected.size(), actual.size());
+
+         Collections.sort(expected);
+         Collections.sort(actual);
+         Assert.assertArrayEquals(expected.toArray(new Integer[0]), actual.toArray(new Integer[0]));
+     }
+ }
+
+ @Test
+ public void testEarlyClose() throws Exception {
+ List<Integer> expected = new ArrayList<>();
+ for (int i = 0; i < 200; i++) {
+ expected.add(i);
+ }
+
+ Tuple2<CollectResultIterator<Integer>, JobClient> tuple2 =
+ createIteratorAndJobClient(expected, IntSerializer.INSTANCE);
+ CollectResultIterator<Integer> iterator = tuple2.f0;
+ JobClient jobClient = tuple2.f1;
+
+ for (int i = 0; i < 100; i++) {
+ Assert.assertTrue(iterator.hasNext());
+ Assert.assertNotNull(iterator.next());
+ }
+ Assert.assertTrue(iterator.hasNext());
+ iterator.close();
+
+ Assert.assertEquals(JobStatus.CANCELED, jobClient.getJobStatus().get());
+ }
+
+ /**
+  * Creates an iterator without retry delay together with a test job client which serves the
+  * given values through a {@link TestCoordinationRequestHandler}.
+  *
+  * @param expected values handed to the request handler
+  * @param serializer serializer for the values
+  * @return the iterator (f0) and the job client it has been configured with (f1)
+  */
+ private Tuple2<CollectResultIterator<Integer>, JobClient> createIteratorAndJobClient(
+         List<Integer> expected,
+         TypeSerializer<Integer> serializer) {
+     TestCoordinationRequestHandler<Integer> requestHandler =
+         new TestCoordinationRequestHandler<>(expected, serializer, ACCUMULATOR_NAME);
+
+     // the job client reports whatever the request handler currently knows about the job
+     TestJobClient.JobInfoProvider jobInfo = new TestJobClient.JobInfoProvider() {
+
+         @Override
+         public boolean isJobFinished() {
+             return requestHandler.isClosed();
+         }
+
+         @Override
+         public Map<String, OptionalFailure<Object>> getAccumulatorResults() {
+             return requestHandler.getAccumulatorResults();
+         }
+     };
+
+     TestJobClient client = new TestJobClient(TEST_JOB_ID, TEST_OPERATOR_ID, requestHandler, jobInfo);
+
+     CollectResultIterator<Integer> resultIterator = new CollectResultIterator<>(
+         CompletableFuture.completedFuture(TEST_OPERATOR_ID),
+         serializer,
+         ACCUMULATOR_NAME,
+         0);
+     resultIterator.setJobClient(client);
+
+     return Tuple2.of(resultIterator, client);
+ }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java
index 1483998..1363d87 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java
@@ -17,25 +17,24 @@
package org.apache.flink.streaming.api.operators.collect;
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.operators.collect.utils.CollectRequestSender;
import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionInitializationContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
-import org.apache.flink.streaming.api.operators.collect.utils.TestCollectClient;
+import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
-import org.apache.flink.types.Row;
+import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.TestLogger;
import org.junit.After;
@@ -47,11 +46,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
+import java.util.Map;
import java.util.Random;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
@@ -61,13 +62,16 @@ public class CollectSinkFunctionTest extends TestLogger {
private static final int MAX_RESULTS_PER_BATCH = 3;
private static final String ACCUMULATOR_NAME = "tableCollectAccumulator";
- private static final long TIME_OUT_MILLIS = 10000;
+ private static final int FUTURE_TIMEOUT_MILLIS = 10000;
+ private static final int SOCKET_TIMEOUT_MILLIS = 1000;
private static final int MAX_RETIRES = 100;
- private static final TypeSerializer<Row> serializer =
- new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO).createSerializer(new ExecutionConfig());
+ private static final JobID TEST_JOB_ID = new JobID();
+ private static final OperatorID TEST_OPERATOR_ID = new OperatorID();
- private CollectSinkFunction<Row> function;
+ private static final TypeSerializer<Integer> serializer = IntSerializer.INSTANCE;
+
+ private CollectSinkFunction<Integer> function;
private CollectSinkOperatorCoordinator coordinator;
private MockFunctionInitializationContext functionInitializationContext;
private boolean jobFinished;
@@ -81,7 +85,7 @@ public class CollectSinkFunctionTest extends TestLogger {
ioManager = new IOManagerAsync();
runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, ioManager);
gateway = new MockOperatorEventGateway();
- coordinator = new CollectSinkOperatorCoordinator();
+ coordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
coordinator.start();
// only used in checkpointed tests
@@ -101,10 +105,10 @@ public class CollectSinkFunctionTest extends TestLogger {
openFunction();
for (int i = 0; i < 6; i++) {
// CollectSinkFunction never use context when invoked
- function.invoke(Row.of(i), null);
+ function.invoke(i, null);
}
- CollectCoordinationResponse<Row> response = sendRequestAndGetValidResponse("", 0);
+ CollectCoordinationResponse<Integer> response = sendRequestAndGetValidResponse("", 0);
Assert.assertEquals(0, response.getLastCheckpointedOffset());
String version = response.getVersion();
@@ -118,7 +122,7 @@ public class CollectSinkFunctionTest extends TestLogger {
assertResponseEquals(response, version, 0, Collections.emptyList());
for (int i = 6; i < 10; i++) {
- function.invoke(Row.of(i), null);
+ function.invoke(i, null);
}
// invalid request
@@ -135,7 +139,7 @@ public class CollectSinkFunctionTest extends TestLogger {
assertResponseEquals(response, version, 0, Collections.emptyList());
for (int i = 10; i < 16; i++) {
- function.invoke(Row.of(i), null);
+ function.invoke(i, null);
}
response = sendRequestAndGetValidResponse(version, 12);
@@ -151,10 +155,10 @@ public class CollectSinkFunctionTest extends TestLogger {
openFunctionWithState();
for (int i = 0; i < 2; i++) {
// CollectSinkFunction never use context when invoked
- function.invoke(Row.of(i), null);
+ function.invoke(i, null);
}
- CollectCoordinationResponse<Row> response = sendRequestAndGetValidResponse("", 0);
+ CollectCoordinationResponse<Integer> response = sendRequestAndGetValidResponse("", 0);
Assert.assertEquals(0, response.getLastCheckpointedOffset());
String version = response.getVersion();
@@ -162,7 +166,7 @@ public class CollectSinkFunctionTest extends TestLogger {
assertResponseEquals(response, version, 0, Arrays.asList(0, 1));
for (int i = 2; i < 6; i++) {
- function.invoke(Row.of(i), null);
+ function.invoke(i, null);
}
response = sendRequestAndGetValidResponse(version, 3);
@@ -181,7 +185,7 @@ public class CollectSinkFunctionTest extends TestLogger {
assertResponseEquals(response, version, 3, Arrays.asList(4, 5));
for (int i = 6; i < 9; i++) {
- function.invoke(Row.of(i), null);
+ function.invoke(i, null);
}
response = sendRequestAndGetValidResponse(version, 6);
@@ -192,7 +196,7 @@ public class CollectSinkFunctionTest extends TestLogger {
openFunctionWithState();
for (int i = 9; i < 12; i++) {
- function.invoke(Row.of(i), null);
+ function.invoke(i, null);
}
response = sendRequestAndGetValidResponse(version, 4);
@@ -208,7 +212,7 @@ public class CollectSinkFunctionTest extends TestLogger {
checkpointFunction(2);
checkpointComplete(2);
- function.invoke(Row.of(12), null);
+ function.invoke(12, null);
response = sendRequestAndGetValidResponse(version, 7);
assertResponseEquals(response, version, 6, Arrays.asList(10, 11, 12));
@@ -228,7 +232,7 @@ public class CollectSinkFunctionTest extends TestLogger {
assertResponseEquals(response, version, 6, Collections.emptyList());
for (int i = 13; i < 17; i++) {
- function.invoke(Row.of(i), null);
+ function.invoke(i, null);
}
response = sendRequestAndGetValidResponse(version, 9);
@@ -249,7 +253,7 @@ public class CollectSinkFunctionTest extends TestLogger {
assertResponseEquals(response, version, 9, Collections.singletonList(16));
for (int i = 17; i < 20; i++) {
- function.invoke(Row.of(i), null);
+ function.invoke(i, null);
}
response = sendRequestAndGetValidResponse(version, 12);
@@ -270,7 +274,7 @@ public class CollectSinkFunctionTest extends TestLogger {
assertResponseEquals(response, version, 9, Collections.singletonList(16));
for (int i = 20; i < 23; i++) {
- function.invoke(Row.of(i), null);
+ function.invoke(i, null);
}
response = sendRequestAndGetValidResponse(version, 12);
@@ -290,13 +294,9 @@ public class CollectSinkFunctionTest extends TestLogger {
expected.add(i);
}
UncheckpointedDataFeeder feeder = new UncheckpointedDataFeeder(expected);
- TestCollectClient<Row> client = new TestCollectClient<>(
- serializer,
- new TestCollectRequestSender(),
- () -> jobFinished);
- runFunctionWithClient(feeder, client);
- assertResultsEqualAfterSort(expected, client.getResults());
+ List<Integer> actual = runFunctionRandomTest(feeder);
+ assertResultsEqualAfterSort(expected, actual);
after();
before();
@@ -312,23 +312,22 @@ public class CollectSinkFunctionTest extends TestLogger {
expected.add(i);
}
CheckpointedDataFeeder feeder = new CheckpointedDataFeeder(expected);
- TestCollectClient<Row> client = new TestCollectClient<>(
- serializer,
- new TestCollectRequestSender(),
- () -> jobFinished);
- runFunctionWithClient(feeder, client);
- assertResultsEqualAfterSort(expected, client.getResults());
+ List<Integer> actual = runFunctionRandomTest(feeder);
+ assertResultsEqualAfterSort(expected, actual);
after();
before();
}
}
- private void runFunctionWithClient(Thread feeder, Thread client) throws Exception {
+ private List<Integer> runFunctionRandomTest(Thread feeder) throws Exception {
+ CollectClient client = new CollectClient();
+
Thread.UncaughtExceptionHandler exceptionHandler = (t, e) -> {
feeder.interrupt();
client.interrupt();
+ e.printStackTrace();
};
feeder.setUncaughtExceptionHandler(exceptionHandler);
client.setUncaughtExceptionHandler(exceptionHandler);
@@ -337,10 +336,13 @@ public class CollectSinkFunctionTest extends TestLogger {
client.start();
feeder.join();
client.join();
+
+ return client.results;
}
private void openFunction() throws Exception {
- function = new CollectSinkFunction<>(serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME);
+ function = new CollectSinkFunction<>(
+ serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME);
function.setRuntimeContext(runtimeContext);
function.setOperatorEventGateway(gateway);
function.open(new Configuration());
@@ -349,7 +351,8 @@ public class CollectSinkFunctionTest extends TestLogger {
private void openFunctionWithState() throws Exception {
functionInitializationContext.getOperatorStateStore().revertToLastSuccessCheckpoint();
- function = new CollectSinkFunction<>(serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME);
+ function = new CollectSinkFunction<>(
+ serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME);
function.setRuntimeContext(runtimeContext);
function.setOperatorEventGateway(gateway);
function.initializeState(functionInitializationContext);
@@ -382,19 +385,19 @@ public class CollectSinkFunctionTest extends TestLogger {
}
@SuppressWarnings("unchecked")
- private CollectCoordinationResponse<Row> sendRequest(
+ private CollectCoordinationResponse<Integer> sendRequest(
String version,
long offset) throws Exception {
CollectCoordinationRequest request = new CollectCoordinationRequest(version, offset);
// we add a timeout to not block the tests
return ((CollectCoordinationResponse) coordinator
- .handleCoordinationRequest(request).get(TIME_OUT_MILLIS, TimeUnit.MILLISECONDS));
+ .handleCoordinationRequest(request).get(FUTURE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
}
- private CollectCoordinationResponse<Row> sendRequestAndGetValidResponse(
+ private CollectCoordinationResponse<Integer> sendRequestAndGetValidResponse(
String version,
long offset) throws Exception {
- CollectCoordinationResponse<Row> response;
+ CollectCoordinationResponse<Integer> response;
for (int i = 0; i < MAX_RETIRES; i++) {
response = sendRequest(version, offset);
if (response.getLastCheckpointedOffset() >= 0) {
@@ -405,7 +408,7 @@ public class CollectSinkFunctionTest extends TestLogger {
}
@SuppressWarnings("unchecked")
- private Tuple2<Long, CollectCoordinationResponse> getAccumualtorResults() throws Exception {
+ private Tuple2<Long, CollectCoordinationResponse<Integer>> getAccumualtorResults() throws Exception {
Accumulator accumulator = runtimeContext.getAccumulator(ACCUMULATOR_NAME);
ArrayList<byte[]> accLocalValue = ((SerializedListAccumulator) accumulator).getLocalValue();
List<byte[]> serializedResults =
@@ -416,41 +419,35 @@ public class CollectSinkFunctionTest extends TestLogger {
}
private void assertResponseEquals(
- CollectCoordinationResponse<Row> response,
+ CollectCoordinationResponse<Integer> response,
String version,
long lastCheckpointedOffset,
List<Integer> expected) throws IOException {
Assert.assertEquals(version, response.getVersion());
Assert.assertEquals(lastCheckpointedOffset, response.getLastCheckpointedOffset());
- List<Row> results = response.getResults(serializer);
+ List<Integer> results = response.getResults(serializer);
assertResultsEqual(expected, results);
}
- private void assertResultsEqual(List<Integer> expected, List<Row> actual) {
- Assert.assertEquals(expected.size(), actual.size());
- for (int i = 0; i < expected.size(); i++) {
- Row row = actual.get(i);
- Assert.assertEquals(1, row.getArity());
- Assert.assertEquals(expected.get(i), row.getField(0));
- }
+ private void assertResultsEqual(List<Integer> expected, List<Integer> actual) {
+ Assert.assertArrayEquals(expected.toArray(new Integer[0]), actual.toArray(new Integer[0]));
}
- private void assertResultsEqualAfterSort(List<Integer> expected, List<Row> actual) {
+ private void assertResultsEqualAfterSort(List<Integer> expected, List<Integer> actual) {
Collections.sort(expected);
- actual.sort(Comparator.comparingInt(row -> (int) row.getField(0)));
+ Collections.sort(actual);
assertResultsEqual(expected, actual);
}
- @SuppressWarnings("unchecked")
private void assertAccumulatorResult(
long expectedOffset,
String expectedVersion,
long expectedLastCheckpointedOffset,
List<Integer> expectedResults) throws Exception {
- Tuple2<Long, CollectCoordinationResponse> accResults = getAccumualtorResults();
+ Tuple2<Long, CollectCoordinationResponse<Integer>> accResults = getAccumualtorResults();
long offset = accResults.f0;
- CollectCoordinationResponse response = accResults.f1;
- List<Row> actualResults = response.getResults(serializer);
+ CollectCoordinationResponse<Integer> response = accResults.f1;
+ List<Integer> actualResults = response.getResults(serializer);
Assert.assertEquals(expectedOffset, offset);
Assert.assertEquals(expectedVersion, response.getVersion());
@@ -458,19 +455,6 @@ public class CollectSinkFunctionTest extends TestLogger {
assertResultsEqual(expectedResults, actualResults);
}
- private class TestCollectRequestSender implements CollectRequestSender<Row> {
-
- @Override
- public CollectCoordinationResponse<Row> sendRequest(String version, long offset) throws Exception {
- return CollectSinkFunctionTest.this.sendRequest(version, offset);
- }
-
- @Override
- public Tuple2<Long, CollectCoordinationResponse> getAccumulatorResults() throws Exception {
- return CollectSinkFunctionTest.this.getAccumualtorResults();
- }
- }
-
/**
* A thread feeding data to the function. It will fail when half of the data is fed.
*/
@@ -496,7 +480,7 @@ public class CollectSinkFunctionTest extends TestLogger {
while (data.size() > 0) {
int size = Math.min(data.size(), random.nextInt(MAX_RESULTS_PER_BATCH * 3) + 1);
for (int i = 0; i < size; i++) {
- function.invoke(Row.of(data.removeFirst()), null);
+ function.invoke(data.removeFirst(), null);
}
if (!failedBefore && data.size() < checkpointedData.size() / 2) {
@@ -519,7 +503,6 @@ public class CollectSinkFunctionTest extends TestLogger {
finishJob();
} catch (Exception e) {
- e.printStackTrace();
throw new RuntimeException(e);
}
}
@@ -571,7 +554,7 @@ public class CollectSinkFunctionTest extends TestLogger {
// with 60% chance we add some data
int size = Math.min(data.size(), random.nextInt(MAX_RESULTS_PER_BATCH * 3) + 1);
for (int i = 0; i < size; i++) {
- function.invoke(Row.of(data.removeFirst()), null);
+ function.invoke(data.removeFirst(), null);
}
} else if (r < 9) {
// with 30% chance we make a checkpoint
@@ -603,12 +586,14 @@ public class CollectSinkFunctionTest extends TestLogger {
finishJob();
} catch (Exception e) {
- e.printStackTrace();
throw new RuntimeException(e);
}
}
}
+ /**
+ * Countdown for a checkpoint which will succeed in the future.
+ */
private static class CheckpointCountdown {
private long id;
@@ -629,4 +614,71 @@ public class CollectSinkFunctionTest extends TestLogger {
return false;
}
}
+
+ /**
+ * A thread that drains a {@link CollectResultIterator} into a list, pausing at random between elements.
+ */
+ private class CollectClient extends Thread {
+
+ private List<Integer> results; // filled by run(); read by runFunctionRandomTest after join()
+ private CollectResultIterator<Integer> iterator; // iterator under test, wired to the coordinator in the constructor
+
+ private CollectClient() {
+ this.results = new ArrayList<>();
+
+ this.iterator = new CollectResultIterator<>(
+ CompletableFuture.completedFuture(TEST_OPERATOR_ID), // operator id is available immediately
+ serializer,
+ ACCUMULATOR_NAME,
+ 0 // NOTE(review): presumably the retry interval in millis - confirm against CollectResultIterator
+ );
+
+ TestJobClient.JobInfoProvider infoProvider = new TestJobClient.JobInfoProvider() { // exposes the enclosing test's state as job information
+
+ @Override
+ public boolean isJobFinished() {
+ return jobFinished; // flag owned by the enclosing test
+ }
+
+ @Override
+ public Map<String, OptionalFailure<Object>> getAccumulatorResults() {
+ Map<String, OptionalFailure<Object>> accumulatorResults = new HashMap<>(); // fresh single-entry map on every call
+ accumulatorResults.put(
+ ACCUMULATOR_NAME,
+ OptionalFailure.of(runtimeContext.getAccumulator(ACCUMULATOR_NAME).getLocalValue())); // local value of the accumulator registered in the mock runtime context
+ return accumulatorResults;
+ }
+ };
+
+ TestJobClient jobClient = new TestJobClient(
+ TEST_JOB_ID,
+ TEST_OPERATOR_ID,
+ coordinator, // the real coordinator is passed where TestJobClient expects its request handler
+ infoProvider);
+
+ iterator.setJobClient(jobClient);
+ }
+
+ @Override
+ public void run() { // collects every element the iterator yields, then closes the iterator
+ Random random = new Random();
+
+ while (iterator.hasNext()) {
+ results.add(iterator.next());
+ if (random.nextBoolean()) { // on roughly half of the elements, pause briefly
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ // ignore: the sleep only adds jitter, so collection simply continues (the interrupt status is not restored)
+ }
+ }
+ }
+
+ try {
+ iterator.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e); // rethrown unchecked because run() cannot declare checked exceptions
+ }
+ }
+ }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
index d2ad57b..66a624a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
@@ -45,6 +45,8 @@ import java.util.concurrent.CompletableFuture;
*/
public class CollectSinkOperatorCoordinatorTest {
+ private static final int SOCKET_TIMEOUT_MILLIS = 1000;
+
private static final TypeSerializer<Row> serializer = new RowTypeInfo(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
@@ -52,7 +54,7 @@ public class CollectSinkOperatorCoordinatorTest {
@Test
public void testNoAddress() throws Exception {
- CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator();
+ CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
coordinator.start();
CollectCoordinationRequest request = new CollectCoordinationRequest("version", 123);
@@ -65,7 +67,7 @@ public class CollectSinkOperatorCoordinatorTest {
@Test
public void testServerFailure() throws Exception {
- CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator();
+ CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
coordinator.start();
List<List<Row>> expected = Arrays.asList(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectRequestSender.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectRequestSender.java
deleted file mode 100644
index fc7b759..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectRequestSender.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.collect.utils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
-
-/**
- * Testing interface for sending collect requests.
- */
-public interface CollectRequestSender<T> {
-
- CollectCoordinationResponse<T> sendRequest(String version, long offset) throws Exception;
-
- Tuple2<Long, CollectCoordinationResponse> getAccumulatorResults() throws Exception;
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCollectClient.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCollectClient.java
deleted file mode 100644
index 9040971..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCollectClient.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.collect.utils;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-import java.util.function.BooleanSupplier;
-
-/**
- * A simple client for fetching collect results.
- */
-public class TestCollectClient<T> extends Thread {
-
- private static final String INIT_VERSION = "";
- private static final int MAX_RETRY_COUNT = 100;
-
- private final TypeSerializer<T> serializer;
- private final CollectRequestSender<T> sender;
- private final BooleanSupplier jobFinishedChecker;
-
- private final LinkedList<T> uncheckpointedResults;
- private final LinkedList<T> checkpointedResults;
-
- private String version;
- private long offset;
- private long lastCheckpointedOffset;
- private int retryCount;
-
- public TestCollectClient(
- TypeSerializer<T> serializer,
- CollectRequestSender<T> sender,
- BooleanSupplier jobFinishedChecker) {
- this.serializer = serializer;
- this.sender = sender;
- this.jobFinishedChecker = jobFinishedChecker;
-
- this.uncheckpointedResults = new LinkedList<>();
- this.checkpointedResults = new LinkedList<>();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void run() {
- Random random = new Random();
-
- version = INIT_VERSION;
- offset = 0;
- lastCheckpointedOffset = 0;
- retryCount = 0;
-
- try {
- while (!jobFinishedChecker.getAsBoolean()) {
- if (random.nextBoolean()) {
- Thread.sleep(random.nextInt(10));
- }
- CollectCoordinationResponse<T> response = sender.sendRequest(version, offset);
- dealWithResponse(response, offset);
- }
-
- Tuple2<Long, CollectCoordinationResponse> accResults = sender.getAccumulatorResults();
- dealWithResponse(accResults.f1, accResults.f0);
- checkpointedResults.addAll(uncheckpointedResults);
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
-
- public List<T> getResults() {
- return checkpointedResults;
- }
-
- private void dealWithResponse(CollectCoordinationResponse<T> response, long responseOffset) throws IOException {
- String responseVersion = response.getVersion();
- long responseLastCheckpointedOffset = response.getLastCheckpointedOffset();
- List<T> responseResults = response.getResults(serializer);
-
- if (responseResults.isEmpty()) {
- retryCount++;
- } else {
- retryCount = 0;
- }
- if (retryCount > MAX_RETRY_COUNT) {
- // not to block the tests
- throw new RuntimeException("Too many retries in TestCollectClient");
- }
-
- if (INIT_VERSION.equals(version)) {
- // first response, update version accordingly
- version = responseVersion;
- } else {
- if (responseLastCheckpointedOffset > lastCheckpointedOffset) {
- // a new checkpoint happens
- int newCheckpointedNum = (int) (responseLastCheckpointedOffset - lastCheckpointedOffset);
- for (int i = 0; i < newCheckpointedNum; i++) {
- T result = uncheckpointedResults.removeFirst();
- checkpointedResults.add(result);
- }
- lastCheckpointedOffset = responseLastCheckpointedOffset;
- }
-
- if (!version.equals(responseVersion)) {
- // sink has restarted
- int removeNum = (int) (offset - lastCheckpointedOffset);
- for (int i = 0; i < removeNum; i++) {
- uncheckpointedResults.removeLast();
- }
- version = responseVersion;
- offset = lastCheckpointedOffset;
- }
-
- if (responseResults.size() > 0) {
- int addStart = (int) (offset - responseOffset);
- List<T> resultsToAdd = responseResults.subList(addStart, responseResults.size());
- uncheckpointedResults.addAll(resultsToAdd);
- offset += resultsToAdd.size();
- }
- }
- }
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCoordinationRequestHandler.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCoordinationRequestHandler.java
new file mode 100644
index 0000000..aecf864
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCoordinationRequestHandler.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect.utils;
+
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
+import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkFunction;
+import org.apache.flink.util.OptionalFailure;
+
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link CoordinationRequestHandler} to test fetching SELECT query results.
+ */
+public class TestCoordinationRequestHandler<T> implements CoordinationRequestHandler {
+
+ private static final int BATCH_SIZE = 3;
+
+ private final TypeSerializer<T> serializer;
+ private final String accumulatorName;
+
+ private int checkpointCountDown;
+
+ private LinkedList<T> data;
+ private List<T> checkpointingData;
+ private List<T> checkpointedData;
+
+ private LinkedList<T> buffered;
+ private List<T> checkpointingBuffered;
+ private List<T> checkpointedBuffered;
+
+ private String version;
+
+ private long offset;
+ private long checkpointingOffset;
+ private long checkpointedOffset;
+
+ private Map<String, OptionalFailure<Object>> accumulatorResults;
+
+ private Random random;
+ private boolean closed;
+
+ public TestCoordinationRequestHandler(
+ List<T> data,
+ TypeSerializer<T> serializer,
+ String accumulatorName) {
+ this.serializer = serializer;
+ this.accumulatorName = accumulatorName;
+
+ this.checkpointCountDown = 0;
+
+ this.data = new LinkedList<>(data);
+ this.checkpointedData = new ArrayList<>(data);
+
+ this.buffered = new LinkedList<>();
+ this.checkpointedBuffered = new ArrayList<>();
+
+ this.version = UUID.randomUUID().toString();
+
+ this.offset = 0;
+ this.checkpointingOffset = 0;
+ this.checkpointedOffset = 0;
+
+ this.accumulatorResults = new HashMap<>();
+
+ this.random = new Random();
+ this.closed = false;
+ }
+
+ @Override
+ public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
+ if (closed) {
+ throw new RuntimeException("Handler closed");
+ }
+
+ Assert.assertTrue(request instanceof CollectCoordinationRequest);
+ CollectCoordinationRequest collectRequest = (CollectCoordinationRequest) request;
+
+ for (int i = random.nextInt(3) + 1; i > 0; i--) {
+ if (checkpointCountDown > 0) {
+ checkpointCountDown--;
+ if (checkpointCountDown == 0) {
+ checkpointedData = checkpointingData;
+ checkpointedBuffered = checkpointingBuffered;
+ checkpointedOffset = checkpointingOffset;
+ }
+ }
+
+ int r = random.nextInt(10);
+ if (r < 6) {
+ // with 60% chance we add data
+ int size = Math.min(data.size(), BATCH_SIZE * 2 - buffered.size());
+ if (size > 0) {
+ size = random.nextInt(size) + 1;
+ }
+ for (int j = 0; j < size; j++) {
+ buffered.add(data.removeFirst());
+ }
+
+ if (data.isEmpty()) {
+ buildAccumulatorResults();
+ closed = true;
+ break;
+ }
+ } else if (r < 9) {
+ // with 30% chance we do a checkpoint completed in the future
+ if (checkpointCountDown == 0) {
+ checkpointCountDown = random.nextInt(5) + 1;
+ checkpointingData = new ArrayList<>(data);
+ checkpointingBuffered = new ArrayList<>(buffered);
+ checkpointingOffset = offset;
+ }
+ } else {
+ // with 10% chance we fail
+ checkpointCountDown = 0;
+ version = UUID.randomUUID().toString();
+ data = new LinkedList<>(checkpointedData);
+ buffered = new LinkedList<>(checkpointedBuffered);
+ offset = checkpointedOffset;
+ }
+ }
+
+ Assert.assertTrue(offset <= collectRequest.getOffset());
+
+ List<T> subList = Collections.emptyList();
+ if (collectRequest.getVersion().equals(version)) {
+ while (buffered.size() > 0 && collectRequest.getOffset() > offset) {
+ buffered.removeFirst();
+ offset++;
+ }
+ subList = new ArrayList<>();
+ Iterator<T> iterator = buffered.iterator();
+ for (int i = 0; i < BATCH_SIZE && iterator.hasNext(); i++) {
+ subList.add(iterator.next());
+ }
+ }
+
+ CoordinationResponse response;
+ try {
+ if (random.nextBoolean()) {
+ // with 50% chance we return valid result
+ response = new CollectCoordinationResponse<>(version, checkpointedOffset, subList, serializer);
+ } else {
+ // with 50% chance we return invalid result
+ response = new CollectCoordinationResponse<>(
+ collectRequest.getVersion(),
+ -1,
+ Collections.emptyList(),
+ serializer);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return CompletableFuture.completedFuture(response);
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public Map<String, OptionalFailure<Object>> getAccumulatorResults() {
+ return accumulatorResults;
+ }
+
+ private void buildAccumulatorResults() {
+ List<T> finalResults = new ArrayList<>(buffered);
+ SerializedListAccumulator<byte[]> listAccumulator = new SerializedListAccumulator<>();
+ try {
+ byte[] serializedResult =
+ CollectSinkFunction.serializeAccumulatorResult(
+ offset, version, checkpointedOffset, finalResults, serializer);
+ listAccumulator.add(serializedResult, BytePrimitiveArraySerializer.INSTANCE);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ accumulatorResults.put(accumulatorName, OptionalFailure.of(listAccumulator.getLocalValue()));
+ }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestJobClient.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestJobClient.java
new file mode 100644
index 0000000..3a40870
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestJobClient.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect.utils;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.util.OptionalFailure;
+
+import org.junit.Assert;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link JobClient} to test fetching SELECT query results.
+ */
+public class TestJobClient implements JobClient, CoordinationRequestGateway {
+
+ private final JobID jobId;
+ private final OperatorID operatorId;
+ private final CoordinationRequestHandler handler;
+ private final JobInfoProvider infoProvider;
+
+ private JobStatus jobStatus;
+ private JobExecutionResult jobExecutionResult;
+
+ public TestJobClient(
+ JobID jobId,
+ OperatorID operatorId,
+ CoordinationRequestHandler handler,
+ JobInfoProvider infoProvider) {
+ this.jobId = jobId;
+ this.operatorId = operatorId;
+ this.handler = handler;
+ this.infoProvider = infoProvider;
+
+ this.jobStatus = JobStatus.RUNNING;
+ this.jobExecutionResult = null;
+ }
+
+ @Override
+ public JobID getJobID() {
+ return jobId;
+ }
+
+ @Override
+ public CompletableFuture<JobStatus> getJobStatus() {
+ return CompletableFuture.completedFuture(jobStatus);
+ }
+
+ @Override
+ public CompletableFuture<Void> cancel() {
+ jobStatus = JobStatus.CANCELED;
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, @Nullable String savepointDirectory) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDirectory) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<JobExecutionResult> getJobExecutionResult(ClassLoader userClassloader) {
+ return CompletableFuture.completedFuture(jobExecutionResult);
+ }
+
+ @Override
+ public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) {
+ if (jobStatus.isGloballyTerminalState()) {
+ throw new RuntimeException("Job terminated");
+ }
+
+ Assert.assertEquals(this.operatorId, operatorId);
+ CoordinationResponse response;
+ try {
+ response = handler.handleCoordinationRequest(request).get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ if (infoProvider.isJobFinished()) {
+ jobStatus = JobStatus.FINISHED;
+ jobExecutionResult = new JobExecutionResult(jobId, 0, infoProvider.getAccumulatorResults());
+ }
+
+ return CompletableFuture.completedFuture(response);
+ }
+
+ /**
+ * Interface to provide job related info for {@link TestJobClient}.
+ */
+ public interface JobInfoProvider {
+
+ boolean isJobFinished();
+
+ Map<String, OptionalFailure<Object>> getAccumulatorResults();
+ }
+}