You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/09 14:21:05 UTC

[GitHub] dawidwys closed pull request #6732: [FLINK-10310] Cassandra Sink - Handling failing requests.

dawidwys closed pull request #6732: [FLINK-10310] Cassandra Sink - Handling failing requests.
URL: https://github.com/apache/flink/pull/6732
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/connectors/cassandra.md b/docs/dev/connectors/cassandra.md
index 6e1470a4367a..c4c3e3a47517 100644
--- a/docs/dev/connectors/cassandra.md
+++ b/docs/dev/connectors/cassandra.md
@@ -75,7 +75,10 @@ The following configuration methods can be used:
 5. _enableWriteAheadLog([CheckpointCommitter committer])_
     * An __optional__ setting
     * Allows exactly-once processing for non-deterministic algorithms.
-6. _build()_
+6. _setFailureHandler(CassandraFailureHandler failureHandler)_
+    * An __optional__ setting
+    * Sets a custom failure handler that decides how failed requests are handled. By default, the sink fails on any error.
+7. _build()_
     * Finalizes the configuration and constructs the CassandraSink instance.
 
 ### Write-ahead Log
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
index fda739e2aeb4..41826f58203f 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
@@ -32,8 +32,8 @@
 	private final String insertQuery;
 	private transient PreparedStatement ps;
 
-	public AbstractCassandraTupleSink(String insertQuery, ClusterBuilder builder) {
-		super(builder);
+	public AbstractCassandraTupleSink(String insertQuery, ClusterBuilder builder, CassandraFailureHandler failureHandler) {
+		super(builder, failureHandler);
 		this.insertQuery = insertQuery;
 	}
 
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraFailureHandler.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraFailureHandler.java
new file mode 100644
index 000000000000..478ba47129e2
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraFailureHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * An implementation of {@link CassandraFailureHandler} is provided by the user to define how
+ * {@link Throwable}s should be handled, e.g. dropping them if the failure is only temporary.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ *
+ * 	private static class ExampleFailureHandler implements CassandraFailureHandler {
+ *
+ * 		@Override
+ * 		public void onFailure(Throwable failure) throws IOException {
+ * 			if (ExceptionUtils.containsThrowable(failure, WriteTimeoutException.class)) {
+ * 				// drop exception
+ * 			} else {
+ * 				// for all other failures, fail the sink;
+ * 				// here the failure is wrapped in an IOException, but users can also choose to throw custom exceptions
+ * 				throw new IOException("Error while sending value.", failure);
+ * 			}
+ * 		}
+ * 	}
+ *
+ * }</pre>
+ *
+ * <p>The above example will let the sink ignore the WriteTimeoutException, without failing the sink. For all other
+ * failures, the sink will fail.
+ */
+@PublicEvolving
+public interface CassandraFailureHandler extends Serializable {
+
+	/**
+	 * Handles a failure reported by the sink.
+	 *
+	 * @param failure the cause of failure
+	 * @throws IOException if the sink should fail on this failure; implementations may wrap {@code failure} in an IOException or throw a custom one
+	 */
+	void onFailure(Throwable failure) throws IOException;
+
+}
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index da70da174726..cf4432d4b20f 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -64,7 +64,11 @@ public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, String keyspac
 	}
 
 	public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, @Nullable MapperOptions options, String keyspace) {
-		super(builder);
+		this(clazz, builder, options, keyspace, new NoOpCassandraFailureHandler());
+	}
+
+	public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, @Nullable MapperOptions options, String keyspace, CassandraFailureHandler failureHandler) {
+		super(builder, failureHandler);
 		this.clazz = clazz;
 		this.options = options;
 		this.keyspace = keyspace;
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
index fbbeb96cccfe..f51506b802cd 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
@@ -27,7 +27,11 @@
 	private final int rowArity;
 
 	public CassandraRowSink(int rowArity, String insertQuery, ClusterBuilder builder) {
-		super(insertQuery, builder);
+		this(rowArity, insertQuery, builder, new NoOpCassandraFailureHandler());
+	}
+
+	public CassandraRowSink(int rowArity, String insertQuery, ClusterBuilder builder, CassandraFailureHandler failureHandler) {
+		super(insertQuery, builder, failureHandler);
 		this.rowArity = rowArity;
 	}
 
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
index 1d1b6341dde7..84af78d7a6ab 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
@@ -27,7 +27,11 @@
  */
 public class CassandraScalaProductSink<IN extends Product> extends AbstractCassandraTupleSink<IN> {
 	public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder) {
-		super(insertQuery, builder);
+		this(insertQuery, builder, new NoOpCassandraFailureHandler());
+	}
+
+	public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder, CassandraFailureHandler failureHandler) {
+		super(insertQuery, builder, failureHandler);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 5d5e9efc36e0..e774cd318930 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -241,6 +241,7 @@ private CassandraSink(SingleOutputStreamOperator<IN> sink) {
 		protected String query;
 		protected CheckpointCommitter committer;
 		protected boolean isWriteAheadLogEnabled;
+		protected CassandraFailureHandler failureHandler;
 
 		public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
 			this.input = input;
@@ -354,6 +355,18 @@ protected Cluster buildCluster(Cluster.Builder builder) {
 			return this;
 		}
 
+		/**
+		 * Sets the failure handler for this sink. The failure handler is used to provide custom error handling.
+		 *
+		 * @param failureHandler the handler given each error the sink reports; if unset or null, {@link #build()} falls back to a {@link NoOpCassandraFailureHandler}, which fails the sink.
+		 *
+		 * @return this builder
+		 */
+		public CassandraSinkBuilder<IN> setFailureHandler(CassandraFailureHandler failureHandler) {
+			this.failureHandler = failureHandler;
+			return this;
+		}
+
 		/**
 		 * Finalizes the configuration of this sink.
 		 *
@@ -362,6 +375,9 @@ protected Cluster buildCluster(Cluster.Builder builder) {
 		 */
 		public CassandraSink<IN> build() throws Exception {
 			sanityCheck();
+			if (failureHandler == null) {
+				failureHandler = new NoOpCassandraFailureHandler();
+			}
 			return isWriteAheadLogEnabled
 				? createWriteAheadSink()
 				: createSink();
@@ -400,7 +416,7 @@ protected void sanityCheck() {
 
 		@Override
 		public CassandraSink<IN> createSink() throws Exception {
-			return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink"));
+			return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder, failureHandler)).name("Cassandra Sink"));
 		}
 
 		@Override
@@ -432,7 +448,7 @@ protected void sanityCheck() {
 
 		@Override
 		protected CassandraSink<Row> createSink() throws Exception {
-			return new CassandraSink<>(input.addSink(new CassandraRowSink(typeInfo.getArity(), query, builder)).name("Cassandra Sink"));
+			return new CassandraSink<>(input.addSink(new CassandraRowSink(typeInfo.getArity(), query, builder, failureHandler)).name("Cassandra Sink"));
 
 		}
 
@@ -463,7 +479,7 @@ protected void sanityCheck() {
 
 		@Override
 		public CassandraSink<IN> createSink() throws Exception {
-			return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder, mapperOptions, keyspace)).name("Cassandra Sink"));
+			return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder, mapperOptions, keyspace, failureHandler)).name("Cassandra Sink"));
 		}
 
 		@Override
@@ -495,7 +511,7 @@ protected void sanityCheck() {
 
 		@Override
 		public CassandraSink<IN> createSink() throws Exception {
-			return new CassandraSink<>(input.addSink(new CassandraScalaProductSink<IN>(query, builder)).name("Cassandra Sink"));
+			return new CassandraSink<>(input.addSink(new CassandraScalaProductSink<IN>(query, builder, failureHandler)).name("Cassandra Sink"));
 		}
 
 		@Override
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index d2ba8e53913b..d24347ec89d7 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -33,11 +33,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
+ * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link
+ * CassandraTupleSink}.
  *
  * @param <IN> Type of the elements emitted by this sink
  */
@@ -53,8 +55,11 @@
 
 	private final AtomicInteger updatesPending = new AtomicInteger();
 
-	CassandraSinkBase(ClusterBuilder builder) {
+	private final CassandraFailureHandler failureHandler;
+
+	CassandraSinkBase(ClusterBuilder builder, CassandraFailureHandler failureHandler) {
 		this.builder = builder;
+		this.failureHandler = checkNotNull(failureHandler);
 		ClosureCleaner.clean(builder, true);
 	}
 
@@ -150,7 +155,7 @@ private void checkAsyncErrors() throws Exception {
 		if (error != null) {
 			// prevent throwing duplicated error
 			exception = null;
-			throw new IOException("Error while sending value.", error);
+			failureHandler.onFailure(error);
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
index a7ec1df5419d..4581aa279041 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
-import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple;
 
 /**
  * Sink to write Flink {@link Tuple}s into a Cassandra cluster.
@@ -26,7 +26,11 @@
  */
 public class CassandraTupleSink<IN extends Tuple> extends AbstractCassandraTupleSink<IN> {
 	public CassandraTupleSink(String insertQuery, ClusterBuilder builder) {
-		super(insertQuery, builder);
+		this(insertQuery, builder, new NoOpCassandraFailureHandler());
+	}
+
+	public CassandraTupleSink(String insertQuery, ClusterBuilder builder, CassandraFailureHandler failureHandler) {
+		super(insertQuery, builder, failureHandler);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/NoOpCassandraFailureHandler.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/NoOpCassandraFailureHandler.java
new file mode 100644
index 000000000000..780bdfc2aa5a
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/NoOpCassandraFailureHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+
+/**
+ * A {@link CassandraFailureHandler} that never ignores a failure: it wraps it in an {@link IOException} so the sink fails.
+ */
+@Internal
+public class NoOpCassandraFailureHandler implements CassandraFailureHandler {
+
+	private static final long serialVersionUID = 737941343410827885L;
+
+	@Override
+	public void onFailure(Throwable failure) throws IOException {
+		// no custom handling: rethrow wrapped in an IOException, which fails the sink
+		throw new IOException("Error while sending value.", failure);
+	}
+
+}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index 8c2b3679c957..8c66882611ae 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -54,7 +54,7 @@ protected Cluster buildCluster(Cluster.Builder builder) {
 					.withoutJMXReporting()
 					.withoutMetrics().build();
 			}
-		}) {
+		}, new NoOpCassandraFailureHandler()) {
 			@Override
 			public ListenableFuture send(Object value) {
 				return null;
@@ -116,6 +116,20 @@ public void testThrowErrorOnInvoke() throws Exception {
 		}
 	}
 
+	@Test(timeout = 5000)
+	public void testIgnoreError() throws Exception {
+		Exception cause = new RuntimeException();
+		CassandraFailureHandler failureHandler = failure -> Assert.assertEquals(cause, failure);
+		TestCassandraSink casSinkFunc = new TestCassandraSink(failureHandler);
+
+		casSinkFunc.open(new Configuration());
+
+		casSinkFunc.setResultFuture(ResultSetFutures.fromCompletableFuture(FutureUtils.getFailedFuture(cause)));
+
+		casSinkFunc.invoke("hello");
+		casSinkFunc.invoke("world");
+	}
+
 	@Test(timeout = 5000)
 	public void testThrowErrorOnSnapshot() throws Exception {
 		TestCassandraSink casSinkFunc = new TestCassandraSink();
@@ -232,7 +246,11 @@ protected Cluster buildCluster(Cluster.Builder builder) {
 		private ResultSetFuture result;
 
 		TestCassandraSink() {
-			super(builder);
+			super(builder, new NoOpCassandraFailureHandler());
+		}
+
+		TestCassandraSink(CassandraFailureHandler failureHandler) {
+			super(builder, failureHandler);
 		}
 
 		void setResultFuture(ResultSetFuture result) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services