You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2019/11/29 18:53:29 UTC
[flink] branch release-1.9 updated: [FLINK-14976][cassandra] Release semaphore on all Throwable's in send()
This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 6039e11 [FLINK-14976][cassandra] Release semaphore on all Throwable's in send()
6039e11 is described below
commit 6039e11c1cad20fe3468715ff594a49cbdc8d95e
Author: Mads Chr. Olesen <mc...@trackunit.com>
AuthorDate: Wed Nov 27 14:56:16 2019 +0100
[FLINK-14976][cassandra] Release semaphore on all Throwable's in send()
---
.../connectors/cassandra/CassandraSinkBase.java | 2 +-
.../cassandra/CassandraSinkBaseTest.java | 51 ++++++++++++++--------
2 files changed, 34 insertions(+), 19 deletions(-)
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 0e7eb6f..76ed9c7 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -132,7 +132,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
final ListenableFuture<V> result;
try {
result = send(value);
- } catch (Exception e) {
+ } catch (Throwable e) {
semaphore.release();
throw e;
}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index 3ce9742..5c0a431 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.util.Preconditions;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Assert;
@@ -42,9 +41,13 @@ import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
@@ -81,7 +84,7 @@ public class CassandraSinkBaseTest {
casSinkFunc.enqueueCompletableFuture(CompletableFuture.completedFuture(null));
final int originalPermits = casSinkFunc.getAvailablePermits();
- Assert.assertThat(originalPermits, greaterThan(0));
+ assertThat(originalPermits, greaterThan(0));
Assert.assertEquals(0, casSinkFunc.getAcquiredPermits());
casSinkFunc.invoke("hello");
@@ -277,21 +280,29 @@ public class CassandraSinkBaseTest {
}
@Test(timeout = DEFAULT_TEST_TIMEOUT)
- public void testReleaseOnSendException() throws Exception {
+ public void testReleaseOnThrowingSend() throws Exception {
final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
.setMaxConcurrentRequests(1)
.build();
- try (TestCassandraSink testCassandraSink = createOpenedSendExceptionTestCassandraSink(config)) {
- Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
- Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
+ Function<String, ListenableFuture<ResultSet>> failingSendFunction = ignoredMessage -> {
+ throwCheckedAsUnchecked(new Throwable("expected"));
+ //noinspection ReturnOfNull
+ return null;
+ };
+ try (TestCassandraSink testCassandraSink = new MockCassandraSink(config, failingSendFunction)) {
+ testCassandraSink.open(new Configuration());
+ assertThat(testCassandraSink.getAvailablePermits(), is(1));
+ assertThat(testCassandraSink.getAcquiredPermits(), is(0));
+
+ //noinspection OverlyBroadCatchBlock,NestedTryStatement
try {
- testCassandraSink.invoke("N/A");
- } catch (Exception e) {
- Assert.assertTrue(e instanceof InvalidQueryException);
- Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
- Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
+ testCassandraSink.invoke("none");
+ } catch (Throwable e) {
+ assertThat(e, instanceOf(Throwable.class));
+ assertThat(testCassandraSink.getAvailablePermits(), is(1));
+ assertThat(testCassandraSink.getAcquiredPermits(), is(0));
}
}
}
@@ -355,10 +366,9 @@ public class CassandraSinkBaseTest {
return testHarness;
}
- private TestCassandraSink createOpenedSendExceptionTestCassandraSink(CassandraSinkBaseConfig config) {
- final TestCassandraSink testCassandraSink = new SendExceptionTestCassandraSink(config);
- testCassandraSink.open(new Configuration());
- return testCassandraSink;
+ private static <T extends Throwable> void throwCheckedAsUnchecked(Throwable ex) throws T {
+ //noinspection unchecked
+ throw (T) ex;
}
private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet> implements AutoCloseable {
@@ -410,14 +420,19 @@ public class CassandraSinkBaseTest {
}
}
- private static class SendExceptionTestCassandraSink extends TestCassandraSink {
- SendExceptionTestCassandraSink(CassandraSinkBaseConfig config) {
+ private static class MockCassandraSink extends TestCassandraSink {
+ private static final long serialVersionUID = -3363195776692829911L;
+
+ private final Function<String, ListenableFuture<ResultSet>> sendFunction;
+
+ MockCassandraSink(CassandraSinkBaseConfig config, Function<String, ListenableFuture<ResultSet>> sendFunction) {
super(config, new NoOpCassandraFailureHandler());
+ this.sendFunction = sendFunction;
}
@Override
public ListenableFuture<ResultSet> send(String value) {
- throw new InvalidQueryException("For test purposes");
+ return this.sendFunction.apply(value);
}
}
}