You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/09/03 09:50:46 UTC
[flink] branch release-1.8 updated: [FLINK-13059][cassandra] Release semaphore on exception in send()
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
new b7ce7b8 [FLINK-13059][cassandra] Release semaphore on exception in send()
b7ce7b8 is described below
commit b7ce7b8ff14807e4981591a7e26c99d5051d529f
Author: Mads Chr. Olesen <mc...@trackunit.com>
AuthorDate: Tue Sep 3 11:49:10 2019 +0200
[FLINK-13059][cassandra] Release semaphore on exception in send()
---
.../connectors/cassandra/CassandraSinkBase.java | 21 +++++++----
.../cassandra/CassandraSinkBaseTest.java | 42 ++++++++++++++++++++--
2 files changed, 54 insertions(+), 9 deletions(-)
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index ede5586..0e7eb6f 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -128,8 +128,14 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
@Override
public void invoke(IN value) throws Exception {
checkAsyncErrors();
- tryAcquire();
- final ListenableFuture<V> result = send(value);
+ tryAcquire(1);
+ final ListenableFuture<V> result;
+ try {
+ result = send(value);
+ } catch (Exception e) {
+ semaphore.release();
+ throw e;
+ }
Futures.addCallback(result, callback);
}
@@ -139,11 +145,12 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
public abstract ListenableFuture<V> send(IN value);
- private void tryAcquire() throws InterruptedException, TimeoutException {
- if (!semaphore.tryAcquire(config.getMaxConcurrentRequestsTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
+ private void tryAcquire(int permits) throws InterruptedException, TimeoutException {
+ if (!semaphore.tryAcquire(permits, config.getMaxConcurrentRequestsTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
throw new TimeoutException(
String.format(
- "Failed to acquire 1 permit of %d to send value in %s.",
+ "Failed to acquire %d out of %d permits to send value in %s.",
+ permits,
config.getMaxConcurrentRequests(),
config.getMaxConcurrentRequestsTimeout()
)
@@ -158,8 +165,8 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
}
}
- private void flush() {
- semaphore.acquireUninterruptibly(config.getMaxConcurrentRequests());
+ private void flush() throws InterruptedException, TimeoutException {
+ tryAcquire(config.getMaxConcurrentRequests());
semaphore.release(config.getMaxConcurrentRequests());
}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index 2b705a5..b4406ab 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.util.Preconditions;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Assert;
@@ -180,7 +181,7 @@ public class CassandraSinkBaseTest {
}
};
t.start();
- while (t.getState() != Thread.State.WAITING) {
+ while (t.getState() != Thread.State.TIMED_WAITING) {
Thread.sleep(5);
}
@@ -212,7 +213,7 @@ public class CassandraSinkBaseTest {
}
};
t.start();
- while (t.getState() != Thread.State.WAITING) {
+ while (t.getState() != Thread.State.TIMED_WAITING) {
Thread.sleep(5);
}
@@ -273,6 +274,26 @@ public class CassandraSinkBaseTest {
}
@Test(timeout = DEFAULT_TEST_TIMEOUT)
+ public void testReleaseOnSendException() throws Exception {
+ final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
+ .setMaxConcurrentRequests(1)
+ .build();
+
+ try (TestCassandraSink testCassandraSink = createOpenedSendExceptionTestCassandraSink(config)) {
+ Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
+ Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
+
+ try {
+ testCassandraSink.invoke("N/A");
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof InvalidQueryException);
+ Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
+ Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
+ }
+ }
+ }
+
+ @Test(timeout = DEFAULT_TEST_TIMEOUT)
public void testTimeoutExceptionOnInvoke() throws Exception {
final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
.setMaxConcurrentRequests(1)
@@ -331,6 +352,12 @@ public class CassandraSinkBaseTest {
return testHarness;
}
+ private TestCassandraSink createOpenedSendExceptionTestCassandraSink(CassandraSinkBaseConfig config) {
+ final TestCassandraSink testCassandraSink = new SendExceptionTestCassandraSink(config);
+ testCassandraSink.open(new Configuration());
+ return testCassandraSink;
+ }
+
private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet> implements AutoCloseable {
private static final ClusterBuilder builder;
@@ -379,4 +406,15 @@ public class CassandraSinkBaseTest {
resultSetFutures.offer(ResultSetFutures.fromCompletableFuture(completableFuture));
}
}
+
+ private static class SendExceptionTestCassandraSink extends TestCassandraSink {
+ SendExceptionTestCassandraSink(CassandraSinkBaseConfig config) {
+ super(config, new NoOpCassandraFailureHandler());
+ }
+
+ @Override
+ public ListenableFuture<ResultSet> send(String value) {
+ throw new InvalidQueryException("For test purposes");
+ }
+ }
}