You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2019/11/29 18:53:29 UTC

[flink] branch release-1.9 updated: [FLINK-14976][cassandra] Release semaphore on all Throwable's in send()

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 6039e11  [FLINK-14976][cassandra] Release semaphore on all Throwable's in send()
6039e11 is described below

commit 6039e11c1cad20fe3468715ff594a49cbdc8d95e
Author: Mads Chr. Olesen <mc...@trackunit.com>
AuthorDate: Wed Nov 27 14:56:16 2019 +0100

    [FLINK-14976][cassandra] Release semaphore on all Throwable's in send()
---
 .../connectors/cassandra/CassandraSinkBase.java    |  2 +-
 .../cassandra/CassandraSinkBaseTest.java           | 51 ++++++++++++++--------
 2 files changed, 34 insertions(+), 19 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 0e7eb6f..76ed9c7 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -132,7 +132,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
 		final ListenableFuture<V> result;
 		try {
 			result = send(value);
-		} catch (Exception e) {
+		} catch (Throwable e) {
 			semaphore.release();
 			throw e;
 		}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index 3ce9742..5c0a431 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.util.Preconditions;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.Assert;
@@ -42,9 +41,13 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 
 import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 
@@ -81,7 +84,7 @@ public class CassandraSinkBaseTest {
 			casSinkFunc.enqueueCompletableFuture(CompletableFuture.completedFuture(null));
 
 			final int originalPermits = casSinkFunc.getAvailablePermits();
-			Assert.assertThat(originalPermits, greaterThan(0));
+			assertThat(originalPermits, greaterThan(0));
 			Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
 
 			casSinkFunc.invoke("hello");
@@ -277,21 +280,29 @@ public class CassandraSinkBaseTest {
 	}
 
 	@Test(timeout = DEFAULT_TEST_TIMEOUT)
-	public void testReleaseOnSendException() throws Exception {
+	public void testReleaseOnThrowingSend() throws Exception {
 		final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
 			.setMaxConcurrentRequests(1)
 			.build();
 
-		try (TestCassandraSink testCassandraSink = createOpenedSendExceptionTestCassandraSink(config)) {
-			Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
-			Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
+		Function<String, ListenableFuture<ResultSet>> failingSendFunction = ignoredMessage -> {
+			throwCheckedAsUnchecked(new Throwable("expected"));
+			//noinspection ReturnOfNull
+			return null;
+		};
 
+		try (TestCassandraSink testCassandraSink = new MockCassandraSink(config, failingSendFunction)) {
+			testCassandraSink.open(new Configuration());
+			assertThat(testCassandraSink.getAvailablePermits(), is(1));
+			assertThat(testCassandraSink.getAcquiredPermits(), is(0));
+
+			//noinspection OverlyBroadCatchBlock,NestedTryStatement
 			try {
-				testCassandraSink.invoke("N/A");
-			} catch (Exception e) {
-				Assert.assertTrue(e instanceof InvalidQueryException);
-				Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
-				Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
+				testCassandraSink.invoke("none");
+			} catch (Throwable e) {
+				assertThat(e, instanceOf(Throwable.class));
+				assertThat(testCassandraSink.getAvailablePermits(), is(1));
+				assertThat(testCassandraSink.getAcquiredPermits(), is(0));
 			}
 		}
 	}
@@ -355,10 +366,9 @@ public class CassandraSinkBaseTest {
 		return testHarness;
 	}
 
-	private TestCassandraSink createOpenedSendExceptionTestCassandraSink(CassandraSinkBaseConfig config) {
-		final TestCassandraSink testCassandraSink = new SendExceptionTestCassandraSink(config);
-		testCassandraSink.open(new Configuration());
-		return testCassandraSink;
+	private static <T extends Throwable> void throwCheckedAsUnchecked(Throwable ex) throws T {
+		//noinspection unchecked
+		throw (T) ex;
 	}
 
 	private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet> implements AutoCloseable {
@@ -410,14 +420,19 @@ public class CassandraSinkBaseTest {
 		}
 	}
 
-	private static class SendExceptionTestCassandraSink extends TestCassandraSink {
-		SendExceptionTestCassandraSink(CassandraSinkBaseConfig config) {
+	private static class MockCassandraSink extends TestCassandraSink {
+		private static final long serialVersionUID = -3363195776692829911L;
+
+		private final Function<String, ListenableFuture<ResultSet>> sendFunction;
+
+		MockCassandraSink(CassandraSinkBaseConfig config, Function<String, ListenableFuture<ResultSet>> sendFunction) {
 			super(config, new NoOpCassandraFailureHandler());
+			this.sendFunction = sendFunction;
 		}
 
 		@Override
 		public ListenableFuture<ResultSet> send(String value) {
-			throw new InvalidQueryException("For test purposes");
+			return this.sendFunction.apply(value);
 		}
 	}
 }