You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/12/20 09:56:45 UTC

[flink] 01/03: [FLINK-9083][cassandra] Restructure CassandraSinkBase

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a53b85e73b44b999d28e6b2986210a644b8408ea
Author: jparkie <pa...@gmail.com>
AuthorDate: Wed Dec 19 14:34:43 2018 +0100

    [FLINK-9083][cassandra] Restructure CassandraSinkBase
---
 .../connectors/cassandra/CassandraSinkBase.java    | 33 +++++++++++-----------
 1 file changed, 16 insertions(+), 17 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index d24347e..44e3ff7 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -45,16 +45,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
 	protected final Logger log = LoggerFactory.getLogger(getClass());
+
 	protected transient Cluster cluster;
 	protected transient Session session;
 
 	protected transient volatile Throwable exception;
 	protected transient FutureCallback<V> callback;
-
-	private final ClusterBuilder builder;
-
 	private final AtomicInteger updatesPending = new AtomicInteger();
 
+	private final ClusterBuilder builder;
 	private final CassandraFailureHandler failureHandler;
 
 	CassandraSinkBase(ClusterBuilder builder, CassandraFailureHandler failureHandler) {
@@ -93,20 +92,6 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
 		this.session = createSession();
 	}
 
-	protected Session createSession() {
-		return cluster.connect();
-	}
-
-	@Override
-	public void invoke(IN value) throws Exception {
-		checkAsyncErrors();
-		ListenableFuture<V> result = send(value);
-		updatesPending.incrementAndGet();
-		Futures.addCallback(result, callback);
-	}
-
-	public abstract ListenableFuture<V> send(IN value);
-
 	@Override
 	public void close() throws Exception {
 		try {
@@ -142,6 +127,20 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
 		checkAsyncErrors();
 	}
 
+	@Override
+	public void invoke(IN value) throws Exception {
+		checkAsyncErrors();
+		ListenableFuture<V> result = send(value);
+		updatesPending.incrementAndGet();
+		Futures.addCallback(result, callback);
+	}
+
+	protected Session createSession() {
+		return cluster.connect();
+	}
+
+	public abstract ListenableFuture<V> send(IN value);
+
 	private void waitForPendingUpdates() throws InterruptedException {
 		synchronized (updatesPending) {
 			while (updatesPending.get() > 0) {