You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2020/05/17 13:47:50 UTC

[flink] branch master updated: [FLINK-17735][streaming] Add an iterator to collect sink results through coordination rest api

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8650e40   [FLINK-17735][streaming] Add an iterator to collect sink results through coordination rest api
8650e40 is described below

commit 8650e409763a9b4eba92c77cc02ca22edc0230af
Author: TsReaper <ts...@gmail.com>
AuthorDate: Sun May 17 21:47:31 2020 +0800

     [FLINK-17735][streaming] Add an iterator to collect sink results through coordination rest api
    
    This closes #12073
---
 .../operators/collect/CollectResultFetcher.java    | 334 +++++++++++++++++++++
 .../operators/collect/CollectResultIterator.java   |  95 ++++++
 .../api/operators/collect/CollectSinkFunction.java |  33 +-
 .../collect/CollectSinkOperatorCoordinator.java    |  22 +-
 .../collect/CollectSinkOperatorFactory.java        |  21 +-
 .../collect/CollectResultIteratorTest.java         | 132 ++++++++
 .../operators/collect/CollectSinkFunctionTest.java | 206 ++++++++-----
 .../CollectSinkOperatorCoordinatorTest.java        |   6 +-
 .../collect/utils/CollectRequestSender.java        |  31 --
 .../operators/collect/utils/TestCollectClient.java | 141 ---------
 .../utils/TestCoordinationRequestHandler.java      | 213 +++++++++++++
 .../api/operators/collect/utils/TestJobClient.java | 133 ++++++++
 12 files changed, 1095 insertions(+), 272 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
new file mode 100644
index 0000000..ec7286b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A fetcher which fetches query results from the sink and provides exactly-once semantics.
+ */
+public class CollectResultFetcher<T> {
+
+	private static final int DEFAULT_RETRY_MILLIS = 100;
+	private static final long DEFAULT_ACCUMULATOR_GET_MILLIS = 10000;
+
+	private static final Logger LOG = LoggerFactory.getLogger(CollectResultFetcher.class);
+
+	private final CompletableFuture<OperatorID> operatorIdFuture;
+	private final String accumulatorName;
+	private final int retryMillis;
+
+	private ResultBuffer buffer;
+
+	@Nullable
+	private JobClient jobClient;
+	@Nullable
+	private CoordinationRequestGateway gateway;
+
+	private boolean jobTerminated;
+	private boolean closed;
+
+	public CollectResultFetcher(
+			CompletableFuture<OperatorID> operatorIdFuture,
+			TypeSerializer<T> serializer,
+			String accumulatorName) {
+		this(
+			operatorIdFuture,
+			serializer,
+			accumulatorName,
+			DEFAULT_RETRY_MILLIS);
+	}
+
+	CollectResultFetcher(
+			CompletableFuture<OperatorID> operatorIdFuture,
+			TypeSerializer<T> serializer,
+			String accumulatorName,
+			int retryMillis) {
+		this.operatorIdFuture = operatorIdFuture;
+		this.accumulatorName = accumulatorName;
+		this.retryMillis = retryMillis;
+
+		this.buffer = new ResultBuffer(serializer);
+
+		this.jobTerminated = false;
+		this.closed = false;
+	}
+
+	public void setJobClient(JobClient jobClient) {
+		Preconditions.checkArgument(
+			jobClient instanceof CoordinationRequestGateway,
+			"Job client must be a CoordinationRequestGateway. This is a bug.");
+		this.jobClient = jobClient;
+		this.gateway = (CoordinationRequestGateway) jobClient;
+	}
+
+	public T next() throws IOException {
+		if (closed) {
+			return null;
+		}
+
+		// this is to avoid sleeping before the first try
+		boolean beforeFirstTry = true;
+		do {
+			T res = buffer.next();
+			if (res != null) {
+				// we still have user-visible results, just use them
+				return res;
+			} else if (jobTerminated) {
+				// no user-visible results, but job has terminated, we have to return
+				return null;
+			} else if (!beforeFirstTry) {
+				// no results but job is still running, sleep before retry
+				sleepBeforeRetry();
+			}
+			beforeFirstTry = false;
+
+			if (isJobTerminated()) {
+				// job terminated, read results from accumulator
+				jobTerminated = true;
+				Tuple2<Long, CollectCoordinationResponse<T>> accResults = getAccumulatorResults();
+				buffer.dealWithResponse(accResults.f1, accResults.f0);
+				buffer.complete();
+			} else {
+				// job still running, try to fetch some results
+				long requestOffset = buffer.offset;
+				CollectCoordinationResponse<T> response;
+				try {
+					response = sendRequest(buffer.version, requestOffset);
+				} catch (Exception e) {
+					LOG.warn("An exception occurs when fetching query results", e);
+					continue;
+				}
+				// the response will contain data (if any) starting exactly from requested offset
+				buffer.dealWithResponse(response, requestOffset);
+			}
+		} while (true);
+	}
+
+	public void close() {
+		if (closed) {
+			return;
+		}
+
+		cancelJob();
+		closed = true;
+	}
+
+	@SuppressWarnings("unchecked")
+	private CollectCoordinationResponse<T> sendRequest(
+			String version,
+			long offset) throws InterruptedException, ExecutionException {
+		checkJobClientConfigured();
+
+		OperatorID operatorId = operatorIdFuture.getNow(null);
+		Preconditions.checkNotNull(operatorId, "Unknown operator ID. This is a bug.");
+
+		CollectCoordinationRequest request = new CollectCoordinationRequest(version, offset);
+		return (CollectCoordinationResponse<T>) gateway.sendCoordinationRequest(operatorId, request).get();
+	}
+
+	private Tuple2<Long, CollectCoordinationResponse<T>> getAccumulatorResults() throws IOException {
+		checkJobClientConfigured();
+
+		JobExecutionResult executionResult;
+		try {
+			// this timeout is sort of a hack, see comments in isJobTerminated for explanation
+			executionResult = jobClient.getJobExecutionResult(getClass().getClassLoader()).get(
+				DEFAULT_ACCUMULATOR_GET_MILLIS, TimeUnit.MILLISECONDS);
+		} catch (InterruptedException | ExecutionException | TimeoutException e) {
+			throw new IOException("Failed to fetch job execution result", e);
+		}
+
+		ArrayList<byte[]> accResults = executionResult.getAccumulatorResult(accumulatorName);
+		if (accResults == null) {
+			// job terminated abnormally
+			throw new IOException("Job terminated abnormally, no job execution result can be fetched");
+		}
+
+		try {
+			List<byte[]> serializedResults =
+				SerializedListAccumulator.deserializeList(accResults, BytePrimitiveArraySerializer.INSTANCE);
+			byte[] serializedResult = serializedResults.get(0);
+			return CollectSinkFunction.deserializeAccumulatorResult(serializedResult);
+		} catch (ClassNotFoundException | IOException e) {
+			// this is impossible
+			throw new IOException("Failed to deserialize accumulator results", e);
+		}
+	}
+
+	private boolean isJobTerminated() {
+		checkJobClientConfigured();
+
+		try {
+			JobStatus status = jobClient.getJobStatus().get();
+			return status.isGloballyTerminalState();
+		} catch (Exception e) {
+			// TODO
+			//  This is sort of a hack.
+			//  Currently different execution environment will have different behaviors
+			//  when fetching a finished job status.
+			//  For example, standalone session cluster will return a normal FINISHED,
+			//  while mini cluster will throw IllegalStateException,
+			//  and yarn per job will throw ApplicationNotFoundException.
+			//  We have to assume that job has finished in this case.
+			//  Change this when these behaviors are unified.
+			LOG.warn("Failed to get job status so we assume that the job has terminated. Some data might be lost.", e);
+			return true;
+		}
+	}
+
+	private void cancelJob() {
+		checkJobClientConfigured();
+
+		if (!isJobTerminated()) {
+			jobClient.cancel();
+		}
+	}
+
+	private void sleepBeforeRetry() {
+		if (retryMillis <= 0) {
+			return;
+		}
+
+		try {
+			// TODO a more proper retry strategy?
+			Thread.sleep(retryMillis);
+		} catch (InterruptedException e) {
+			LOG.warn("Interrupted when sleeping before a retry", e);
+		}
+	}
+
+	private void checkJobClientConfigured() {
+		Preconditions.checkNotNull(jobClient, "Job client must be configured before first use.");
+		Preconditions.checkNotNull(gateway, "Coordination request gateway must be configured before first use.");
+	}
+
+	/**
+	 * A buffer which encapsulates the logic of dealing with the response from the {@link CollectSinkFunction}.
+	 * See Java doc of {@link CollectSinkFunction} for explanation of this communication protocol.
+	 */
+	private class ResultBuffer {
+
+		private static final String INIT_VERSION = "";
+
+		private final LinkedList<T> buffer;
+		private final TypeSerializer<T> serializer;
+
+		// for a detailed explanation of the following variables, see the Java doc of CollectSinkFunction
+		// `version` is to check if the sink restarts
+		private String version;
+		// `offset` is the offset of the next result we want to fetch
+		private long offset;
+
+		// userVisibleHead <= user visible results offset < userVisibleTail
+		private long userVisibleHead;
+		private long userVisibleTail;
+
+		private ResultBuffer(TypeSerializer<T> serializer) {
+			this.buffer = new LinkedList<>();
+			this.serializer = serializer;
+
+			this.version = INIT_VERSION;
+			this.offset = 0;
+
+			this.userVisibleHead = 0;
+			this.userVisibleTail = 0;
+		}
+
+		private T next() {
+			if (userVisibleHead == userVisibleTail) {
+				return null;
+			}
+			T ret = buffer.removeFirst();
+			userVisibleHead++;
+
+			sanityCheck();
+			return ret;
+		}
+
+		private void dealWithResponse(CollectCoordinationResponse<T> response, long responseOffset) throws IOException {
+			String responseVersion = response.getVersion();
+			long responseLastCheckpointedOffset = response.getLastCheckpointedOffset();
+			List<T> results = response.getResults(serializer);
+
+			// we first check version in the response to decide whether we should throw away dirty results
+			if (!version.equals(responseVersion)) {
+				// sink restarted, we revert back to where the sink tells us
+				for (long i = 0; i < offset - responseLastCheckpointedOffset; i++) {
+					buffer.removeLast();
+				}
+				version = responseVersion;
+				offset = responseLastCheckpointedOffset;
+			}
+
+			// we now check if more results can be seen by the user
+			if (responseLastCheckpointedOffset > userVisibleTail) {
+				// lastCheckpointedOffset increases, this means that more results have been
+				// checkpointed, and we can give these results to the user
+				userVisibleTail = responseLastCheckpointedOffset;
+			}
+
+			if (!results.isEmpty()) {
+				// response contains some data, add them to buffer
+				int addStart = (int) (offset - responseOffset);
+				List<T> addedResults = results.subList(addStart, results.size());
+				buffer.addAll(addedResults);
+				offset += addedResults.size();
+			}
+
+			sanityCheck();
+		}
+
+		private void complete() {
+			userVisibleTail = offset;
+		}
+
+		private void sanityCheck() {
+			Preconditions.checkState(
+				userVisibleHead <= userVisibleTail,
+				"userVisibleHead should not be larger than userVisibleTail. This is a bug.");
+			Preconditions.checkState(
+				userVisibleTail <= offset,
+				"userVisibleTail should not be larger than offset. This is a bug.");
+		}
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
new file mode 100644
index 0000000..bbf41e1
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An iterator which iterates through the results of a query job.
+ *
+ * <p>NOTE: After using this iterator, the close method MUST be called in order to release job-related resources.
+ */
+public class CollectResultIterator<T> implements Iterator<T>, AutoCloseable {
+
+	private final CollectResultFetcher<T> fetcher;
+	private T bufferedResult;
+
+	public CollectResultIterator(
+			CompletableFuture<OperatorID> operatorIdFuture,
+			TypeSerializer<T> serializer,
+			String accumulatorName) {
+		this.fetcher = new CollectResultFetcher<>(operatorIdFuture, serializer, accumulatorName);
+		this.bufferedResult = null;
+	}
+
+	@VisibleForTesting
+	public CollectResultIterator(
+			CompletableFuture<OperatorID> operatorIdFuture,
+			TypeSerializer<T> serializer,
+			String accumulatorName,
+			int retryMillis) {
+		this.fetcher = new CollectResultFetcher<>(operatorIdFuture, serializer, accumulatorName, retryMillis);
+		this.bufferedResult = null;
+	}
+
+	@Override
+	public boolean hasNext() {
+		// we have to make sure that the next result exists
+		// it is possible that there are no more results but the job is still running
+		if (bufferedResult == null) {
+			bufferedResult = nextResultFromFetcher();
+		}
+		return bufferedResult != null;
+	}
+
+	@Override
+	public T next() {
+		if (bufferedResult == null) {
+			bufferedResult = nextResultFromFetcher();
+		}
+		T ret = bufferedResult;
+		bufferedResult = null;
+		return ret;
+	}
+
+	@Override
+	public void close() throws Exception {
+		fetcher.close();
+	}
+
+	public void setJobClient(JobClient jobClient) {
+		fetcher.setJobClient(jobClient);
+	}
+
+	private T nextResultFromFetcher() {
+		try {
+			return fetcher.next();
+		} catch (IOException e) {
+			fetcher.close();
+			throw new RuntimeException("Failed to fetch next result", e);
+		}
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
index 21e889b..64a3ffe 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.operators.collect;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
@@ -78,7 +79,9 @@ import java.util.concurrent.locks.ReentrantLock;
  *         before this offset. Sink can safely throw these results away.</li>
  *     <li><strong>lastCheckpointedOffset</strong>:
  *         This is the value of <code>offset</code> when the checkpoint happens. This value will be
- *         restored from the checkpoint and set back to <code>offset</code> when the sink restarts.</li>
+ *         restored from the checkpoint and set back to <code>offset</code> when the sink restarts.
+ *         Clients who need exactly-once semantics need to rely on this value for the position to
+ *         revert when a failover happens.</li>
  * </ol>
  *
  * <p>Client will put <code>version</code> and <code>offset</code> into the request, indicating that
@@ -97,9 +100,9 @@ import java.util.concurrent.locks.ReentrantLock;
  * <ol>
  *     <li>If the version mismatches, client knows that sink has restarted. It will throw away all uncheckpointed
  *         results after <code>lastCheckpointedOffset</code>.</li>
- *     <li>Otherwise the version matches. If <code>lastCheckpointedOffset</code> increases, client knows that
- *         a checkpoint happens. It can now move all results before this offset to a user-visible buffer. If
- *         the response also contains new results, client will now move these new results into uncheckpointed
+ *     <li>If <code>lastCheckpointedOffset</code> increases, client knows that
+ *         a checkpoint happens. It can now move all results before this offset to a user-visible buffer.</li>
+ *     <li>If the response also contains new results, client will now move these new results into uncheckpointed
  *         buffer.</li>
  * </ol>
  *
@@ -264,7 +267,9 @@ public class CollectSinkFunction<IN> extends RichSinkFunction<IN> implements Che
 			// put results not consumed by the client into the accumulator
 			// so that we do not block the closing procedure while not throwing results away
 			SerializedListAccumulator<byte[]> accumulator = new SerializedListAccumulator<>();
-			accumulator.add(serializeAccumulatorResult(), BytePrimitiveArraySerializer.INSTANCE);
+			accumulator.add(
+				serializeAccumulatorResult(offset, version, lastCheckpointedOffset, bufferedResults, serializer),
+				BytePrimitiveArraySerializer.INSTANCE);
 			getRuntimeContext().addAccumulator(accumulatorName, accumulator);
 		} finally {
 			bufferedResultsLock.unlock();
@@ -286,22 +291,28 @@ public class CollectSinkFunction<IN> extends RichSinkFunction<IN> implements Che
 		this.eventGateway = eventGateway;
 	}
 
-	private byte[] serializeAccumulatorResult() throws IOException {
+	@VisibleForTesting
+	public static <T> byte[] serializeAccumulatorResult(
+			long offset,
+			String version,
+			long lastCheckpointedOffset,
+			List<T> bufferedResults,
+			TypeSerializer<T> serializer) throws IOException {
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
 		wrapper.writeLong(offset);
-		CollectCoordinationResponse<IN> finalResponse =
+		CollectCoordinationResponse<T> finalResponse =
 			new CollectCoordinationResponse<>(version, lastCheckpointedOffset, bufferedResults, serializer);
 		finalResponse.serialize(wrapper);
 		return baos.toByteArray();
 	}
 
-	public static Tuple2<Long, CollectCoordinationResponse> deserializeAccumulatorResult(
+	public static <T> Tuple2<Long, CollectCoordinationResponse<T>> deserializeAccumulatorResult(
 			byte[] serializedAccResults) throws IOException {
 		ByteArrayInputStream bais = new ByteArrayInputStream(serializedAccResults);
 		DataInputViewStreamWrapper wrapper = new DataInputViewStreamWrapper(bais);
 		long token = wrapper.readLong();
-		CollectCoordinationResponse finalResponse = new CollectCoordinationResponse(wrapper);
+		CollectCoordinationResponse<T> finalResponse = new CollectCoordinationResponse<>(wrapper);
 		return Tuple2.of(token, finalResponse);
 	}
 
@@ -395,8 +406,8 @@ public class CollectSinkFunction<IN> extends RichSinkFunction<IN> implements Che
 
 		private void close() {
 			running = false;
+			closeServerSocket();
 			closeCurrentConnection();
-			closeServer();
 		}
 
 		private InetSocketAddress getServerSocketAddress() {
@@ -444,7 +455,7 @@ public class CollectSinkFunction<IN> extends RichSinkFunction<IN> implements Che
 			}
 		}
 
-		private void closeServer() {
+		private void closeServerSocket() {
 			try {
 				serverSocket.close();
 			} catch (Exception e) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
index dea9bf3..6c84266 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
@@ -56,6 +56,8 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor
 
 	private static final Logger LOG = LoggerFactory.getLogger(CollectSinkOperatorCoordinator.class);
 
+	private final int socketTimeout;
+
 	private InetSocketAddress address;
 	private Socket socket;
 	private DataInputViewStreamWrapper inStream;
@@ -63,6 +65,10 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor
 
 	private ExecutorService executorService;
 
+	public CollectSinkOperatorCoordinator(int socketTimeout) {
+		this.socketTimeout = socketTimeout;
+	}
+
 	@Override
 	public void start() throws Exception {
 		this.executorService =
@@ -107,12 +113,20 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor
 			CollectCoordinationRequest request,
 			CompletableFuture<CoordinationResponse> responseFuture,
 			InetSocketAddress sinkAddress) {
+		if (sinkAddress == null) {
+			closeConnection();
+			completeWithEmptyResponse(request, responseFuture);
+			return;
+		}
+
 		try {
 			if (socket == null) {
-				socket = new Socket(sinkAddress.getAddress(), sinkAddress.getPort());
+				socket = new Socket();
+				socket.setSoTimeout(socketTimeout);
 				socket.setKeepAlive(true);
 				socket.setTcpNoDelay(true);
 
+				socket.connect(sinkAddress);
 				inStream = new DataInputViewStreamWrapper(socket.getInputStream());
 				outStream = new DataOutputViewStreamWrapper(socket.getOutputStream());
 				LOG.info("Sink connection established");
@@ -200,9 +214,11 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor
 	public static class Provider implements OperatorCoordinator.Provider {
 
 		private final OperatorID operatorId;
+		private final int socketTimeout;
 
-		public Provider(OperatorID operatorId) {
+		public Provider(OperatorID operatorId, int socketTimeout) {
 			this.operatorId = operatorId;
+			this.socketTimeout = socketTimeout;
 		}
 
 		@Override
@@ -213,7 +229,7 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor
 		@Override
 		public OperatorCoordinator create(Context context) {
 			// we do not send operator event so we don't need a context
-			return new CollectSinkOperatorCoordinator();
+			return new CollectSinkOperatorCoordinator(socketTimeout);
 		}
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
index 8fa9843..e177f79 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.operators.collect;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
@@ -28,19 +29,25 @@ import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 /**
  * The Factory class for {@link CollectSinkOperator}.
  */
-@SuppressWarnings("unchecked")
-public class CollectSinkOperatorFactory extends SimpleUdfStreamOperatorFactory<Object> implements CoordinatedOperatorFactory<Object> {
+public class CollectSinkOperatorFactory<IN> extends SimpleUdfStreamOperatorFactory<Object> implements CoordinatedOperatorFactory<Object> {
 
 	private static final long serialVersionUID = 1L;
 
-	private final CollectSinkOperator<?> operator;
+	private final CollectSinkOperator<IN> operator;
+	private final int socketTimeout;
 
-	public CollectSinkOperatorFactory(CollectSinkOperator<?> operator) {
-		super(operator);
-		this.operator = operator;
+	public CollectSinkOperatorFactory(
+			TypeSerializer<IN> serializer,
+			int maxResultsPerBatch,
+			String accumulatorName,
+			int socketTimeout) {
+		super(new CollectSinkOperator<>(serializer, maxResultsPerBatch, accumulatorName));
+		this.operator = (CollectSinkOperator<IN>) getOperator();
+		this.socketTimeout = socketTimeout;
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	public <T extends StreamOperator<Object>> T  createStreamOperator(StreamOperatorParameters<Object> parameters) {
 		final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
 		final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher();
@@ -55,6 +62,6 @@ public class CollectSinkOperatorFactory extends SimpleUdfStreamOperatorFactory<O
 	@Override
 	public OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID) {
 		operator.getOperatorIdFuture().complete(operatorID);
-		return new CollectSinkOperatorCoordinator.Provider(operatorID);
+		return new CollectSinkOperatorCoordinator.Provider(operatorID, socketTimeout);
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectResultIteratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectResultIteratorTest.java
new file mode 100644
index 0000000..021689f
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectResultIteratorTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.collect.utils.TestCoordinationRequestHandler;
+import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Tests for {@link CollectResultIterator}.
+ */
+public class CollectResultIteratorTest extends TestLogger {
+
+	private static final OperatorID TEST_OPERATOR_ID = new OperatorID();
+	private static final JobID TEST_JOB_ID = new JobID();
+	private static final String ACCUMULATOR_NAME = "accumulatorName";
+
+	@Test
+	public void testIteratorWithCheckpointAndFailure() throws Exception {
+		// run this random test multiple times
+		for (int testCount = 1000; testCount > 0; testCount--) {
+			List<Integer> expected = new ArrayList<>();
+			for (int i = 0; i < 200; i++) {
+				expected.add(i);
+			}
+
+			CollectResultIterator<Integer> iterator = createIteratorAndJobClient(expected, IntSerializer.INSTANCE).f0;
+
+			List<Integer> actual = new ArrayList<>();
+			while (iterator.hasNext()) {
+				actual.add(iterator.next());
+			}
+			Assert.assertEquals(expected.size(), actual.size());
+
+			Collections.sort(expected);
+			Collections.sort(actual);
+			Assert.assertArrayEquals(expected.toArray(new Integer[0]), actual.toArray(new Integer[0]));
+
+			iterator.close();
+		}
+	}
+
+	@Test
+	public void testEarlyClose() throws Exception {
+		List<Integer> expected = new ArrayList<>();
+		for (int i = 0; i < 200; i++) {
+			expected.add(i);
+		}
+
+		Tuple2<CollectResultIterator<Integer>, JobClient> tuple2 =
+			createIteratorAndJobClient(expected, IntSerializer.INSTANCE);
+		CollectResultIterator<Integer> iterator = tuple2.f0;
+		JobClient jobClient = tuple2.f1;
+
+		for (int i = 0; i < 100; i++) {
+			Assert.assertTrue(iterator.hasNext());
+			Assert.assertNotNull(iterator.next());
+		}
+		Assert.assertTrue(iterator.hasNext());
+		iterator.close();
+
+		Assert.assertEquals(JobStatus.CANCELED, jobClient.getJobStatus().get());
+	}
+
+	private Tuple2<CollectResultIterator<Integer>, JobClient> createIteratorAndJobClient(
+			List<Integer> expected,
+			TypeSerializer<Integer> serializer) {
+		CollectResultIterator<Integer> iterator = new CollectResultIterator<>(
+			CompletableFuture.completedFuture(TEST_OPERATOR_ID),
+			serializer,
+			ACCUMULATOR_NAME,
+			0);
+
+		TestCoordinationRequestHandler<Integer> handler = new TestCoordinationRequestHandler<>(
+			expected, serializer, ACCUMULATOR_NAME);
+
+		TestJobClient.JobInfoProvider infoProvider = new TestJobClient.JobInfoProvider() {
+
+			@Override
+			public boolean isJobFinished() {
+				return handler.isClosed();
+			}
+
+			@Override
+			public Map<String, OptionalFailure<Object>> getAccumulatorResults() {
+				return handler.getAccumulatorResults();
+			}
+		};
+
+		TestJobClient jobClient = new TestJobClient(
+			TEST_JOB_ID,
+			TEST_OPERATOR_ID,
+			handler,
+			infoProvider);
+		iterator.setJobClient(jobClient);
+
+		return Tuple2.of(iterator, jobClient);
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java
index 1483998..1363d87 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java
@@ -17,25 +17,24 @@
 
 package org.apache.flink.streaming.api.operators.collect;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.operators.collect.utils.CollectRequestSender;
 import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionInitializationContext;
 import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
 import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
-import org.apache.flink.streaming.api.operators.collect.utils.TestCollectClient;
+import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient;
 import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
-import org.apache.flink.types.Row;
+import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -47,11 +46,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -61,13 +62,16 @@ public class CollectSinkFunctionTest extends TestLogger {
 
 	private static final int MAX_RESULTS_PER_BATCH = 3;
 	private static final String ACCUMULATOR_NAME = "tableCollectAccumulator";
-	private static final long TIME_OUT_MILLIS = 10000;
+	private static final int FUTURE_TIMEOUT_MILLIS = 10000;
+	private static final int SOCKET_TIMEOUT_MILLIS = 1000;
 	private static final int MAX_RETIRES = 100;
 
-	private static final TypeSerializer<Row> serializer =
-		new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO).createSerializer(new ExecutionConfig());
+	private static final JobID TEST_JOB_ID = new JobID();
+	private static final OperatorID TEST_OPERATOR_ID = new OperatorID();
 
-	private CollectSinkFunction<Row> function;
+	private static final TypeSerializer<Integer> serializer = IntSerializer.INSTANCE;
+
+	private CollectSinkFunction<Integer> function;
 	private CollectSinkOperatorCoordinator coordinator;
 	private MockFunctionInitializationContext functionInitializationContext;
 	private boolean jobFinished;
@@ -81,7 +85,7 @@ public class CollectSinkFunctionTest extends TestLogger {
 		ioManager = new IOManagerAsync();
 		runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, ioManager);
 		gateway = new MockOperatorEventGateway();
-		coordinator = new CollectSinkOperatorCoordinator();
+		coordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
 		coordinator.start();
 
 		// only used in checkpointed tests
@@ -101,10 +105,10 @@ public class CollectSinkFunctionTest extends TestLogger {
 		openFunction();
 		for (int i = 0; i < 6; i++) {
 			// CollectSinkFunction never use context when invoked
-			function.invoke(Row.of(i), null);
+			function.invoke(i, null);
 		}
 
-		CollectCoordinationResponse<Row> response = sendRequestAndGetValidResponse("", 0);
+		CollectCoordinationResponse<Integer> response = sendRequestAndGetValidResponse("", 0);
 		Assert.assertEquals(0, response.getLastCheckpointedOffset());
 		String version = response.getVersion();
 
@@ -118,7 +122,7 @@ public class CollectSinkFunctionTest extends TestLogger {
 		assertResponseEquals(response, version, 0, Collections.emptyList());
 
 		for (int i = 6; i < 10; i++) {
-			function.invoke(Row.of(i), null);
+			function.invoke(i, null);
 		}
 
 		// invalid request
@@ -135,7 +139,7 @@ public class CollectSinkFunctionTest extends TestLogger {
 		assertResponseEquals(response, version, 0, Collections.emptyList());
 
 		for (int i = 10; i < 16; i++) {
-			function.invoke(Row.of(i), null);
+			function.invoke(i, null);
 		}
 
 		response = sendRequestAndGetValidResponse(version, 12);
@@ -151,10 +155,10 @@ public class CollectSinkFunctionTest extends TestLogger {
 		openFunctionWithState();
 		for (int i = 0; i < 2; i++) {
 			// CollectSinkFunction never use context when invoked
-			function.invoke(Row.of(i), null);
+			function.invoke(i, null);
 		}
 
-		CollectCoordinationResponse<Row> response = sendRequestAndGetValidResponse("", 0);
+		CollectCoordinationResponse<Integer> response = sendRequestAndGetValidResponse("", 0);
 		Assert.assertEquals(0, response.getLastCheckpointedOffset());
 		String version = response.getVersion();
 
@@ -162,7 +166,7 @@ public class CollectSinkFunctionTest extends TestLogger {
 		assertResponseEquals(response, version, 0, Arrays.asList(0, 1));
 
 		for (int i = 2; i < 6; i++) {
-			function.invoke(Row.of(i), null);
+			function.invoke(i, null);
 		}
 
 		response = sendRequestAndGetValidResponse(version, 3);
@@ -181,7 +185,7 @@ public class CollectSinkFunctionTest extends TestLogger {
 		assertResponseEquals(response, version, 3, Arrays.asList(4, 5));
 
 		for (int i = 6; i < 9; i++) {
-			function.invoke(Row.of(i), null);
+			function.invoke(i, null);
 		}
 
 		response = sendRequestAndGetValidResponse(version, 6);
@@ -192,7 +196,7 @@ public class CollectSinkFunctionTest extends TestLogger {
 		openFunctionWithState();
 
 		for (int i = 9; i < 12; i++) {
-			function.invoke(Row.of(i), null);
+			function.invoke(i, null);
 		}
 
 		response = sendRequestAndGetValidResponse(version, 4);
@@ -208,7 +212,7 @@ public class CollectSinkFunctionTest extends TestLogger {
 		checkpointFunction(2);
 		checkpointComplete(2);
 
-		function.invoke(Row.of(12), null);
+		function.invoke(12, null);
 
 		response = sendRequestAndGetValidResponse(version, 7);
 		assertResponseEquals(response, version, 6, Arrays.asList(10, 11, 12));
@@ -228,7 +232,7 @@ public class CollectSinkFunctionTest extends TestLogger {
 		assertResponseEquals(response, version, 6, Collections.emptyList());
 
 		for (int i = 13; i < 17; i++) {
-			function.invoke(Row.of(i), null);
+			function.invoke(i, null);
 		}
 
 		response = sendRequestAndGetValidResponse(version, 9);
@@ -249,7 +253,7 @@ public class CollectSinkFunctionTest extends TestLogger {
 		assertResponseEquals(response, version, 9, Collections.singletonList(16));
 
 		for (int i = 17; i < 20; i++) {
-			function.invoke(Row.of(i), null);
+			function.invoke(i, null);
 		}
 
 		response = sendRequestAndGetValidResponse(version, 12);
@@ -270,7 +274,7 @@ public class CollectSinkFunctionTest extends TestLogger {
 		assertResponseEquals(response, version, 9, Collections.singletonList(16));
 
 		for (int i = 20; i < 23; i++) {
-			function.invoke(Row.of(i), null);
+			function.invoke(i, null);
 		}
 
 		response = sendRequestAndGetValidResponse(version, 12);
@@ -290,13 +294,9 @@ public class CollectSinkFunctionTest extends TestLogger {
 				expected.add(i);
 			}
 			UncheckpointedDataFeeder feeder = new UncheckpointedDataFeeder(expected);
-			TestCollectClient<Row> client = new TestCollectClient<>(
-				serializer,
-				new TestCollectRequestSender(),
-				() -> jobFinished);
 
-			runFunctionWithClient(feeder, client);
-			assertResultsEqualAfterSort(expected, client.getResults());
+			List<Integer> actual = runFunctionRandomTest(feeder);
+			assertResultsEqualAfterSort(expected, actual);
 
 			after();
 			before();
@@ -312,23 +312,22 @@ public class CollectSinkFunctionTest extends TestLogger {
 				expected.add(i);
 			}
 			CheckpointedDataFeeder feeder = new CheckpointedDataFeeder(expected);
-			TestCollectClient<Row> client = new TestCollectClient<>(
-				serializer,
-				new TestCollectRequestSender(),
-				() -> jobFinished);
 
-			runFunctionWithClient(feeder, client);
-			assertResultsEqualAfterSort(expected, client.getResults());
+			List<Integer> actual = runFunctionRandomTest(feeder);
+			assertResultsEqualAfterSort(expected, actual);
 
 			after();
 			before();
 		}
 	}
 
-	private void runFunctionWithClient(Thread feeder, Thread client) throws Exception {
+	private List<Integer> runFunctionRandomTest(Thread feeder) throws Exception {
+		CollectClient client = new CollectClient();
+
 		Thread.UncaughtExceptionHandler exceptionHandler = (t, e) -> {
 			feeder.interrupt();
 			client.interrupt();
+			e.printStackTrace();
 		};
 		feeder.setUncaughtExceptionHandler(exceptionHandler);
 		client.setUncaughtExceptionHandler(exceptionHandler);
@@ -337,10 +336,13 @@ public class CollectSinkFunctionTest extends TestLogger {
 		client.start();
 		feeder.join();
 		client.join();
+
+		return client.results;
 	}
 
 	private void openFunction() throws Exception {
-		function = new CollectSinkFunction<>(serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME);
+		function = new CollectSinkFunction<>(
+			serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME);
 		function.setRuntimeContext(runtimeContext);
 		function.setOperatorEventGateway(gateway);
 		function.open(new Configuration());
@@ -349,7 +351,8 @@ public class CollectSinkFunctionTest extends TestLogger {
 
 	private void openFunctionWithState() throws Exception {
 		functionInitializationContext.getOperatorStateStore().revertToLastSuccessCheckpoint();
-		function = new CollectSinkFunction<>(serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME);
+		function = new CollectSinkFunction<>(
+			serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME);
 		function.setRuntimeContext(runtimeContext);
 		function.setOperatorEventGateway(gateway);
 		function.initializeState(functionInitializationContext);
@@ -382,19 +385,19 @@ public class CollectSinkFunctionTest extends TestLogger {
 	}
 
 	@SuppressWarnings("unchecked")
-	private CollectCoordinationResponse<Row> sendRequest(
+	private CollectCoordinationResponse<Integer> sendRequest(
 			String version,
 			long offset) throws Exception {
 		CollectCoordinationRequest request = new CollectCoordinationRequest(version, offset);
 		// we add a timeout to not block the tests
 		return ((CollectCoordinationResponse) coordinator
-			.handleCoordinationRequest(request).get(TIME_OUT_MILLIS, TimeUnit.MILLISECONDS));
+			.handleCoordinationRequest(request).get(FUTURE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
 	}
 
-	private CollectCoordinationResponse<Row> sendRequestAndGetValidResponse(
+	private CollectCoordinationResponse<Integer> sendRequestAndGetValidResponse(
 			String version,
 			long offset) throws Exception {
-		CollectCoordinationResponse<Row> response;
+		CollectCoordinationResponse<Integer> response;
 		for (int i = 0; i < MAX_RETIRES; i++) {
 			response = sendRequest(version, offset);
 			if (response.getLastCheckpointedOffset() >= 0) {
@@ -405,7 +408,7 @@ public class CollectSinkFunctionTest extends TestLogger {
 	}
 
 	@SuppressWarnings("unchecked")
-	private Tuple2<Long, CollectCoordinationResponse> getAccumualtorResults() throws Exception {
+	private Tuple2<Long, CollectCoordinationResponse<Integer>> getAccumualtorResults() throws Exception {
 		Accumulator accumulator = runtimeContext.getAccumulator(ACCUMULATOR_NAME);
 		ArrayList<byte[]> accLocalValue = ((SerializedListAccumulator) accumulator).getLocalValue();
 		List<byte[]> serializedResults =
@@ -416,41 +419,35 @@ public class CollectSinkFunctionTest extends TestLogger {
 	}
 
 	private void assertResponseEquals(
-			CollectCoordinationResponse<Row> response,
+			CollectCoordinationResponse<Integer> response,
 			String version,
 			long lastCheckpointedOffset,
 			List<Integer> expected) throws IOException {
 		Assert.assertEquals(version, response.getVersion());
 		Assert.assertEquals(lastCheckpointedOffset, response.getLastCheckpointedOffset());
-		List<Row> results = response.getResults(serializer);
+		List<Integer> results = response.getResults(serializer);
 		assertResultsEqual(expected, results);
 	}
 
-	private void assertResultsEqual(List<Integer> expected, List<Row> actual) {
-		Assert.assertEquals(expected.size(), actual.size());
-		for (int i = 0; i < expected.size(); i++) {
-			Row row = actual.get(i);
-			Assert.assertEquals(1, row.getArity());
-			Assert.assertEquals(expected.get(i), row.getField(0));
-		}
+	private void assertResultsEqual(List<Integer> expected, List<Integer> actual) {
+		Assert.assertArrayEquals(expected.toArray(new Integer[0]), actual.toArray(new Integer[0]));
 	}
 
-	private void assertResultsEqualAfterSort(List<Integer> expected, List<Row> actual) {
+	private void assertResultsEqualAfterSort(List<Integer> expected, List<Integer> actual) {
 		Collections.sort(expected);
-		actual.sort(Comparator.comparingInt(row -> (int) row.getField(0)));
+		Collections.sort(actual);
 		assertResultsEqual(expected, actual);
 	}
 
-	@SuppressWarnings("unchecked")
 	private void assertAccumulatorResult(
 			long expectedOffset,
 			String expectedVersion,
 			long expectedLastCheckpointedOffset,
 			List<Integer> expectedResults) throws Exception {
-		Tuple2<Long, CollectCoordinationResponse> accResults = getAccumualtorResults();
+		Tuple2<Long, CollectCoordinationResponse<Integer>> accResults = getAccumualtorResults();
 		long offset = accResults.f0;
-		CollectCoordinationResponse response = accResults.f1;
-		List<Row> actualResults = response.getResults(serializer);
+		CollectCoordinationResponse<Integer> response = accResults.f1;
+		List<Integer> actualResults = response.getResults(serializer);
 
 		Assert.assertEquals(expectedOffset, offset);
 		Assert.assertEquals(expectedVersion, response.getVersion());
@@ -458,19 +455,6 @@ public class CollectSinkFunctionTest extends TestLogger {
 		assertResultsEqual(expectedResults, actualResults);
 	}
 
-	private class TestCollectRequestSender implements CollectRequestSender<Row> {
-
-		@Override
-		public CollectCoordinationResponse<Row> sendRequest(String version, long offset) throws Exception {
-			return CollectSinkFunctionTest.this.sendRequest(version, offset);
-		}
-
-		@Override
-		public Tuple2<Long, CollectCoordinationResponse> getAccumulatorResults() throws Exception {
-			return CollectSinkFunctionTest.this.getAccumualtorResults();
-		}
-	}
-
 	/**
 	 * A thread feeding data to the function. It will fail when half of the data is fed.
 	 */
@@ -496,7 +480,7 @@ public class CollectSinkFunctionTest extends TestLogger {
 				while (data.size() > 0) {
 					int size = Math.min(data.size(), random.nextInt(MAX_RESULTS_PER_BATCH * 3) + 1);
 					for (int i = 0; i < size; i++) {
-						function.invoke(Row.of(data.removeFirst()), null);
+						function.invoke(data.removeFirst(), null);
 					}
 
 					if (!failedBefore && data.size() < checkpointedData.size() / 2) {
@@ -519,7 +503,6 @@ public class CollectSinkFunctionTest extends TestLogger {
 
 				finishJob();
 			} catch (Exception e) {
-				e.printStackTrace();
 				throw new RuntimeException(e);
 			}
 		}
@@ -571,7 +554,7 @@ public class CollectSinkFunctionTest extends TestLogger {
 						// with 60% chance we add some data
 						int size = Math.min(data.size(), random.nextInt(MAX_RESULTS_PER_BATCH * 3) + 1);
 						for (int i = 0; i < size; i++) {
-							function.invoke(Row.of(data.removeFirst()), null);
+							function.invoke(data.removeFirst(), null);
 						}
 					} else if (r < 9) {
 						// with 30% chance we make a checkpoint
@@ -603,12 +586,14 @@ public class CollectSinkFunctionTest extends TestLogger {
 
 				finishJob();
 			} catch (Exception e) {
-				e.printStackTrace();
 				throw new RuntimeException(e);
 			}
 		}
 	}
 
+	/**
+	 * Countdown for a checkpoint which will succeed in the future.
+	 */
 	private static class CheckpointCountdown {
 
 		private long id;
@@ -629,4 +614,71 @@ public class CollectSinkFunctionTest extends TestLogger {
 			return false;
 		}
 	}
+
+	/**
+	 * A thread collecting results with the collecting iterator.
+	 */
+	private class CollectClient extends Thread {
+
+		private List<Integer> results;
+		private CollectResultIterator<Integer> iterator;
+
+		private CollectClient() {
+			this.results = new ArrayList<>();
+
+			this.iterator = new CollectResultIterator<>(
+				CompletableFuture.completedFuture(TEST_OPERATOR_ID),
+				serializer,
+				ACCUMULATOR_NAME,
+				0
+			);
+
+			TestJobClient.JobInfoProvider infoProvider = new TestJobClient.JobInfoProvider() {
+
+				@Override
+				public boolean isJobFinished() {
+					return jobFinished;
+				}
+
+				@Override
+				public Map<String, OptionalFailure<Object>> getAccumulatorResults() {
+					Map<String, OptionalFailure<Object>> accumulatorResults = new HashMap<>();
+					accumulatorResults.put(
+						ACCUMULATOR_NAME,
+						OptionalFailure.of(runtimeContext.getAccumulator(ACCUMULATOR_NAME).getLocalValue()));
+					return accumulatorResults;
+				}
+			};
+
+			TestJobClient jobClient = new TestJobClient(
+				TEST_JOB_ID,
+				TEST_OPERATOR_ID,
+				coordinator,
+				infoProvider);
+
+			iterator.setJobClient(jobClient);
+		}
+
+		@Override
+		public void run() {
+			Random random = new Random();
+
+			while (iterator.hasNext()) {
+				results.add(iterator.next());
+				if (random.nextBoolean()) {
+					try {
+						Thread.sleep(5);
+					} catch (InterruptedException e) {
+						// ignore
+					}
+				}
+			}
+
+			try {
+				iterator.close();
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
index d2ad57b..66a624a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
@@ -45,6 +45,8 @@ import java.util.concurrent.CompletableFuture;
  */
 public class CollectSinkOperatorCoordinatorTest {
 
+	private static final int SOCKET_TIMEOUT_MILLIS = 1000;
+
 	private static final TypeSerializer<Row> serializer = new RowTypeInfo(
 		BasicTypeInfo.INT_TYPE_INFO,
 		BasicTypeInfo.STRING_TYPE_INFO
@@ -52,7 +54,7 @@ public class CollectSinkOperatorCoordinatorTest {
 
 	@Test
 	public void testNoAddress() throws Exception {
-		CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator();
+		CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
 		coordinator.start();
 
 		CollectCoordinationRequest request = new CollectCoordinationRequest("version", 123);
@@ -65,7 +67,7 @@ public class CollectSinkOperatorCoordinatorTest {
 
 	@Test
 	public void testServerFailure() throws Exception {
-		CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator();
+		CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
 		coordinator.start();
 
 		List<List<Row>> expected = Arrays.asList(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectRequestSender.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectRequestSender.java
deleted file mode 100644
index fc7b759..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectRequestSender.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.collect.utils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
-
-/**
- * Testing interface for sending collect requests.
- */
-public interface CollectRequestSender<T> {
-
-	CollectCoordinationResponse<T> sendRequest(String version, long offset) throws Exception;
-
-	Tuple2<Long, CollectCoordinationResponse> getAccumulatorResults() throws Exception;
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCollectClient.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCollectClient.java
deleted file mode 100644
index 9040971..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCollectClient.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.collect.utils;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-import java.util.function.BooleanSupplier;
-
-/**
- * A simple client for fetching collect results.
- */
-public class TestCollectClient<T> extends Thread {
-
-	private static final String INIT_VERSION = "";
-	private static final int MAX_RETRY_COUNT = 100;
-
-	private final TypeSerializer<T> serializer;
-	private final CollectRequestSender<T> sender;
-	private final BooleanSupplier jobFinishedChecker;
-
-	private final LinkedList<T> uncheckpointedResults;
-	private final LinkedList<T> checkpointedResults;
-
-	private String version;
-	private long offset;
-	private long lastCheckpointedOffset;
-	private int retryCount;
-
-	public TestCollectClient(
-			TypeSerializer<T> serializer,
-			CollectRequestSender<T> sender,
-			BooleanSupplier jobFinishedChecker) {
-		this.serializer = serializer;
-		this.sender = sender;
-		this.jobFinishedChecker = jobFinishedChecker;
-
-		this.uncheckpointedResults = new LinkedList<>();
-		this.checkpointedResults = new LinkedList<>();
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public void run() {
-		Random random = new Random();
-
-		version = INIT_VERSION;
-		offset = 0;
-		lastCheckpointedOffset = 0;
-		retryCount = 0;
-
-		try {
-			while (!jobFinishedChecker.getAsBoolean()) {
-				if (random.nextBoolean()) {
-					Thread.sleep(random.nextInt(10));
-				}
-				CollectCoordinationResponse<T> response = sender.sendRequest(version, offset);
-				dealWithResponse(response, offset);
-			}
-
-			Tuple2<Long, CollectCoordinationResponse> accResults = sender.getAccumulatorResults();
-			dealWithResponse(accResults.f1, accResults.f0);
-			checkpointedResults.addAll(uncheckpointedResults);
-		} catch (Exception e) {
-			e.printStackTrace();
-			throw new RuntimeException(e);
-		}
-	}
-
-	public List<T> getResults() {
-		return checkpointedResults;
-	}
-
-	private void dealWithResponse(CollectCoordinationResponse<T> response, long responseOffset) throws IOException {
-		String responseVersion = response.getVersion();
-		long responseLastCheckpointedOffset = response.getLastCheckpointedOffset();
-		List<T> responseResults = response.getResults(serializer);
-
-		if (responseResults.isEmpty()) {
-			retryCount++;
-		} else {
-			retryCount = 0;
-		}
-		if (retryCount > MAX_RETRY_COUNT) {
-			// not to block the tests
-			throw new RuntimeException("Too many retries in TestCollectClient");
-		}
-
-		if (INIT_VERSION.equals(version)) {
-			// first response, update version accordingly
-			version = responseVersion;
-		} else {
-			if (responseLastCheckpointedOffset > lastCheckpointedOffset) {
-				// a new checkpoint happens
-				int newCheckpointedNum = (int) (responseLastCheckpointedOffset - lastCheckpointedOffset);
-				for (int i = 0; i < newCheckpointedNum; i++) {
-					T result = uncheckpointedResults.removeFirst();
-					checkpointedResults.add(result);
-				}
-				lastCheckpointedOffset = responseLastCheckpointedOffset;
-			}
-
-			if (!version.equals(responseVersion)) {
-				// sink has restarted
-				int removeNum = (int) (offset - lastCheckpointedOffset);
-				for (int i = 0; i < removeNum; i++) {
-					uncheckpointedResults.removeLast();
-				}
-				version = responseVersion;
-				offset = lastCheckpointedOffset;
-			}
-
-			if (responseResults.size() > 0) {
-				int addStart = (int) (offset - responseOffset);
-				List<T> resultsToAdd = responseResults.subList(addStart, responseResults.size());
-				uncheckpointedResults.addAll(resultsToAdd);
-				offset += resultsToAdd.size();
-			}
-		}
-	}
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCoordinationRequestHandler.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCoordinationRequestHandler.java
new file mode 100644
index 0000000..aecf864
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCoordinationRequestHandler.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect.utils;
+
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
+import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkFunction;
+import org.apache.flink.util.OptionalFailure;
+
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link CoordinationRequestHandler} to test fetching SELECT query results.
+ */
+public class TestCoordinationRequestHandler<T> implements CoordinationRequestHandler {
+
+	private static final int BATCH_SIZE = 3;
+
+	private final TypeSerializer<T> serializer;
+	private final String accumulatorName;
+
+	private int checkpointCountDown;
+
+	private LinkedList<T> data;
+	private List<T> checkpointingData;
+	private List<T> checkpointedData;
+
+	private LinkedList<T> buffered;
+	private List<T> checkpointingBuffered;
+	private List<T> checkpointedBuffered;
+
+	private String version;
+
+	private long offset;
+	private long checkpointingOffset;
+	private long checkpointedOffset;
+
+	private Map<String, OptionalFailure<Object>> accumulatorResults;
+
+	private Random random;
+	private boolean closed;
+
+	public TestCoordinationRequestHandler(
+			List<T> data,
+			TypeSerializer<T> serializer,
+			String accumulatorName) {
+		this.serializer = serializer;
+		this.accumulatorName = accumulatorName;
+
+		this.checkpointCountDown = 0;
+
+		this.data = new LinkedList<>(data);
+		this.checkpointedData = new ArrayList<>(data);
+
+		this.buffered = new LinkedList<>();
+		this.checkpointedBuffered = new ArrayList<>();
+
+		this.version = UUID.randomUUID().toString();
+
+		this.offset = 0;
+		this.checkpointingOffset = 0;
+		this.checkpointedOffset = 0;
+
+		this.accumulatorResults = new HashMap<>();
+
+		this.random = new Random();
+		this.closed = false;
+	}
+
+	@Override
+	public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
+		if (closed) {
+			throw new RuntimeException("Handler closed");
+		}
+
+		Assert.assertTrue(request instanceof CollectCoordinationRequest);
+		CollectCoordinationRequest collectRequest = (CollectCoordinationRequest) request;
+
+		for (int i = random.nextInt(3) + 1; i > 0; i--) {
+			if (checkpointCountDown > 0) {
+				checkpointCountDown--;
+				if (checkpointCountDown == 0) {
+					checkpointedData = checkpointingData;
+					checkpointedBuffered = checkpointingBuffered;
+					checkpointedOffset = checkpointingOffset;
+				}
+			}
+
+			int r = random.nextInt(10);
+			if (r < 6) {
+				// with 60% chance we add data
+				int size = Math.min(data.size(), BATCH_SIZE * 2 - buffered.size());
+				if (size > 0) {
+					size = random.nextInt(size) + 1;
+				}
+				for (int j = 0; j < size; j++) {
+					buffered.add(data.removeFirst());
+				}
+
+				if (data.isEmpty()) {
+					buildAccumulatorResults();
+					closed = true;
+					break;
+				}
+			} else if (r < 9) {
+				// with 30% chance we do a checkpoint completed in the future
+				if (checkpointCountDown == 0) {
+					checkpointCountDown = random.nextInt(5) + 1;
+					checkpointingData = new ArrayList<>(data);
+					checkpointingBuffered = new ArrayList<>(buffered);
+					checkpointingOffset = offset;
+				}
+			} else {
+				// with 10% chance we fail
+				checkpointCountDown = 0;
+				version = UUID.randomUUID().toString();
+				data = new LinkedList<>(checkpointedData);
+				buffered = new LinkedList<>(checkpointedBuffered);
+				offset = checkpointedOffset;
+			}
+		}
+
+		Assert.assertTrue(offset <= collectRequest.getOffset());
+
+		List<T> subList = Collections.emptyList();
+		if (collectRequest.getVersion().equals(version)) {
+			while (buffered.size() > 0 && collectRequest.getOffset() > offset) {
+				buffered.removeFirst();
+				offset++;
+			}
+			subList = new ArrayList<>();
+			Iterator<T> iterator = buffered.iterator();
+			for (int i = 0; i < BATCH_SIZE && iterator.hasNext(); i++) {
+				subList.add(iterator.next());
+			}
+		}
+
+		CoordinationResponse response;
+		try {
+			if (random.nextBoolean()) {
+				// with 50% chance we return valid result
+				response = new CollectCoordinationResponse<>(version, checkpointedOffset, subList, serializer);
+			} else {
+				// with 50% chance we return invalid result
+				response = new CollectCoordinationResponse<>(
+					collectRequest.getVersion(),
+					-1,
+					Collections.emptyList(),
+					serializer);
+			}
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+		return CompletableFuture.completedFuture(response);
+	}
+
+	public boolean isClosed() {
+		return closed;
+	}
+
+	public Map<String, OptionalFailure<Object>> getAccumulatorResults() {
+		return accumulatorResults;
+	}
+
+	private void buildAccumulatorResults() {
+		List<T> finalResults = new ArrayList<>(buffered);
+		SerializedListAccumulator<byte[]> listAccumulator = new SerializedListAccumulator<>();
+		try {
+			byte[] serializedResult =
+				CollectSinkFunction.serializeAccumulatorResult(
+					offset, version, checkpointedOffset, finalResults, serializer);
+			listAccumulator.add(serializedResult, BytePrimitiveArraySerializer.INSTANCE);
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+
+		accumulatorResults.put(accumulatorName, OptionalFailure.of(listAccumulator.getLocalValue()));
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestJobClient.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestJobClient.java
new file mode 100644
index 0000000..3a40870
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestJobClient.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect.utils;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.util.OptionalFailure;
+
+import org.junit.Assert;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link JobClient} to test fetching SELECT query results.
+ */
+public class TestJobClient implements JobClient, CoordinationRequestGateway {
+
+	private final JobID jobId;
+	// the only operator id this client accepts coordination requests for
+	// (asserted in sendCoordinationRequest)
+	private final OperatorID operatorId;
+	// handler that actually serves the forwarded coordination requests
+	private final CoordinationRequestHandler handler;
+	// tells this client when the job is finished and supplies its accumulator results
+	private final JobInfoProvider infoProvider;
+
+	// mutable job state: starts as RUNNING, moves to CANCELED via cancel()
+	// or to FINISHED once infoProvider reports the job as finished
+	private JobStatus jobStatus;
+	// recorded only on the transition to FINISHED; null while the job is running
+	private JobExecutionResult jobExecutionResult;
+
+	public TestJobClient(
+			JobID jobId,
+			OperatorID operatorId,
+			CoordinationRequestHandler handler,
+			JobInfoProvider infoProvider) {
+		this.jobId = jobId;
+		this.operatorId = operatorId;
+		this.handler = handler;
+		this.infoProvider = infoProvider;
+
+		this.jobStatus = JobStatus.RUNNING;
+		this.jobExecutionResult = null;
+	}
+
+	@Override
+	public JobID getJobID() {
+		return jobId;
+	}
+
+	@Override
+	public CompletableFuture<JobStatus> getJobStatus() {
+		return CompletableFuture.completedFuture(jobStatus);
+	}
+
+	/** Marks the job as CANCELED; always completes successfully. */
+	@Override
+	public CompletableFuture<Void> cancel() {
+		jobStatus = JobStatus.CANCELED;
+		return CompletableFuture.completedFuture(null);
+	}
+
+	// savepoint and accumulator operations are not needed by these tests
+	@Override
+	public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, @Nullable String savepointDirectory) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDirectory) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
+		throw new UnsupportedOperationException();
+	}
+
+	/**
+	 * Returns the execution result recorded when the job reached FINISHED,
+	 * or a future completed with {@code null} while the job is still running.
+	 */
+	@Override
+	public CompletableFuture<JobExecutionResult> getJobExecutionResult(ClassLoader userClassloader) {
+		return CompletableFuture.completedFuture(jobExecutionResult);
+	}
+
+	/**
+	 * Forwards {@code request} to the coordination request handler and returns
+	 * its response.
+	 *
+	 * <p>Throws if the job is already in a globally terminal state, and asserts
+	 * that the request targets the expected operator. After each request the
+	 * job is moved to FINISHED (recording the accumulator results) if the info
+	 * provider reports it as finished.
+	 */
+	@Override
+	public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) {
+		if (jobStatus.isGloballyTerminalState()) {
+			throw new RuntimeException("Job terminated");
+		}
+
+		Assert.assertEquals(this.operatorId, operatorId);
+		CoordinationResponse response;
+		try {
+			// handler futures are expected to complete promptly in tests,
+			// so blocking on get() here is acceptable
+			response = handler.handleCoordinationRequest(request).get();
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		if (infoProvider.isJobFinished()) {
+			jobStatus = JobStatus.FINISHED;
+			jobExecutionResult = new JobExecutionResult(jobId, 0, infoProvider.getAccumulatorResults());
+		}
+
+		return CompletableFuture.completedFuture(response);
+	}
+
+	/**
+	 * Interface to provide job related info for {@link TestJobClient}.
+	 */
+	public interface JobInfoProvider {
+
+		boolean isJobFinished();
+
+		Map<String, OptionalFailure<Object>> getAccumulatorResults();
+	}
+}