You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/09/03 09:50:46 UTC

[flink] branch release-1.8 updated: [FLINK-13059][cassandra] Release semaphore on exception in send()

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new b7ce7b8  [FLINK-13059][cassandra] Release semaphore on exception in send()
b7ce7b8 is described below

commit b7ce7b8ff14807e4981591a7e26c99d5051d529f
Author: Mads Chr. Olesen <mc...@trackunit.com>
AuthorDate: Tue Sep 3 11:49:10 2019 +0200

    [FLINK-13059][cassandra] Release semaphore on exception in send()
---
 .../connectors/cassandra/CassandraSinkBase.java    | 21 +++++++----
 .../cassandra/CassandraSinkBaseTest.java           | 42 ++++++++++++++++++++--
 2 files changed, 54 insertions(+), 9 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index ede5586..0e7eb6f 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -128,8 +128,14 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
 	@Override
 	public void invoke(IN value) throws Exception {
 		checkAsyncErrors();
-		tryAcquire();
-		final ListenableFuture<V> result = send(value);
+		tryAcquire(1);
+		final ListenableFuture<V> result;
+		try {
+			result = send(value);
+		} catch (Exception e) {
+			semaphore.release();
+			throw e;
+		}
 		Futures.addCallback(result, callback);
 	}
 
@@ -139,11 +145,12 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
 
 	public abstract ListenableFuture<V> send(IN value);
 
-	private void tryAcquire() throws InterruptedException, TimeoutException {
-		if (!semaphore.tryAcquire(config.getMaxConcurrentRequestsTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
+	private void tryAcquire(int permits) throws InterruptedException, TimeoutException {
+		if (!semaphore.tryAcquire(permits, config.getMaxConcurrentRequestsTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
 			throw new TimeoutException(
 				String.format(
-					"Failed to acquire 1 permit of %d to send value in %s.",
+					"Failed to acquire %d out of %d permits to send value in %s.",
+					permits,
 					config.getMaxConcurrentRequests(),
 					config.getMaxConcurrentRequestsTimeout()
 				)
@@ -158,8 +165,8 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
 		}
 	}
 
-	private void flush() {
-		semaphore.acquireUninterruptibly(config.getMaxConcurrentRequests());
+	private void flush() throws InterruptedException, TimeoutException {
+		tryAcquire(config.getMaxConcurrentRequests());
 		semaphore.release(config.getMaxConcurrentRequests());
 	}
 
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index 2b705a5..b4406ab 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.util.Preconditions;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.Assert;
@@ -180,7 +181,7 @@ public class CassandraSinkBaseTest {
 				}
 			};
 			t.start();
-			while (t.getState() != Thread.State.WAITING) {
+			while (t.getState() != Thread.State.TIMED_WAITING) {
 				Thread.sleep(5);
 			}
 
@@ -212,7 +213,7 @@ public class CassandraSinkBaseTest {
 				}
 			};
 			t.start();
-			while (t.getState() != Thread.State.WAITING) {
+			while (t.getState() != Thread.State.TIMED_WAITING) {
 				Thread.sleep(5);
 			}
 
@@ -273,6 +274,26 @@ public class CassandraSinkBaseTest {
 	}
 
 	@Test(timeout = DEFAULT_TEST_TIMEOUT)
+	public void testReleaseOnSendException() throws Exception {
+		final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
+			.setMaxConcurrentRequests(1)
+			.build();
+
+		try (TestCassandraSink testCassandraSink = createOpenedSendExceptionTestCassandraSink(config)) {
+			Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
+			Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
+
+			try {
+				testCassandraSink.invoke("N/A");
+			} catch (Exception e) {
+				Assert.assertTrue(e instanceof InvalidQueryException);
+				Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
+				Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
+			}
+		}
+	}
+
+	@Test(timeout = DEFAULT_TEST_TIMEOUT)
 	public void testTimeoutExceptionOnInvoke() throws Exception {
 		final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
 			.setMaxConcurrentRequests(1)
@@ -331,6 +352,12 @@ public class CassandraSinkBaseTest {
 		return testHarness;
 	}
 
+	private TestCassandraSink createOpenedSendExceptionTestCassandraSink(CassandraSinkBaseConfig config) {
+		final TestCassandraSink testCassandraSink = new SendExceptionTestCassandraSink(config);
+		testCassandraSink.open(new Configuration());
+		return testCassandraSink;
+	}
+
 	private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet> implements AutoCloseable {
 
 		private static final ClusterBuilder builder;
@@ -379,4 +406,15 @@ public class CassandraSinkBaseTest {
 			resultSetFutures.offer(ResultSetFutures.fromCompletableFuture(completableFuture));
 		}
 	}
+
+	private static class SendExceptionTestCassandraSink extends TestCassandraSink {
+		SendExceptionTestCassandraSink(CassandraSinkBaseConfig config) {
+			super(config, new NoOpCassandraFailureHandler());
+		}
+
+		@Override
+		public ListenableFuture<ResultSet> send(String value) {
+			throw new InvalidQueryException("For test purposes");
+		}
+	}
 }