You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/12 21:43:23 UTC

[3/5] flink git commit: [FLINK-5101] Track pending records in CassandraSinkBase

[FLINK-5101] Track pending records in CassandraSinkBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/948bb9f6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/948bb9f6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/948bb9f6

Branch: refs/heads/master
Commit: 948bb9f674a9c4cf95491f3d9a92b38eed6b64e8
Parents: ef751b2
Author: zentol <ch...@apache.org>
Authored: Wed Nov 23 16:59:22 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri May 12 20:20:53 2017 +0200

----------------------------------------------------------------------
 .../connectors/cassandra/CassandraPojoSink.java |  7 +--
 .../connectors/cassandra/CassandraSinkBase.java | 56 +++++++++++++++-----
 2 files changed, 47 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/948bb9f6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index 650c481..9cfb2f8 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
+import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.mapping.Mapper;
 import com.datastax.driver.mapping.MappingManager;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -31,7 +32,7 @@ import org.apache.flink.configuration.Configuration;
  *
  * @param <IN> Type of the elements emitted by this sink
  */
-public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
+public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -61,7 +62,7 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
 	}
 
 	@Override
-	public ListenableFuture<Void> send(IN value) {
-		return mapper.saveAsync(value);
+	public ListenableFuture<ResultSet> send(IN value) {
+		return session.executeAsync(mapper.saveQuery(value));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/948bb9f6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 49b1efa..b281525 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
@@ -40,11 +41,13 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 	protected transient Cluster cluster;
 	protected transient Session session;
 
-	protected transient Throwable exception = null;
+	protected transient volatile Throwable exception;
 	protected transient FutureCallback<V> callback;
 
 	private final ClusterBuilder builder;
 
+	private final AtomicInteger updatesPending = new AtomicInteger();
+
 	protected CassandraSinkBase(ClusterBuilder builder) {
 		this.builder = builder;
 		ClosureCleaner.clean(builder, true);
@@ -55,11 +58,24 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 		this.callback = new FutureCallback<V>() {
 			@Override
 			public void onSuccess(V ignored) {
+				int pending = updatesPending.decrementAndGet();
+				if (pending == 0) {
+					synchronized (updatesPending) {
+						updatesPending.notifyAll();
+					}
+				}
 			}
 
 			@Override
 			public void onFailure(Throwable t) {
+				int pending = updatesPending.decrementAndGet();
+				if (pending == 0) {
+					synchronized (updatesPending) {
+						updatesPending.notifyAll();
+					}
+				}
 				exception = t;
+				
 				LOG.error("Error while sending value.", t);
 			}
 		};
@@ -70,29 +86,43 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 	@Override
 	public void invoke(IN value) throws Exception {
 		if (exception != null) {
-			throw new IOException("invoke() failed", exception);
+			throw new IOException("Error while sending value.", exception);
 		}
 		ListenableFuture<V> result = send(value);
+		updatesPending.incrementAndGet();
 		Futures.addCallback(result, callback);
 	}
 
 	public abstract ListenableFuture<V> send(IN value);
 
 	@Override
-	public void close() {
+	public void close() throws Exception {
 		try {
-			if (session != null) {
-				session.close();
+			if (exception != null) {
+				throw new IOException("Error while sending value.", exception);
 			}
-		} catch (Exception e) {
-			LOG.error("Error while closing session.", e);
-		}
-		try {
-			if (cluster != null) {
-				cluster.close();
+
+			while (updatesPending.get() > 0) {
+				synchronized (updatesPending) {
+					updatesPending.wait();
+				}
+			}
+			
+		} finally {
+			try {
+				if (session != null) {
+					session.close();
+				}
+			} catch (Exception e) {
+				LOG.error("Error while closing session.", e);
+			}
+			try {
+				if (cluster != null) {
+					cluster.close();
+				}
+			} catch (Exception e) {
+				LOG.error("Error while closing cluster.", e);
 			}
-		} catch (Exception e) {
-			LOG.error("Error while closing cluster.", e);
 		}
 	}
 }