You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/07/05 11:05:10 UTC

[flink] branch master updated: [FLINK-26793][documentation] Add recommendations about the write timeout to the javadoc of Cassandra sink and output format.

This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 729da7d4592 [FLINK-26793][documentation] Add recommendations about the write timeout to the javadoc of Cassandra sink and output format.
729da7d4592 is described below

commit 729da7d4592664a6e9d73414f1b561366513f5f4
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Jun 13 11:20:45 2022 +0200

    [FLINK-26793][documentation] Add recommendations about the write timeout to the javadoc of Cassandra sink and output format.
---
 .../connectors/cassandra/CassandraOutputFormatBase.java     | 13 +++++++++++++
 .../connectors/cassandra/CassandraPojoOutputFormat.java     |  1 +
 .../connectors/cassandra/CassandraRowOutputFormat.java      |  5 ++++-
 .../connectors/cassandra/CassandraTupleOutputFormat.java    |  3 ++-
 .../streaming/connectors/cassandra/CassandraPojoSink.java   |  3 ++-
 .../streaming/connectors/cassandra/CassandraRowSink.java    |  5 ++++-
 .../connectors/cassandra/CassandraScalaProductSink.java     |  3 ++-
 .../streaming/connectors/cassandra/CassandraSinkBase.java   | 13 +++++++++++++
 .../streaming/connectors/cassandra/CassandraTupleSink.java  |  3 ++-
 9 files changed, 43 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java
index 8bda1f6878d..6dba992a0d4 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java
@@ -33,6 +33,19 @@ import java.time.Duration;
  * CassandraOutputFormatBase is the common abstract class for writing into Apache Cassandra using
  * output formats.
  *
+ * <p>If you experience the following error: {@code Error while sending value.
+ * com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query
+ * at consistency LOCAL_ONE (1 replica were required but only 0 acknowledged the write)},
+ *
+ * <p>it is recommended to increase the Cassandra write timeout to adapt to your workload in your
+ * Cassandra cluster so that such timeout errors do not happen. To do so, raise the
+ * write_request_timeout_in_ms configuration parameter in your cassandra.yaml. Indeed, this
+ * exception means that the Cassandra coordinator node (internal to Cassandra) waited too long for
+ * an internal replication (replication to another node) that did not acknowledge the write. It
+ * is not recommended to lower the replication factor in your Cassandra cluster because it is
+ * mandatory not to lose data in case of a Cassandra cluster failure. Waiting for a single replica
+ * to acknowledge the write is the minimum level for this guarantee in Cassandra.
+ *
  * @param <OUT> Type of the elements to write.
  */
 abstract class CassandraOutputFormatBase<OUT, V> extends OutputFormatBase<OUT, V> {
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoOutputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoOutputFormat.java
index 6d27446837e..235cdfe9b19 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoOutputFormat.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoOutputFormat.java
@@ -30,6 +30,7 @@ import java.time.Duration;
 
 /**
  * OutputFormat to write data to Apache Cassandra and from a custom Cassandra annotated object.
+ * Please read the recommendations in {@linkplain CassandraOutputFormatBase}.
  *
  * @param <OUT> type of outputClass
  */
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraRowOutputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraRowOutputFormat.java
index 678da42fe89..0e7a5712211 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraRowOutputFormat.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraRowOutputFormat.java
@@ -22,7 +22,10 @@ import org.apache.flink.types.Row;
 
 import java.time.Duration;
 
-/** OutputFormat to write Flink {@link Row}s into a Cassandra cluster. */
+/**
+ * OutputFormat to write Flink {@link Row}s into a Cassandra cluster. Please read the
+ * recommendations in {@linkplain CassandraOutputFormatBase}.
+ */
 public class CassandraRowOutputFormat extends CassandraColumnarOutputFormatBase<Row> {
 
     public CassandraRowOutputFormat(String insertQuery, ClusterBuilder builder) {
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraTupleOutputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraTupleOutputFormat.java
index 88e0a4a9174..9c038c5be6f 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraTupleOutputFormat.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraTupleOutputFormat.java
@@ -23,7 +23,8 @@ import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 import java.time.Duration;
 
 /**
- * OutputFormat to write Flink {@link Tuple}s into a Cassandra cluster.
+ * OutputFormat to write Flink {@link Tuple}s into a Cassandra cluster. Please read the
+ * recommendations in {@linkplain CassandraOutputFormatBase}.
  *
  * @param <OUT> Type of {@link Tuple} to write to Cassandra.
  */
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index 504a43d6d76..8a03c395e60 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -32,7 +32,8 @@ import javax.annotation.Nullable;
  * href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html">Mapper</a>,
  * which it uses annotations from <a
  * href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/annotations/package-summary.html">
- * com.datastax.driver.mapping.annotations</a>.
+ * com.datastax.driver.mapping.annotations</a>. Please read the recommendations in {@linkplain
+ * CassandraSinkBase}.
  *
  * @param <IN> Type of the elements emitted by this sink
  */
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
index 848f21ccec3..6d474873618 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
@@ -19,7 +19,10 @@ package org.apache.flink.streaming.connectors.cassandra;
 
 import org.apache.flink.types.Row;
 
-/** A SinkFunction to write Row records into a Cassandra table. */
+/**
+ * A SinkFunction to write Row records into a Cassandra table. Please read the recommendations in
+ * {@linkplain CassandraSinkBase}.
+ */
 public class CassandraRowSink extends AbstractCassandraTupleSink<Row> {
 
     private final int rowArity;
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
index 8580103c81c..43a1473edb2 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
@@ -21,7 +21,8 @@ package org.apache.flink.streaming.connectors.cassandra;
 import scala.Product;
 
 /**
- * Sink to write scala tuples and case classes into a Cassandra cluster.
+ * Sink to write scala tuples and case classes into a Cassandra cluster. Please read the
+ * recommendations in {@linkplain CassandraSinkBase}.
  *
  * @param <IN> Type of the elements emitted by this sink, it must extend {@link Product}
  */
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 35e9a8f4e87..03508899efb 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -44,6 +44,19 @@ import java.util.concurrent.atomic.AtomicReference;
  * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link
  * CassandraTupleSink}.
  *
+ * <p>If you experience the following error: {@code Error while sending value.
+ * com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query
+ * at consistency LOCAL_ONE (1 replica were required but only 0 acknowledged the write)},
+ *
+ * <p>it is recommended to increase the Cassandra write timeout to adapt to your workload in your
+ * Cassandra cluster so that such timeout errors do not happen. To do so, raise the
+ * write_request_timeout_in_ms configuration parameter in your cassandra.yaml. Indeed, this
+ * exception means that the Cassandra coordinator node (internal to Cassandra) waited too long for
+ * an internal replication (replication to another node) that did not acknowledge the write. It
+ * is not recommended to lower the replication factor in your Cassandra cluster because it is
+ * mandatory not to lose data in case of a Cassandra cluster failure. Waiting for a single replica
+ * to acknowledge the write is the minimum level for this guarantee in Cassandra.
+ *
  * @param <IN> Type of the elements emitted by this sink
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN>
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
index b7804779cc9..523b163b1b8 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
@@ -20,7 +20,8 @@ package org.apache.flink.streaming.connectors.cassandra;
 import org.apache.flink.api.java.tuple.Tuple;
 
 /**
- * Sink to write Flink {@link Tuple}s into a Cassandra cluster.
+ * Sink to write Flink {@link Tuple}s into a Cassandra cluster. Please read the recommendations in
+ * {@linkplain CassandraSinkBase}.
  *
  * @param <IN> Type of the elements emitted by this sink, it must extend {@link Tuple}
  */