You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/11/14 13:45:28 UTC

flink git commit: [hotfix] [cassandra] Fix CassandraSinkBase serialization issue

Repository: flink
Updated Branches:
  refs/heads/master 62523acbe -> 5fa389014


[hotfix] [cassandra] Fix CassandraSinkBase serialization issue


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5fa38901
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5fa38901
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5fa38901

Branch: refs/heads/master
Commit: 5fa389014a3ce40534703c8a5731c8a9a955058a
Parents: 62523ac
Author: zentol <ch...@apache.org>
Authored: Mon Nov 14 14:45:16 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Nov 14 14:45:16 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/connectors/cassandra/CassandraSinkBase.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5fa38901/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 9c4c430..713a286 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -42,7 +42,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 	protected transient Cluster cluster;
 	protected transient Session session;
 
-	protected transient final AtomicReference<Throwable> exception = new AtomicReference<>();
+	protected transient AtomicReference<Throwable> exception;
 	protected transient FutureCallback<V> callback;
 
 	private final ClusterBuilder builder;
@@ -56,6 +56,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 
 	@Override
 	public void open(Configuration configuration) {
+		this.exception = new AtomicReference<>();
 		this.callback = new FutureCallback<V>() {
 			@Override
 			public void onSuccess(V ignored) {