You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/03 22:35:01 UTC
[flink] 06/07: [FLINK-12820] [Connectors / Cassandra] Support
ignoring writing nulls for tuple types
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a125e2a11444847920911927955be2d119716838
Author: ozan <oz...@gmail.com>
AuthorDate: Wed Jun 12 21:35:54 2019 +0900
[FLINK-12820] [Connectors / Cassandra] Support ignoring writing nulls for tuple types
This closes #8714
---
.../cassandra/AbstractCassandraTupleSink.java | 17 +++++++++++-
.../connectors/cassandra/CassandraSink.java | 13 ++++++++++
.../cassandra/CassandraSinkBaseConfig.java | 27 +++++++++++++++++--
.../cassandra/CassandraConnectorITCase.java | 30 ++++++++++++++++++++++
4 files changed, 84 insertions(+), 3 deletions(-)
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
index 5e1fcca..3cf8aa4 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.cassandra;
import org.apache.flink.configuration.Configuration;
+import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.google.common.util.concurrent.ListenableFuture;
@@ -31,6 +32,7 @@ import com.google.common.util.concurrent.ListenableFuture;
public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> {
private final String insertQuery;
private transient PreparedStatement ps;
+ private final boolean ignoreNullFields;
public AbstractCassandraTupleSink(
String insertQuery,
@@ -39,6 +41,7 @@ public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<I
CassandraFailureHandler failureHandler) {
super(builder, config, failureHandler);
this.insertQuery = insertQuery;
+ this.ignoreNullFields = config.getIgnoreNullFields();
}
@Override
@@ -50,7 +53,19 @@ public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<I
@Override
public ListenableFuture<ResultSet> send(IN value) {
Object[] fields = extract(value);
- return session.executeAsync(ps.bind(fields));
+ return session.executeAsync(bind(fields));
+ }
+
+ private BoundStatement bind(Object[] fields) {
+ BoundStatement bs = ps.bind(fields);
+ if (ignoreNullFields) {
+ for (int i = 0; i < fields.length; i++) {
+ if (fields[i] == null) {
+ bs.unset(i);
+ }
+ }
+ }
+ return bs;
}
protected abstract Object[] extract(IN record);
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 128e5e0..2a8ebff 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -399,6 +399,19 @@ public class CassandraSink<IN> {
}
/**
+ * Enables ignoring null values, treats null values as unset and avoids writing null fields
+ * and creating tombstones.
+ *
+ * <p>This call has no effect if {@link CassandraSinkBuilder#enableWriteAheadLog()} is called.
+ *
+ * @return this builder
+ */
+ public CassandraSinkBuilder<IN> enableIgnoreNullFields() {
+ this.configBuilder.setIgnoreNullFields(true);
+ return this;
+ }
+
+ /**
* Finalizes the configuration of this sink.
*
* @return finalized sink
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
index cb8d904..d48f973 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
@@ -38,6 +38,12 @@ public final class CassandraSinkBaseConfig implements Serializable {
*/
public static final Duration DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Duration.ofMillis(Long.MAX_VALUE);
+ /**
+ * The default option to ignore null fields on insertion. By default, {@code false}.
+ */
+ public static final boolean DEFAULT_IGNORE_NULL_FIELDS = false;
+
+
// ------------------------- Configuration Fields -------------------------
/** Maximum number of concurrent requests allowed. */
@@ -46,9 +52,13 @@ public final class CassandraSinkBaseConfig implements Serializable {
/** Timeout duration when acquiring a permit to execute. */
private final Duration maxConcurrentRequestsTimeout;
+ /** Whether to ignore null fields on insert. */
+ private final boolean ignoreNullFields;
+
private CassandraSinkBaseConfig(
int maxConcurrentRequests,
- Duration maxConcurrentRequestsTimeout) {
+ Duration maxConcurrentRequestsTimeout,
+ boolean ignoreNullFields) {
Preconditions.checkArgument(maxConcurrentRequests > 0,
"Max concurrent requests is expected to be positive");
Preconditions.checkNotNull(maxConcurrentRequestsTimeout,
@@ -57,6 +67,7 @@ public final class CassandraSinkBaseConfig implements Serializable {
"Max concurrent requests timeout is expected to be positive");
this.maxConcurrentRequests = maxConcurrentRequests;
this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout;
+ this.ignoreNullFields = ignoreNullFields;
}
public int getMaxConcurrentRequests() {
@@ -67,11 +78,16 @@ public final class CassandraSinkBaseConfig implements Serializable {
return maxConcurrentRequestsTimeout;
}
+ public boolean getIgnoreNullFields() {
+ return ignoreNullFields;
+ }
+
@Override
public String toString() {
return "CassandraSinkBaseConfig{" +
"maxConcurrentRequests=" + maxConcurrentRequests +
", maxConcurrentRequestsTimeout=" + maxConcurrentRequestsTimeout +
+ ", ignoreNullFields=" + ignoreNullFields +
'}';
}
@@ -85,6 +101,7 @@ public final class CassandraSinkBaseConfig implements Serializable {
public static class Builder {
private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
private Duration maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+ private boolean ignoreNullFields = DEFAULT_IGNORE_NULL_FIELDS;
Builder() { }
@@ -98,10 +115,16 @@ public final class CassandraSinkBaseConfig implements Serializable {
return this;
}
+ public Builder setIgnoreNullFields(boolean ignoreNullFields) {
+ this.ignoreNullFields = ignoreNullFields;
+ return this;
+ }
+
public CassandraSinkBaseConfig build() {
return new CassandraSinkBaseConfig(
maxConcurrentRequests,
- maxConcurrentRequestsTimeout);
+ maxConcurrentRequestsTimeout,
+ ignoreNullFields);
}
}
}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index b3d8f65..0119c16 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -637,4 +637,34 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
}
Assert.assertEquals(0, scalaTupleCollection.size());
}
+
+ @Test
+ public void testCassandraScalaTuplePartialColumnUpdate() throws Exception {
+ CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder().setIgnoreNullFields(true).build();
+ CassandraScalaProductSink<scala.Tuple3<String, Integer, Integer>> sink = new CassandraScalaProductSink<>(injectTableName(INSERT_DATA_QUERY), builder, config);
+
+ String id = UUID.randomUUID().toString();
+ Integer counter = 1;
+ Integer batchId = 0;
+
+ // Send partial records across multiple requests
+ scala.Tuple3<String, Integer, Integer> scalaTupleRecordFirst = new scala.Tuple3<>(id, counter, null);
+ scala.Tuple3<String, Integer, Integer> scalaTupleRecordSecond = new scala.Tuple3<>(id, null, batchId);
+
+ try {
+ sink.open(new Configuration());
+ sink.invoke(scalaTupleRecordFirst, SinkContextUtil.forTimestamp(0));
+ sink.invoke(scalaTupleRecordSecond, SinkContextUtil.forTimestamp(0));
+ } finally {
+ sink.close();
+ }
+
+ ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+ List<com.datastax.driver.core.Row> rows = rs.all();
+ Assert.assertEquals(1, rows.size());
+ // Since nulls are ignored, we should be reading one complete record
+ for (com.datastax.driver.core.Row row : rows) {
+ Assert.assertEquals(new scala.Tuple3<>(id, counter, batchId), new scala.Tuple3<>(row.getString("id"), row.getInt("counter"), row.getInt("batch_id")));
+ }
+ }
}