You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/12/20 09:56:38 UTC

[GitHub] zentol closed pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

zentol closed pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one opened
from a fork) once it is merged, the diff is reproduced below for the
sake of provenance:

diff --git a/docs/dev/connectors/cassandra.md b/docs/dev/connectors/cassandra.md
index 2a2acb3bc38..292314dafd2 100644
--- a/docs/dev/connectors/cassandra.md
+++ b/docs/dev/connectors/cassandra.md
@@ -72,13 +72,16 @@ The following configuration methods can be used:
 4. _setMapperOptions(MapperOptions options)_
     * Sets the mapper options that are used to configure the DataStax ObjectMapper.
     * Only applies when processing __POJO__ data types.
-5. _enableWriteAheadLog([CheckpointCommitter committer])_
+5. _setMaxConcurrentRequests(int maxConcurrentRequests, Duration timeout)_
+    * Sets the maximum allowed number of concurrent requests with a timeout for acquiring permits to execute.
+    * Only applies when __enableWriteAheadLog()__ is not configured.
+6. _enableWriteAheadLog([CheckpointCommitter committer])_
     * An __optional__ setting
     * Allows exactly-once processing for non-deterministic algorithms.
-6. _setFailureHandler([CassandraFailureHandler failureHandler])_
+7. _setFailureHandler([CassandraFailureHandler failureHandler])_
     * An __optional__ setting
     * Sets the custom failure handler.
-7. _build()_
+8. _build()_
     * Finalizes the configuration and constructs the CassandraSink instance.
 
 ### Write-ahead Log
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
index 41826f58203..5e1fcca05e7 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
@@ -32,8 +32,12 @@
 	private final String insertQuery;
 	private transient PreparedStatement ps;
 
-	public AbstractCassandraTupleSink(String insertQuery, ClusterBuilder builder, CassandraFailureHandler failureHandler) {
-		super(builder, failureHandler);
+	public AbstractCassandraTupleSink(
+			String insertQuery,
+			ClusterBuilder builder,
+			CassandraSinkBaseConfig config,
+			CassandraFailureHandler failureHandler) {
+		super(builder, config, failureHandler);
 		this.insertQuery = insertQuery;
 	}
 
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index cf4432d4b20..7a3eba091b8 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -51,24 +51,51 @@
 	 *
 	 * @param clazz Class instance
 	 */
-	public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) {
+	public CassandraPojoSink(
+			Class<IN> clazz,
+			ClusterBuilder builder) {
 		this(clazz, builder, null, null);
 	}
 
-	public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, @Nullable MapperOptions options) {
+	public CassandraPojoSink(
+			Class<IN> clazz,
+			ClusterBuilder builder,
+			@Nullable MapperOptions options) {
 		this(clazz, builder, options, null);
 	}
 
-	public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, String keyspace) {
+	public CassandraPojoSink(
+			Class<IN> clazz,
+			ClusterBuilder builder,
+			String keyspace) {
 		this(clazz, builder, null, keyspace);
 	}
 
-	public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, @Nullable MapperOptions options, String keyspace) {
-		this(clazz, builder, options, keyspace, new NoOpCassandraFailureHandler());
+	public CassandraPojoSink(
+			Class<IN> clazz,
+			ClusterBuilder builder,
+			@Nullable MapperOptions options,
+			String keyspace) {
+		this(clazz, builder, options, keyspace, CassandraSinkBaseConfig.newBuilder().build());
 	}
 
-	public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, @Nullable MapperOptions options, String keyspace, CassandraFailureHandler failureHandler) {
-		super(builder, failureHandler);
+	CassandraPojoSink(
+			Class<IN> clazz,
+			ClusterBuilder builder,
+			@Nullable MapperOptions options,
+			String keyspace,
+			CassandraSinkBaseConfig config) {
+		this(clazz, builder, options, keyspace, config, new NoOpCassandraFailureHandler());
+	}
+
+	CassandraPojoSink(
+			Class<IN> clazz,
+			ClusterBuilder builder,
+			@Nullable MapperOptions options,
+			String keyspace,
+			CassandraSinkBaseConfig config,
+			CassandraFailureHandler failureHandler) {
+		super(builder, config, failureHandler);
 		this.clazz = clazz;
 		this.options = options;
 		this.keyspace = keyspace;
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
index f51506b802c..a60aebae9df 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
@@ -26,12 +26,28 @@
 
 	private final int rowArity;
 
-	public CassandraRowSink(int rowArity, String insertQuery, ClusterBuilder builder) {
-		this(rowArity, insertQuery, builder, new NoOpCassandraFailureHandler());
+	public CassandraRowSink(
+			int rowArity,
+			String insertQuery,
+			ClusterBuilder builder) {
+		this(rowArity, insertQuery, builder, CassandraSinkBaseConfig.newBuilder().build());
 	}
 
-	public CassandraRowSink(int rowArity, String insertQuery, ClusterBuilder builder, CassandraFailureHandler failureHandler) {
-		super(insertQuery, builder, failureHandler);
+	CassandraRowSink(
+			int rowArity,
+			String insertQuery,
+			ClusterBuilder builder,
+			CassandraSinkBaseConfig config) {
+		this(rowArity, insertQuery, builder, config, new NoOpCassandraFailureHandler());
+	}
+
+	CassandraRowSink(
+			int rowArity,
+			String insertQuery,
+			ClusterBuilder builder,
+			CassandraSinkBaseConfig config,
+			CassandraFailureHandler failureHandler) {
+		super(insertQuery, builder, config, failureHandler);
 		this.rowArity = rowArity;
 	}
 
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
index 84af78d7a6a..13d1a8f253c 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
@@ -26,12 +26,25 @@
  * @param <IN> Type of the elements emitted by this sink, it must extend {@link Product}
  */
 public class CassandraScalaProductSink<IN extends Product> extends AbstractCassandraTupleSink<IN> {
-	public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder) {
-		this(insertQuery, builder, new NoOpCassandraFailureHandler());
+	public CassandraScalaProductSink(
+			String insertQuery,
+			ClusterBuilder builder) {
+		this(insertQuery, builder, CassandraSinkBaseConfig.newBuilder().build());
 	}
 
-	public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder, CassandraFailureHandler failureHandler) {
-		super(insertQuery, builder, failureHandler);
+	CassandraScalaProductSink(
+			String insertQuery,
+			ClusterBuilder builder,
+			CassandraSinkBaseConfig config) {
+		this(insertQuery, builder, config, new NoOpCassandraFailureHandler());
+	}
+
+	CassandraScalaProductSink(
+			String insertQuery,
+			ClusterBuilder builder,
+			CassandraSinkBaseConfig config,
+			CassandraFailureHandler failureHandler) {
+		super(insertQuery, builder, config, failureHandler);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index e774cd31893..fff8af72ea0 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -37,6 +37,8 @@
 
 import com.datastax.driver.core.Cluster;
 
+import java.time.Duration;
+
 import scala.Product;
 
 /**
@@ -235,6 +237,7 @@ private CassandraSink(SingleOutputStreamOperator<IN> sink) {
 		protected final DataStream<IN> input;
 		protected final TypeSerializer<IN> serializer;
 		protected final TypeInformation<IN> typeInfo;
+		protected final CassandraSinkBaseConfig.Builder configBuilder;
 		protected ClusterBuilder builder;
 		protected String keyspace;
 		protected MapperOptions mapperOptions;
@@ -247,6 +250,7 @@ public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo,
 			this.input = input;
 			this.typeInfo = typeInfo;
 			this.serializer = serializer;
+			this.configBuilder = CassandraSinkBaseConfig.newBuilder();
 		}
 
 		/**
@@ -367,6 +371,34 @@ protected Cluster buildCluster(Cluster.Builder builder) {
 			return this;
 		}
 
+		/**
+		 * Sets the maximum allowed number of concurrent requests for this sink, and the timeout for acquiring a permit to execute.
+		 *
+		 * <p>This call has no effect if {@link CassandraSinkBuilder#enableWriteAheadLog()} is called.
+		 *
+		 * @param maxConcurrentRequests maximum number of concurrent requests allowed
+		 * @param timeout timeout duration when acquiring a permit to execute
+		 * @return this builder
+		 */
+		public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests, Duration timeout) {
+			this.configBuilder.setMaxConcurrentRequests(maxConcurrentRequests);
+			this.configBuilder.setMaxConcurrentRequestsTimeout(timeout);
+			return this;
+		}
+
+		/**
+		 * Sets the maximum allowed number of concurrent requests for this sink.
+		 *
+		 * <p>This call has no effect if {@link CassandraSinkBuilder#enableWriteAheadLog()} is called.
+		 *
+		 * @param maxConcurrentRequests maximum number of concurrent requests allowed
+		 * @return this builder
+		 */
+		public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests) {
+			this.configBuilder.setMaxConcurrentRequests(maxConcurrentRequests);
+			return this;
+		}
+
 		/**
 		 * Finalizes the configuration of this sink.
 		 *
@@ -416,7 +448,12 @@ protected void sanityCheck() {
 
 		@Override
 		public CassandraSink<IN> createSink() throws Exception {
-			return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder, failureHandler)).name("Cassandra Sink"));
+			final CassandraTupleSink<IN> sink = new CassandraTupleSink<>(
+				query,
+				builder,
+				configBuilder.build(),
+				failureHandler);
+			return new CassandraSink<>(input.addSink(sink).name("Cassandra Sink"));
 		}
 
 		@Override
@@ -448,8 +485,13 @@ protected void sanityCheck() {
 
 		@Override
 		protected CassandraSink<Row> createSink() throws Exception {
-			return new CassandraSink<>(input.addSink(new CassandraRowSink(typeInfo.getArity(), query, builder, failureHandler)).name("Cassandra Sink"));
-
+			final CassandraRowSink sink = new CassandraRowSink(
+				typeInfo.getArity(),
+				query,
+				builder,
+				configBuilder.build(),
+				failureHandler);
+			return new CassandraSink<>(input.addSink(sink).name("Cassandra Sink"));
 		}
 
 		@Override
@@ -479,7 +521,14 @@ protected void sanityCheck() {
 
 		@Override
 		public CassandraSink<IN> createSink() throws Exception {
-			return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder, mapperOptions, keyspace, failureHandler)).name("Cassandra Sink"));
+			final CassandraPojoSink<IN> sink = new CassandraPojoSink<>(
+				typeInfo.getTypeClass(),
+				builder,
+				mapperOptions,
+				keyspace,
+				configBuilder.build(),
+				failureHandler);
+			return new CassandraSink<>(input.addSink(sink).name("Cassandra Sink"));
 		}
 
 		@Override
@@ -493,7 +542,6 @@ protected void sanityCheck() {
 	 * @param <IN>
 	 */
 	public static class CassandraScalaProductSinkBuilder<IN extends Product> extends CassandraSinkBuilder<IN> {
-
 		public CassandraScalaProductSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
 			super(input, typeInfo, serializer);
 		}
@@ -511,7 +559,12 @@ protected void sanityCheck() {
 
 		@Override
 		public CassandraSink<IN> createSink() throws Exception {
-			return new CassandraSink<>(input.addSink(new CassandraScalaProductSink<IN>(query, builder, failureHandler)).name("Cassandra Sink"));
+			final CassandraScalaProductSink<IN> sink = new CassandraScalaProductSink<>(
+				query,
+				builder,
+				configBuilder.build(),
+				failureHandler);
+			return new CassandraSink<>(input.addSink(sink).name("Cassandra Sink"));
 		}
 
 		@Override
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index d24347ec89d..f18e256ed56 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -24,6 +24,7 @@
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Session;
@@ -33,9 +34,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link
@@ -45,21 +47,23 @@
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
 	protected final Logger log = LoggerFactory.getLogger(getClass());
+
 	protected transient Cluster cluster;
 	protected transient Session session;
 
-	protected transient volatile Throwable exception;
-	protected transient FutureCallback<V> callback;
+	private AtomicReference<Throwable> throwable;
+	private FutureCallback<V> callback;
+	private Semaphore semaphore;
 
 	private final ClusterBuilder builder;
-
-	private final AtomicInteger updatesPending = new AtomicInteger();
+	private final CassandraSinkBaseConfig config;
 
 	private final CassandraFailureHandler failureHandler;
 
-	CassandraSinkBase(ClusterBuilder builder, CassandraFailureHandler failureHandler) {
+	CassandraSinkBase(ClusterBuilder builder, CassandraSinkBaseConfig config, CassandraFailureHandler failureHandler) {
 		this.builder = builder;
-		this.failureHandler = checkNotNull(failureHandler);
+		this.config = config;
+		this.failureHandler = Preconditions.checkNotNull(failureHandler);
 		ClosureCleaner.clean(builder, true);
 	}
 
@@ -68,50 +72,28 @@ public void open(Configuration configuration) {
 		this.callback = new FutureCallback<V>() {
 			@Override
 			public void onSuccess(V ignored) {
-				int pending = updatesPending.decrementAndGet();
-				if (pending == 0) {
-					synchronized (updatesPending) {
-						updatesPending.notifyAll();
-					}
-				}
+				semaphore.release();
 			}
 
 			@Override
 			public void onFailure(Throwable t) {
-				int pending = updatesPending.decrementAndGet();
-				if (pending == 0) {
-					synchronized (updatesPending) {
-						updatesPending.notifyAll();
-					}
-				}
-				exception = t;
-
+				throwable.compareAndSet(null, t);
 				log.error("Error while sending value.", t);
+				semaphore.release();
 			}
 		};
 		this.cluster = builder.getCluster();
 		this.session = createSession();
-	}
-
-	protected Session createSession() {
-		return cluster.connect();
-	}
 
-	@Override
-	public void invoke(IN value) throws Exception {
-		checkAsyncErrors();
-		ListenableFuture<V> result = send(value);
-		updatesPending.incrementAndGet();
-		Futures.addCallback(result, callback);
+		throwable = new AtomicReference<>();
+		semaphore = new Semaphore(config.getMaxConcurrentRequests());
 	}
 
-	public abstract ListenableFuture<V> send(IN value);
-
 	@Override
 	public void close() throws Exception {
 		try {
 			checkAsyncErrors();
-			waitForPendingUpdates();
+			flush();
 			checkAsyncErrors();
 		} finally {
 			try {
@@ -138,29 +120,55 @@ public void initializeState(FunctionInitializationContext context) throws Except
 	@Override
 	public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
 		checkAsyncErrors();
-		waitForPendingUpdates();
+		flush();
 		checkAsyncErrors();
 	}
 
-	private void waitForPendingUpdates() throws InterruptedException {
-		synchronized (updatesPending) {
-			while (updatesPending.get() > 0) {
-				updatesPending.wait();
-			}
+	@Override
+	public void invoke(IN value) throws Exception {
+		checkAsyncErrors();
+		tryAcquire();
+		final ListenableFuture<V> result = send(value);
+		Futures.addCallback(result, callback);
+	}
+
+	protected Session createSession() {
+		return cluster.connect();
+	}
+
+	public abstract ListenableFuture<V> send(IN value);
+
+	private void tryAcquire() throws InterruptedException, TimeoutException {
+		if (!semaphore.tryAcquire(config.getMaxConcurrentRequestsTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
+			throw new TimeoutException(
+				String.format(
+					"Failed to acquire 1 permit of %d to send value in %s.",
+					config.getMaxConcurrentRequests(),
+					config.getMaxConcurrentRequestsTimeout()
+				)
+			);
 		}
 	}
 
 	private void checkAsyncErrors() throws Exception {
-		Throwable error = exception;
-		if (error != null) {
-			// prevent throwing duplicated error
-			exception = null;
-			failureHandler.onFailure(error);
+		final Throwable currentError = throwable.getAndSet(null);
+		if (currentError != null) {
+			failureHandler.onFailure(currentError);
 		}
 	}
 
+	private void flush() {
+		semaphore.acquireUninterruptibly(config.getMaxConcurrentRequests());
+		semaphore.release(config.getMaxConcurrentRequests());
+	}
+
+	@VisibleForTesting
+	int getAvailablePermits() {
+		return semaphore.availablePermits();
+	}
+
 	@VisibleForTesting
-	int getNumOfPendingRecords() {
-		return updatesPending.get();
+	int getAcquiredPermits() {
+		return config.getMaxConcurrentRequests() - semaphore.availablePermits();
 	}
 }
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
new file mode 100644
index 00000000000..cb8d904fbf9
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/**
+ * Configuration for {@link CassandraSinkBase}.
+ */
+public final class CassandraSinkBaseConfig implements Serializable  {
+	// ------------------------ Default Configurations ------------------------
+
+	/**
+	 * The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}.
+	 */
+	public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE;
+
+	/**
+	 * The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE} milliseconds.
+	 */
+	public static final Duration DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Duration.ofMillis(Long.MAX_VALUE);
+
+	// ------------------------- Configuration Fields -------------------------
+
+	/** Maximum number of concurrent requests allowed. */
+	private final int maxConcurrentRequests;
+
+	/** Timeout duration when acquiring a permit to execute. */
+	private final Duration maxConcurrentRequestsTimeout;
+
+	private CassandraSinkBaseConfig(
+			int maxConcurrentRequests,
+			Duration maxConcurrentRequestsTimeout) {
+		Preconditions.checkArgument(maxConcurrentRequests > 0,
+			"Max concurrent requests is expected to be positive");
+		Preconditions.checkNotNull(maxConcurrentRequestsTimeout,
+			"Max concurrent requests timeout cannot be null");
+		Preconditions.checkArgument(!maxConcurrentRequestsTimeout.isNegative(),
+			"Max concurrent requests timeout is expected to be positive");
+		this.maxConcurrentRequests = maxConcurrentRequests;
+		this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout;
+	}
+
+	public int getMaxConcurrentRequests() {
+		return maxConcurrentRequests;
+	}
+
+	public Duration getMaxConcurrentRequestsTimeout() {
+		return maxConcurrentRequestsTimeout;
+	}
+
+	@Override
+	public String toString() {
+		return "CassandraSinkBaseConfig{" +
+			"maxConcurrentRequests=" + maxConcurrentRequests +
+			", maxConcurrentRequestsTimeout=" + maxConcurrentRequestsTimeout +
+			'}';
+	}
+
+	public static Builder newBuilder() {
+		return new Builder();
+	}
+
+	/**
+	 * Builder for the {@link CassandraSinkBaseConfig}.
+	 */
+	public static class Builder {
+		private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+		private Duration maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+
+		Builder() { }
+
+		public Builder setMaxConcurrentRequests(int maxConcurrentRequests) {
+			this.maxConcurrentRequests = maxConcurrentRequests;
+			return this;
+		}
+
+		public Builder setMaxConcurrentRequestsTimeout(Duration timeout) {
+			this.maxConcurrentRequestsTimeout = timeout;
+			return this;
+		}
+
+		public CassandraSinkBaseConfig build() {
+			return new CassandraSinkBaseConfig(
+				maxConcurrentRequests,
+				maxConcurrentRequestsTimeout);
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
index 116acfd8569..4164bce6afd 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
@@ -25,12 +25,25 @@
  * @param <IN> Type of the elements emitted by this sink, it must extend {@link Tuple}
  */
 public class CassandraTupleSink<IN extends Tuple> extends AbstractCassandraTupleSink<IN> {
-	public CassandraTupleSink(String insertQuery, ClusterBuilder builder) {
-		this(insertQuery, builder, new NoOpCassandraFailureHandler());
+	public CassandraTupleSink(
+			String insertQuery,
+			ClusterBuilder builder) {
+		this(insertQuery, builder, CassandraSinkBaseConfig.newBuilder().build());
 	}
 
-	public CassandraTupleSink(String insertQuery, ClusterBuilder builder, CassandraFailureHandler failureHandler) {
-		super(insertQuery, builder, failureHandler);
+	CassandraTupleSink(
+			String insertQuery,
+			ClusterBuilder builder,
+			CassandraSinkBaseConfig config) {
+		this(insertQuery, builder, config, new NoOpCassandraFailureHandler());
+	}
+
+	CassandraTupleSink(
+			String insertQuery,
+			ClusterBuilder builder,
+			CassandraSinkBaseConfig config,
+			CassandraFailureHandler failureHandler) {
+		super(insertQuery, builder, config, failureHandler);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index 8c66882611a..2b705a56e7b 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -22,11 +22,11 @@
 import org.apache.flink.queryablestate.FutureUtils;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -34,8 +34,14 @@
 import org.junit.Test;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
 
+import static org.hamcrest.number.OrderingComparison.greaterThan;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 
@@ -44,6 +50,8 @@
  */
 public class CassandraSinkBaseTest {
 
+	private static final long DEFAULT_TEST_TIMEOUT = 5000;
+
 	@Test(expected = NoHostAvailableException.class)
 	public void testHostNotFoundErrorHandling() throws Exception {
 		CassandraSinkBase base = new CassandraSinkBase(new ClusterBuilder() {
@@ -54,7 +62,7 @@ protected Cluster buildCluster(Cluster.Builder builder) {
 					.withoutJMXReporting()
 					.withoutMetrics().build();
 			}
-		}, new NoOpCassandraFailureHandler()) {
+		}, CassandraSinkBaseConfig.newBuilder().build(), new NoOpCassandraFailureHandler()) {
 			@Override
 			public ListenableFuture send(Object value) {
 				return null;
@@ -64,166 +72,266 @@ public ListenableFuture send(Object value) {
 		base.open(new Configuration());
 	}
 
-	@Test(timeout = 5000)
+	@Test(timeout = DEFAULT_TEST_TIMEOUT)
 	public void testSuccessfulPath() throws Exception {
-		TestCassandraSink casSinkFunc = new TestCassandraSink();
-		casSinkFunc.open(new Configuration());
+		try (TestCassandraSink casSinkFunc = createOpenedTestCassandraSink()) {
+			casSinkFunc.enqueueCompletableFuture(CompletableFuture.completedFuture(null));
 
-		casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(CompletableFuture.completedFuture(null)));
-		casSinkFunc.invoke("hello");
+			final int originalPermits = casSinkFunc.getAvailablePermits();
+			Assert.assertThat(originalPermits, greaterThan(0));
+			Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
 
-		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+			casSinkFunc.invoke("hello");
 
-		casSinkFunc.close();
+			Assert.assertEquals(originalPermits, casSinkFunc.getAvailablePermits());
+			Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+		}
 	}
 
-	@Test(timeout = 5000)
+	@Test(timeout = DEFAULT_TEST_TIMEOUT)
 	public void testThrowErrorOnClose() throws Exception {
 		TestCassandraSink casSinkFunc = new TestCassandraSink();
 
 		casSinkFunc.open(new Configuration());
 
 		Exception cause = new RuntimeException();
-		casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(FutureUtils.getFailedFuture(cause)));
+		casSinkFunc.enqueueCompletableFuture(FutureUtils.getFailedFuture(cause));
 		casSinkFunc.invoke("hello");
 		try {
 			casSinkFunc.close();
 
 			Assert.fail("Close should have thrown an exception.");
 		} catch (IOException e) {
-			Assert.assertEquals(cause, e.getCause());
-			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+			ExceptionUtils.findThrowable(e, candidate -> candidate == cause)
+				.orElseThrow(() -> e);
 		}
 	}
 
-	@Test(timeout = 5000)
+	@Test(timeout = DEFAULT_TEST_TIMEOUT)
 	public void testThrowErrorOnInvoke() throws Exception {
-		TestCassandraSink casSinkFunc = new TestCassandraSink();
-
-		casSinkFunc.open(new Configuration());
-
-		Exception cause = new RuntimeException();
-		casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(FutureUtils.getFailedFuture(cause)));
-
-		casSinkFunc.invoke("hello");
-
-		try {
-			casSinkFunc.invoke("world");
-			Assert.fail("Sending of second value should have failed.");
-		} catch (IOException e) {
-			Assert.assertEquals(cause, e.getCause());
-			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+		try (TestCassandraSink casSinkFunc = createOpenedTestCassandraSink()) {
+			Exception cause = new RuntimeException();
+			casSinkFunc.enqueueCompletableFuture(FutureUtils.getFailedFuture(cause));
+
+			casSinkFunc.invoke("hello");
+
+			try {
+				casSinkFunc.invoke("world");
+				Assert.fail("Sending of second value should have failed.");
+			} catch (IOException e) {
+				Assert.assertEquals(cause, e.getCause());
+				Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+			}
 		}
 	}
 
-	@Test(timeout = 5000)
+	@Test(timeout = DEFAULT_TEST_TIMEOUT)
 	public void testIgnoreError() throws Exception {
 		Exception cause = new RuntimeException();
 		CassandraFailureHandler failureHandler = failure -> Assert.assertEquals(cause, failure);
-		TestCassandraSink casSinkFunc = new TestCassandraSink(failureHandler);
 
-		casSinkFunc.open(new Configuration());
+		try (TestCassandraSink casSinkFunc = createOpenedTestCassandraSink(failureHandler)) {
 
-		casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(FutureUtils.getFailedFuture(cause)));
+			casSinkFunc.enqueueCompletableFuture(FutureUtils.getFailedFuture(cause));
+			casSinkFunc.enqueueCompletableFuture(FutureUtils.getFailedFuture(cause));
 
-		casSinkFunc.invoke("hello");
-		casSinkFunc.invoke("world");
+			casSinkFunc.invoke("hello");
+			casSinkFunc.invoke("world");
+		}
 	}
 
-	@Test(timeout = 5000)
+	@Test(timeout = DEFAULT_TEST_TIMEOUT)
 	public void testThrowErrorOnSnapshot() throws Exception {
 		TestCassandraSink casSinkFunc = new TestCassandraSink();
 
-		OneInputStreamOperatorTestHarness<String, Object> testHarness =
-			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
+		try (OneInputStreamOperatorTestHarness<String, Object> testHarness = createOpenedTestHarness(casSinkFunc)) {
+			Exception cause = new RuntimeException();
+			casSinkFunc.enqueueCompletableFuture(FutureUtils.getFailedFuture(cause));
 
-		testHarness.open();
+			casSinkFunc.invoke("hello");
 
-		Exception cause = new RuntimeException();
-		casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(FutureUtils.getFailedFuture(cause)));
+			try {
+				testHarness.snapshot(123L, 123L);
 
-		casSinkFunc.invoke("hello");
+				Assert.fail();
+			} catch (Exception e) {
+				Assert.assertTrue(e.getCause() instanceof IOException);
+			}
+		}
+	}
 
-		try {
-			testHarness.snapshot(123L, 123L);
+	@Test(timeout = DEFAULT_TEST_TIMEOUT)
+	public void testWaitForPendingUpdatesOnSnapshot() throws Exception {
+		final TestCassandraSink casSinkFunc = new TestCassandraSink();
 
-			Assert.fail();
-		} catch (Exception e) {
-			Assert.assertTrue(e.getCause() instanceof IOException);
-			Assert.assertEquals(cause, e.getCause().getCause());
-			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
-		}
+		try (OneInputStreamOperatorTestHarness<String, Object> testHarness = createOpenedTestHarness(casSinkFunc)) {
+			CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
+			casSinkFunc.enqueueCompletableFuture(completableFuture);
+
+			casSinkFunc.invoke("hello");
+			Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+
+			final CountDownLatch latch = new CountDownLatch(1);
+			Thread t = new CheckedThread("Flink-CassandraSinkBaseTest") {
+				@Override
+				public void go() throws Exception {
+					testHarness.snapshot(123L, 123L);
+					latch.countDown();
+				}
+			};
+			t.start();
+			while (t.getState() != Thread.State.WAITING) {
+				Thread.sleep(5);
+			}
 
-		testHarness.close();
+			Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+			completableFuture.complete(null);
+			latch.await();
+			Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
+		}
 	}
 
-	@Test(timeout = 5000)
-	public void testWaitForPendingUpdatesOnSnapshot() throws Exception {
+	@Test(timeout = DEFAULT_TEST_TIMEOUT)
+	public void testWaitForPendingUpdatesOnClose() throws Exception {
 		TestCassandraSink casSinkFunc = new TestCassandraSink();
 
-		OneInputStreamOperatorTestHarness<String, Object> testHarness =
-			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
+		try (OneInputStreamOperatorTestHarness<String, Object> testHarness = createOpenedTestHarness(casSinkFunc)) {
 
-		testHarness.open();
-
-		CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
-		ResultSetFuture resultSetFuture = ResultSetFutures.fromCompletableFuture(completableFuture);
-		casSinkFunc.setResultFuture(resultSetFuture);
+			CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
+			casSinkFunc.enqueueCompletableFuture(completableFuture);
 
-		casSinkFunc.invoke("hello");
-		Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+			casSinkFunc.invoke("hello");
+			Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
 
-		Thread t = new CheckedThread("Flink-CassandraSinkBaseTest") {
-			@Override
-			public void go() throws Exception {
-				testHarness.snapshot(123L, 123L);
+			final CountDownLatch latch = new CountDownLatch(1);
+			Thread t = new CheckedThread("Flink-CassandraSinkBaseTest") {
+				@Override
+				public void go() throws Exception {
+					testHarness.close();
+					latch.countDown();
+				}
+			};
+			t.start();
+			while (t.getState() != Thread.State.WAITING) {
+				Thread.sleep(5);
 			}
-		};
-		t.start();
-		while (t.getState() != Thread.State.WAITING) {
-			Thread.sleep(5);
+
+			Assert.assertEquals(1, casSinkFunc.getAcquiredPermits());
+			completableFuture.complete(null);
+			latch.await();
+			Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
 		}
+	}
+
+	@Test(timeout = DEFAULT_TEST_TIMEOUT)
+	public void testReleaseOnSuccess() throws Exception {
+		final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
+			.setMaxConcurrentRequests(1)
+			.build();
+
+		try (TestCassandraSink testCassandraSink = createOpenedTestCassandraSink(config)) {
+			Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
+			Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
+
+			CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
+			testCassandraSink.enqueueCompletableFuture(completableFuture);
+			testCassandraSink.invoke("N/A");
+
+			Assert.assertEquals(0, testCassandraSink.getAvailablePermits());
+			Assert.assertEquals(1, testCassandraSink.getAcquiredPermits());
 
-		Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
-		completableFuture.complete(null);
-		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+			completableFuture.complete(null);
 
-		testHarness.close();
+			Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
+			Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
+		}
 	}
 
-	@Test(timeout = 5000)
-	public void testWaitForPendingUpdatesOnClose() throws Exception {
-		TestCassandraSink casSinkFunc = new TestCassandraSink();
+	@Test(timeout = DEFAULT_TEST_TIMEOUT)
+	public void testReleaseOnFailure() throws Exception {
+		final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
+			.setMaxConcurrentRequests(1)
+			.build();
+		final CassandraFailureHandler failureHandler = ignored -> {};
 
-		OneInputStreamOperatorTestHarness<String, Object> testHarness =
-			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
+		try (TestCassandraSink testCassandraSink = createOpenedTestCassandraSink(config, failureHandler)) {
+			Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
+			Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
 
-		testHarness.open();
+			CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
+			testCassandraSink.enqueueCompletableFuture(completableFuture);
+			testCassandraSink.invoke("N/A");
 
-		CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
-		ResultSetFuture resultSetFuture = ResultSetFutures.fromCompletableFuture(completableFuture);
-		casSinkFunc.setResultFuture(resultSetFuture);
+			Assert.assertEquals(0, testCassandraSink.getAvailablePermits());
+			Assert.assertEquals(1, testCassandraSink.getAcquiredPermits());
 
-		casSinkFunc.invoke("hello");
-		Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
+			completableFuture.completeExceptionally(new RuntimeException());
 
-		Thread t = new CheckedThread("Flink-CassandraSinkBaseTest") {
-			@Override
-			public void go() throws Exception {
-				testHarness.close();
+			Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
+			Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
+		}
+	}
+
+	@Test(timeout = DEFAULT_TEST_TIMEOUT)
+	public void testTimeoutExceptionOnInvoke() throws Exception {
+		final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
+			.setMaxConcurrentRequests(1)
+			.setMaxConcurrentRequestsTimeout(Duration.ofMillis(1))
+			.build();
+
+		try (TestCassandraSink testCassandraSink = createOpenedTestCassandraSink(config)) {
+			CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
+			testCassandraSink.enqueueCompletableFuture(completableFuture);
+			testCassandraSink.enqueueCompletableFuture(completableFuture);
+			testCassandraSink.invoke("Invoke #1");
+
+			try {
+				testCassandraSink.invoke("Invoke #2");
+				Assert.fail("Sending value should have experienced a TimeoutException");
+			} catch (Exception e) {
+				Assert.assertTrue(e instanceof TimeoutException);
+			} finally {
+				completableFuture.complete(null);
 			}
-		};
-		t.start();
-		while (t.getState() != Thread.State.WAITING) {
-			Thread.sleep(5);
 		}
+	}
 
-		Assert.assertEquals(1, casSinkFunc.getNumOfPendingRecords());
-		completableFuture.complete(null);
-		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+	private TestCassandraSink createOpenedTestCassandraSink() {
+		final TestCassandraSink testCassandraSink = new TestCassandraSink();
+		testCassandraSink.open(new Configuration());
+		return testCassandraSink;
 	}
 
-	private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet> {
+	private TestCassandraSink createOpenedTestCassandraSink(CassandraFailureHandler failureHandler) {
+		final TestCassandraSink testCassandraSink = new TestCassandraSink(failureHandler);
+		testCassandraSink.open(new Configuration());
+		return testCassandraSink;
+	}
+
+	private TestCassandraSink createOpenedTestCassandraSink(CassandraSinkBaseConfig config) {
+		final TestCassandraSink testCassandraSink = new TestCassandraSink(config);
+		testCassandraSink.open(new Configuration());
+		return testCassandraSink;
+	}
+
+	private TestCassandraSink createOpenedTestCassandraSink(
+		CassandraSinkBaseConfig config,
+		CassandraFailureHandler failureHandler) {
+		final TestCassandraSink testCassandraSink = new TestCassandraSink(config, failureHandler);
+		testCassandraSink.open(new Configuration());
+		return testCassandraSink;
+	}
+
+	private OneInputStreamOperatorTestHarness<String, Object> createOpenedTestHarness(
+		TestCassandraSink testCassandraSink) throws Exception {
+		final StreamSink<String> testStreamSink = new StreamSink<>(testCassandraSink);
+		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(testStreamSink);
+		testHarness.open();
+		return testHarness;
+	}
+
+	private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet> implements AutoCloseable {
 
 		private static final ClusterBuilder builder;
 		private static final Cluster cluster;
@@ -243,24 +351,32 @@ protected Cluster buildCluster(Cluster.Builder builder) {
 			};
 		}
 
-		private ResultSetFuture result;
+		private final Queue<ListenableFuture<ResultSet>> resultSetFutures = new LinkedList<>();
 
 		TestCassandraSink() {
-			super(builder, new NoOpCassandraFailureHandler());
+			this(CassandraSinkBaseConfig.newBuilder().build());
+		}
+
+		TestCassandraSink(CassandraSinkBaseConfig config) {
+			this(config, new NoOpCassandraFailureHandler());
 		}
 
 		TestCassandraSink(CassandraFailureHandler failureHandler) {
-			super(builder, failureHandler);
+			this(CassandraSinkBaseConfig.newBuilder().build(), failureHandler);
 		}
 
-		void setResultFuture(ResultSetFuture result) {
-			Preconditions.checkNotNull(result);
-			this.result = result;
+		TestCassandraSink(CassandraSinkBaseConfig config, CassandraFailureHandler failureHandler) {
+			super(builder, config, failureHandler);
 		}
 
 		@Override
 		public ListenableFuture<ResultSet> send(String value) {
-			return result;
+			return resultSetFutures.poll();
+		}
+
+		void enqueueCompletableFuture(CompletableFuture<ResultSet> completableFuture) {
+			Preconditions.checkNotNull(completableFuture);
+			resultSetFutures.offer(ResultSetFutures.fromCompletableFuture(completableFuture));
 		}
 	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services