You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/12 21:43:23 UTC
[3/5] flink git commit: [FLINK-5101] Track pending records in
CassandraSinkBase
[FLINK-5101] Track pending records in CassandraSinkBase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/948bb9f6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/948bb9f6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/948bb9f6
Branch: refs/heads/master
Commit: 948bb9f674a9c4cf95491f3d9a92b38eed6b64e8
Parents: ef751b2
Author: zentol <ch...@apache.org>
Authored: Wed Nov 23 16:59:22 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri May 12 20:20:53 2017 +0200
----------------------------------------------------------------------
.../connectors/cassandra/CassandraPojoSink.java | 7 +--
.../connectors/cassandra/CassandraSinkBase.java | 56 +++++++++++++++-----
2 files changed, 47 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/948bb9f6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index 650c481..9cfb2f8 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.cassandra;
+import com.datastax.driver.core.ResultSet;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.google.common.util.concurrent.ListenableFuture;
@@ -31,7 +32,7 @@ import org.apache.flink.configuration.Configuration;
*
* @param <IN> Type of the elements emitted by this sink
*/
-public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
+public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
private static final long serialVersionUID = 1L;
@@ -61,7 +62,7 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
}
@Override
- public ListenableFuture<Void> send(IN value) {
- return mapper.saveAsync(value);
+ public ListenableFuture<ResultSet> send(IN value) {
+ return session.executeAsync(mapper.saveQuery(value));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/948bb9f6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 49b1efa..b281525 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
@@ -40,11 +41,13 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
protected transient Cluster cluster;
protected transient Session session;
- protected transient Throwable exception = null;
+ protected transient volatile Throwable exception;
protected transient FutureCallback<V> callback;
private final ClusterBuilder builder;
+ private final AtomicInteger updatesPending = new AtomicInteger();
+
protected CassandraSinkBase(ClusterBuilder builder) {
this.builder = builder;
ClosureCleaner.clean(builder, true);
@@ -55,11 +58,24 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
this.callback = new FutureCallback<V>() {
@Override
public void onSuccess(V ignored) {
+ int pending = updatesPending.decrementAndGet();
+ if (pending == 0) {
+ synchronized (updatesPending) {
+ updatesPending.notifyAll();
+ }
+ }
}
@Override
public void onFailure(Throwable t) {
+ int pending = updatesPending.decrementAndGet();
+ if (pending == 0) {
+ synchronized (updatesPending) {
+ updatesPending.notifyAll();
+ }
+ }
exception = t;
+
LOG.error("Error while sending value.", t);
}
};
@@ -70,29 +86,43 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
@Override
public void invoke(IN value) throws Exception {
if (exception != null) {
- throw new IOException("invoke() failed", exception);
+ throw new IOException("Error while sending value.", exception);
}
ListenableFuture<V> result = send(value);
+ updatesPending.incrementAndGet();
Futures.addCallback(result, callback);
}
public abstract ListenableFuture<V> send(IN value);
@Override
- public void close() {
+ public void close() throws Exception {
try {
- if (session != null) {
- session.close();
+ if (exception != null) {
+ throw new IOException("Error while sending value.", exception);
}
- } catch (Exception e) {
- LOG.error("Error while closing session.", e);
- }
- try {
- if (cluster != null) {
- cluster.close();
+
+ while (updatesPending.get() > 0) {
+ synchronized (updatesPending) {
+ updatesPending.wait();
+ }
+ }
+
+ } finally {
+ try {
+ if (session != null) {
+ session.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while closing session.", e);
+ }
+ try {
+ if (cluster != null) {
+ cluster.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while closing cluster.", e);
}
- } catch (Exception e) {
- LOG.error("Error while closing cluster.", e);
}
}
}