You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/03 22:35:01 UTC

[flink] 06/07: [FLINK-12820] [Connectors / Cassandra] Support ignoring writing nulls for tuple types

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a125e2a11444847920911927955be2d119716838
Author: ozan <oz...@gmail.com>
AuthorDate: Wed Jun 12 21:35:54 2019 +0900

    [FLINK-12820] [Connectors / Cassandra] Support ignoring writing nulls for tuple types
    
    This closes #8714
---
 .../cassandra/AbstractCassandraTupleSink.java      | 17 +++++++++++-
 .../connectors/cassandra/CassandraSink.java        | 13 ++++++++++
 .../cassandra/CassandraSinkBaseConfig.java         | 27 +++++++++++++++++--
 .../cassandra/CassandraConnectorITCase.java        | 30 ++++++++++++++++++++++
 4 files changed, 84 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
index 5e1fcca..3cf8aa4 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.cassandra;
 
 import org.apache.flink.configuration.Configuration;
 
+import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -31,6 +32,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> {
 	private final String insertQuery;
 	private transient PreparedStatement ps;
+	private final boolean ignoreNullFields;
 
 	public AbstractCassandraTupleSink(
 			String insertQuery,
@@ -39,6 +41,7 @@ public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<I
 			CassandraFailureHandler failureHandler) {
 		super(builder, config, failureHandler);
 		this.insertQuery = insertQuery;
+		this.ignoreNullFields = config.getIgnoreNullFields();
 	}
 
 	@Override
@@ -50,7 +53,19 @@ public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<I
 	@Override
 	public ListenableFuture<ResultSet> send(IN value) {
 		Object[] fields = extract(value);
-		return session.executeAsync(ps.bind(fields));
+		return session.executeAsync(bind(fields));
+	}
+
+	private BoundStatement bind(Object[] fields) {
+		BoundStatement bs = ps.bind(fields);
+		if (ignoreNullFields) {
+			for (int i = 0; i < fields.length; i++) {
+				if (fields[i] == null) {
+					bs.unset(i);
+				}
+			}
+		}
+		return bs;
 	}
 
 	protected abstract Object[] extract(IN record);
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 128e5e0..2a8ebff 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -399,6 +399,19 @@ public class CassandraSink<IN> {
 		}
 
 		/**
+		 * Enables ignoring null values; null values are treated as unset, which avoids writing
+		 * null fields and creating tombstones.
+		 *
+		 * <p>This call has no effect if {@link CassandraSinkBuilder#enableWriteAheadLog()} is called.
+		 *
+		 * @return this builder
+		 */
+		public CassandraSinkBuilder<IN> enableIgnoreNullFields() {
+			this.configBuilder.setIgnoreNullFields(true);
+			return this;
+		}
+
+		/**
 		 * Finalizes the configuration of this sink.
 		 *
 		 * @return finalized sink
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
index cb8d904..d48f973 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
@@ -38,6 +38,12 @@ public final class CassandraSinkBaseConfig implements Serializable  {
 	 */
 	public static final Duration DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Duration.ofMillis(Long.MAX_VALUE);
 
+	/**
+	 * The default option to ignore null fields on insertion. By default, {@code false}.
+	 */
+	public static final boolean DEFAULT_IGNORE_NULL_FIELDS = false;
+
+
 	// ------------------------- Configuration Fields -------------------------
 
 	/** Maximum number of concurrent requests allowed. */
@@ -46,9 +52,13 @@ public final class CassandraSinkBaseConfig implements Serializable  {
 	/** Timeout duration when acquiring a permit to execute. */
 	private final Duration maxConcurrentRequestsTimeout;
 
+	/** Whether to ignore null fields on insert. */
+	private final boolean ignoreNullFields;
+
 	private CassandraSinkBaseConfig(
 			int maxConcurrentRequests,
-			Duration maxConcurrentRequestsTimeout) {
+			Duration maxConcurrentRequestsTimeout,
+			boolean ignoreNullFields) {
 		Preconditions.checkArgument(maxConcurrentRequests > 0,
 			"Max concurrent requests is expected to be positive");
 		Preconditions.checkNotNull(maxConcurrentRequestsTimeout,
@@ -57,6 +67,7 @@ public final class CassandraSinkBaseConfig implements Serializable  {
 			"Max concurrent requests timeout is expected to be positive");
 		this.maxConcurrentRequests = maxConcurrentRequests;
 		this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout;
+		this.ignoreNullFields = ignoreNullFields;
 	}
 
 	public int getMaxConcurrentRequests() {
@@ -67,11 +78,16 @@ public final class CassandraSinkBaseConfig implements Serializable  {
 		return maxConcurrentRequestsTimeout;
 	}
 
+	public boolean getIgnoreNullFields() {
+		return ignoreNullFields;
+	}
+
 	@Override
 	public String toString() {
 		return "CassandraSinkBaseConfig{" +
 			"maxConcurrentRequests=" + maxConcurrentRequests +
 			", maxConcurrentRequestsTimeout=" + maxConcurrentRequestsTimeout +
+			", ignoreNullFields=" + ignoreNullFields +
 			'}';
 	}
 
@@ -85,6 +101,7 @@ public final class CassandraSinkBaseConfig implements Serializable  {
 	public static class Builder {
 		private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
 		private Duration maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+		private boolean ignoreNullFields = DEFAULT_IGNORE_NULL_FIELDS;
 
 		Builder() { }
 
@@ -98,10 +115,16 @@ public final class CassandraSinkBaseConfig implements Serializable  {
 			return this;
 		}
 
+		public Builder setIgnoreNullFields(boolean ignoreNullFields) {
+			this.ignoreNullFields = ignoreNullFields;
+			return this;
+		}
+
 		public CassandraSinkBaseConfig build() {
 			return new CassandraSinkBaseConfig(
 				maxConcurrentRequests,
-				maxConcurrentRequestsTimeout);
+				maxConcurrentRequestsTimeout,
+				ignoreNullFields);
 		}
 	}
 }
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index b3d8f65..0119c16 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -637,4 +637,34 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		}
 		Assert.assertEquals(0, scalaTupleCollection.size());
 	}
+
+	@Test
+	public void testCassandraScalaTuplePartialColumnUpdate() throws Exception {
+		CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder().setIgnoreNullFields(true).build();
+		CassandraScalaProductSink<scala.Tuple3<String, Integer, Integer>> sink = new CassandraScalaProductSink<>(injectTableName(INSERT_DATA_QUERY), builder, config);
+
+		String id = UUID.randomUUID().toString();
+		Integer counter = 1;
+		Integer batchId = 0;
+
+		// Send partial records across multiple requests
+		scala.Tuple3<String, Integer, Integer> scalaTupleRecordFirst = new scala.Tuple3<>(id, counter, null);
+		scala.Tuple3<String, Integer, Integer> scalaTupleRecordSecond = new scala.Tuple3<>(id, null, batchId);
+
+		try {
+			sink.open(new Configuration());
+			sink.invoke(scalaTupleRecordFirst, SinkContextUtil.forTimestamp(0));
+			sink.invoke(scalaTupleRecordSecond, SinkContextUtil.forTimestamp(0));
+		} finally {
+			sink.close();
+		}
+
+		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+		List<com.datastax.driver.core.Row> rows = rs.all();
+		Assert.assertEquals(1, rows.size());
+		// Since nulls are ignored, we should be reading one complete record
+		for (com.datastax.driver.core.Row row : rows) {
+			Assert.assertEquals(new scala.Tuple3<>(id, counter, batchId), new scala.Tuple3<>(row.getString("id"), row.getInt("counter"), row.getInt("batch_id")));
+		}
+	}
 }