You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/12/20 09:56:45 UTC
[flink] 01/03: [FLINK-9083][cassandra] Restructure CassandraSinkBase
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a53b85e73b44b999d28e6b2986210a644b8408ea
Author: jparkie <pa...@gmail.com>
AuthorDate: Wed Dec 19 14:34:43 2018 +0100
[FLINK-9083][cassandra] Restructure CassandraSinkBase
---
.../connectors/cassandra/CassandraSinkBase.java | 33 +++++++++++-----------
1 file changed, 16 insertions(+), 17 deletions(-)
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index d24347e..44e3ff7 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -45,16 +45,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
+
protected transient Cluster cluster;
protected transient Session session;
protected transient volatile Throwable exception;
protected transient FutureCallback<V> callback;
-
- private final ClusterBuilder builder;
-
private final AtomicInteger updatesPending = new AtomicInteger();
+ private final ClusterBuilder builder;
private final CassandraFailureHandler failureHandler;
CassandraSinkBase(ClusterBuilder builder, CassandraFailureHandler failureHandler) {
@@ -93,20 +92,6 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
this.session = createSession();
}
- protected Session createSession() {
- return cluster.connect();
- }
-
- @Override
- public void invoke(IN value) throws Exception {
- checkAsyncErrors();
- ListenableFuture<V> result = send(value);
- updatesPending.incrementAndGet();
- Futures.addCallback(result, callback);
- }
-
- public abstract ListenableFuture<V> send(IN value);
-
@Override
public void close() throws Exception {
try {
@@ -142,6 +127,20 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
checkAsyncErrors();
}
+ @Override
+ public void invoke(IN value) throws Exception {
+ checkAsyncErrors();
+ ListenableFuture<V> result = send(value);
+ updatesPending.incrementAndGet();
+ Futures.addCallback(result, callback);
+ }
+
+ protected Session createSession() {
+ return cluster.connect();
+ }
+
+ public abstract ListenableFuture<V> send(IN value);
+
private void waitForPendingUpdates() throws InterruptedException {
synchronized (updatesPending) {
while (updatesPending.get() > 0) {