You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/01 13:53:01 UTC
flink git commit: [FLINK-6225] [cassandra] Add a CassandraAppendTableSink.
Repository: flink
Updated Branches:
refs/heads/master 0df8e0797 -> 1809cad6d
[FLINK-6225] [cassandra] Add a CassandraAppendTableSink.
This closes #3748.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1809cad6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1809cad6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1809cad6
Branch: refs/heads/master
Commit: 1809cad6d05522d6185a69ca14ddc275d5ebbbf1
Parents: 0df8e07
Author: Jing Fan <ji...@uber.com>
Authored: Thu Apr 20 17:54:36 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 1 14:53:09 2017 +0100
----------------------------------------------------------------------
docs/dev/table/sourceSinks.md | 44 ++++-
.../flink-connector-cassandra/pom.xml | 6 +
.../cassandra/CassandraAppendTableSink.java | 88 ++++++++++
.../connectors/cassandra/CassandraRowSink.java | 42 +++++
.../cassandra/CassandraRowWriteAheadSink.java | 162 +++++++++++++++++++
.../connectors/cassandra/CassandraSink.java | 40 ++++-
.../cassandra/CassandraConnectorITCase.java | 75 ++++++++-
.../java/typeutils/runtime/RowSerializer.java | 7 +
tools/maven/suppressions.xml | 2 +-
9 files changed, 454 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 43542f3..dfa7954 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -492,7 +492,8 @@ The following table lists the `TableSink`s which are provided with Flink.
| **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description**
| `CsvTableSink` | `flink-table` | Y | Append | A simple sink for CSV files.
-| `JDBCAppendTableSink` | `flink-jdbc` | Y | Append | Writes tables to a JDBC database.
+| `JDBCAppendTableSink` | `flink-jdbc` | Y | Append | Writes a Table to a JDBC table.
+| `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table.
| `Kafka08JsonTableSink` | `flink-connector-kafka-0.8` | N | Append | A Kafka 0.8 sink with JSON encoding.
| `Kafka09JsonTableSink` | `flink-connector-kafka-0.9` | N | Append | A Kafka 0.9 sink with JSON encoding.
@@ -583,6 +584,47 @@ Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify t
{% top %}
+### CassandraAppendTableSink
+
+The `CassandraAppendTableSink` emits a `Table` to a Cassandra table. The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming.html#table-to-stream-conversion) for details.
+
+The `CassandraAppendTableSink` inserts all rows at least once into the Cassandra table if checkpointing is enabled. However, you can specify the query as an upsert query.
+
+To use the `CassandraAppendTableSink`, you have to add the Cassandra connector dependency (<code>flink-connector-cassandra</code>) to your project. The example below shows how to use the `CassandraAppendTableSink`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+ClusterBuilder builder = ... // configure Cassandra cluster connection
+
+CassandraAppendTableSink sink = new CassandraAppendTableSink(
+ builder,
+ // the query must match the schema of the table
+ "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)");
+
+Table table = ...
+table.writeToSink(sink);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val builder: ClusterBuilder = ... // configure Cassandra cluster connection
+
+val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
+ builder,
+ // the query must match the schema of the table
+ "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)")
+
+val table: Table = ???
+table.writeToSink(sink)
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
Define a TableSource
--------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index c97b43f..3c1d3e1 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -197,5 +197,11 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
new file mode 100644
index 0000000..395ff9a
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * An {@link AppendStreamTableSink} to write an append stream Table to a Cassandra table.
+ */
+public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
+
+ private final ClusterBuilder builder;
+ private final String cql;
+ private String[] fieldNames;
+ private TypeInformation[] fieldTypes;
+ private final Properties properties;
+
+ public CassandraAppendTableSink(ClusterBuilder builder, String cql) {
+ this.builder = Preconditions.checkNotNull(builder, "ClusterBuilder must not be null.");
+ this.cql = Preconditions.checkNotNull(cql, "CQL query must not be null.");
+ this.properties = new Properties();
+ }
+
+ public CassandraAppendTableSink(ClusterBuilder builder, String cql, Properties properties) {
+ this.builder = Preconditions.checkNotNull(builder, "ClusterBuilder must not be null.");
+ this.cql = Preconditions.checkNotNull(cql, "CQL query must not be null.");
+ this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+ }
+
+ @Override
+ public TypeInformation<Row> getOutputType() {
+ return new RowTypeInfo(fieldTypes);
+ }
+
+ @Override
+ public String[] getFieldNames() {
+ return this.fieldNames;
+ }
+
+ @Override
+ public TypeInformation<?>[] getFieldTypes() {
+ return this.fieldTypes;
+ }
+
+ @Override
+ public CassandraAppendTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ CassandraAppendTableSink cassandraTableSink = new CassandraAppendTableSink(this.builder, this.cql, this.properties);
+ cassandraTableSink.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names must not be null.");
+ cassandraTableSink.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types must not be null.");
+ Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+ "Number of provided field names and types does not match.");
+ return cassandraTableSink;
+ }
+
+ @Override
+ public void emitDataStream(DataStream<Row> dataStream) {
+ try {
+ CassandraSink.addSink(dataStream)
+ .setClusterBuilder(this.builder)
+ .setQuery(this.cql)
+ .build();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
new file mode 100644
index 0000000..fbbeb96
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.types.Row;
+
+/**
+ * A SinkFunction to write Row records into a Cassandra table.
+ */
+public class CassandraRowSink extends AbstractCassandraTupleSink<Row> {
+
+ private final int rowArity;
+
+ public CassandraRowSink(int rowArity, String insertQuery, ClusterBuilder builder) {
+ super(insertQuery, builder);
+ this.rowArity = rowArity;
+ }
+
+ @Override
+ protected Object[] extract(Row record) {
+ Object[] al = new Object[rowArity];
+ for (int i = 0; i < rowArity; i++) {
+ al[i] = record.getField(i);
+ }
+ return al;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
new file mode 100644
index 0000000..6b3d418
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.types.Row;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Sink that emits its input elements into a Cassandra table. This sink stores incoming records within a
+ * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to Cassandra
+ * if a checkpoint is completed.
+ *
+ */
+public class CassandraRowWriteAheadSink extends GenericWriteAheadSink<Row> {
+ private static final long serialVersionUID = 1L;
+
+ protected transient Cluster cluster;
+ protected transient Session session;
+
+ private final String insertQuery;
+ private transient PreparedStatement preparedStatement;
+
+ private ClusterBuilder builder;
+
+ private int arity;
+ private transient Object[] fields;
+
+ protected CassandraRowWriteAheadSink(String insertQuery, TypeSerializer<Row> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception {
+ super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
+ this.insertQuery = insertQuery;
+ this.builder = builder;
+ ClosureCleaner.clean(builder, true);
+ }
+
+ public void open() throws Exception {
+ super.open();
+ if (!getRuntimeContext().isCheckpointingEnabled()) {
+ throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled.");
+ }
+ cluster = builder.getCluster();
+ session = cluster.connect();
+ preparedStatement = session.prepare(insertQuery);
+
+ arity = ((RowSerializer) serializer).getArity();
+ fields = new Object[arity];
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ try {
+ if (session != null) {
+ session.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while closing session.", e);
+ }
+ try {
+ if (cluster != null) {
+ cluster.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while closing cluster.", e);
+ }
+ }
+
+ @Override
+ protected boolean sendValues(Iterable<Row> values, long checkpointId, long timestamp) throws Exception {
+ final AtomicInteger updatesCount = new AtomicInteger(0);
+ final AtomicInteger updatesConfirmed = new AtomicInteger(0);
+
+ final AtomicReference<Throwable> exception = new AtomicReference<>();
+
+ FutureCallback<ResultSet> callback = new FutureCallback<ResultSet>() {
+ @Override
+ public void onSuccess(ResultSet resultSet) {
+ updatesConfirmed.incrementAndGet();
+ if (updatesCount.get() > 0) { // only set if all updates have been sent
+ if (updatesCount.get() == updatesConfirmed.get()) {
+ synchronized (updatesConfirmed) {
+ updatesConfirmed.notifyAll();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ if (exception.compareAndSet(null, throwable)) {
+ LOG.error("Error while sending value.", throwable);
+ synchronized (updatesConfirmed) {
+ updatesConfirmed.notifyAll();
+ }
+ }
+ }
+ };
+
+ //set values for prepared statement
+ int updatesSent = 0;
+ for (Row value : values) {
+ for (int x = 0; x < arity; x++) {
+ fields[x] = value.getField(x);
+ }
+ //insert values and send to cassandra
+ BoundStatement s = preparedStatement.bind(fields);
+ s.setDefaultTimestamp(timestamp);
+ ResultSetFuture result = session.executeAsync(s);
+ updatesSent++;
+ if (result != null) {
+ //add callback to detect errors
+ Futures.addCallback(result, callback);
+ }
+ }
+ updatesCount.set(updatesSent);
+
+ synchronized (updatesConfirmed) {
+ while (exception.get() == null && updatesSent != updatesConfirmed.get()) {
+ updatesConfirmed.wait();
+ }
+ }
+
+ if (exception.get() != null) {
+ LOG.warn("Sending a value failed.", exception.get());
+ return false;
+ } else {
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 29c4b21..3543378 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -32,6 +33,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.types.Row;
import com.datastax.driver.core.Cluster;
@@ -205,12 +207,16 @@ public class CassandraSink<IN> {
* @param <IN> input type
* @return CassandraSinkBuilder, to further configure the sink
*/
- public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
+ public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
TypeInformation<IN> typeInfo = input.getType();
if (typeInfo instanceof TupleTypeInfo) {
- DataStream<T> tupleInput = (DataStream<T>) input;
+ DataStream<Tuple> tupleInput = (DataStream<Tuple>) input;
return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
}
+ if (typeInfo instanceof RowTypeInfo) {
+ DataStream<Row> rowInput = (DataStream<Row>) input;
+ return (CassandraSinkBuilder<IN>) new CassandraRowSinkBuilder(rowInput, rowInput.getType(), rowInput.getType().createSerializer(rowInput.getExecutionEnvironment().getConfig()));
+ }
if (typeInfo instanceof PojoTypeInfo) {
return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
}
@@ -391,6 +397,36 @@ public class CassandraSink<IN> {
}
/**
+ * Builder for a {@link CassandraRowSink}.
+ */
+ public static class CassandraRowSinkBuilder extends CassandraSinkBuilder<Row> {
+ public CassandraRowSinkBuilder(DataStream<Row> input, TypeInformation<Row> typeInfo, TypeSerializer<Row> serializer) {
+ super(input, typeInfo, serializer);
+ }
+
+ @Override
+ protected void sanityCheck() {
+ super.sanityCheck();
+ if (query == null || query.length() == 0) {
+ throw new IllegalArgumentException("Query must not be null or empty.");
+ }
+ }
+
+ @Override
+ protected CassandraSink<Row> createSink() throws Exception {
+ return new CassandraSink<>(input.addSink(new CassandraRowSink(typeInfo.getArity(), query, builder)).name("Cassandra Sink"));
+
+ }
+
+ @Override
+ protected CassandraSink<Row> createWriteAheadSink() throws Exception {
+ return committer == null
+ ? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraRowWriteAheadSink(query, serializer, builder, new CassandraCommitter(builder))))
+ : new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraRowWriteAheadSink(query, serializer, builder, committer)));
+ }
+ }
+
+ /**
* Builder for a {@link CassandraPojoSink}.
* @param <IN>
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index f52a42c..f1b598f 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -35,15 +35,17 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
+import org.apache.flink.table.api.StreamTableEnvironment;
+import org.apache.flink.types.Row;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.apache.cassandra.service.CassandraDaemon;
import org.junit.AfterClass;
@@ -58,6 +60,7 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -83,11 +86,15 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
private static EmbeddedCassandraService cassandra;
+ private static final String HOST = "127.0.0.1";
+
+ private static final int PORT = 9042;
+
private static ClusterBuilder builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder
- .addContactPoint("127.0.0.1")
+ .addContactPointsWithPorts(new InetSocketAddress(HOST, PORT))
.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
.withoutJMXReporting()
.withoutMetrics().build();
@@ -108,10 +115,16 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
private int tableID;
private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
+ private static final ArrayList<Row> rowCollection = new ArrayList<>(20);
+
+ private static final String[] FIELD_NAMES = {"id", "counter", "batch_id"};
+ private static final TypeInformation[] FIELD_TYPES = {
+ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
static {
for (int i = 0; i < 20; i++) {
collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
+ rowCollection.add(Row.of(UUID.randomUUID().toString(), i, 0));
}
}
@@ -245,7 +258,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
list.add(x);
}
- for (Row s : result) {
+ for (com.datastax.driver.core.Row s : result) {
list.remove(new Integer(s.getInt("counter")));
}
Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
@@ -260,7 +273,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
list.add(x);
}
- for (Row s : result) {
+ for (com.datastax.driver.core.Row s : result) {
list.remove(new Integer(s.getInt("counter")));
}
Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
@@ -278,7 +291,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
list.add(x);
}
- for (Row s : result) {
+ for (com.datastax.driver.core.Row s : result) {
list.remove(new Integer(s.getInt("counter")));
}
Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
@@ -300,7 +313,8 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
ArrayList<Integer> actual = new ArrayList<>();
ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
- for (Row s : result) {
+
+ for (com.datastax.driver.core.Row s : result) {
actual.add(s.getInt("counter"));
}
@@ -380,6 +394,22 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
}
@Test
+ public void testCassandraRowAtLeastOnceSink() throws Exception {
+ CassandraRowSink sink = new CassandraRowSink(FIELD_TYPES.length, injectTableName(INSERT_DATA_QUERY), builder);
+
+ sink.open(new Configuration());
+
+ for (Row value : rowCollection) {
+ sink.send(value);
+ }
+
+ sink.close();
+
+ ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+ Assert.assertEquals(20, rs.all().size());
+ }
+
+ @Test
public void testCassandraPojoAtLeastOnceSink() throws Exception {
session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
@@ -398,6 +428,35 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
}
@Test
+ public void testCassandraTableSink() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+ StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
+
+ DataStreamSource<Row> source = env.fromCollection(rowCollection);
+
+ tEnv.registerDataStreamInternal("testFlinkTable", source);
+
+ tEnv.sql("select * from testFlinkTable").writeToSink(
+ new CassandraAppendTableSink(builder, injectTableName(INSERT_DATA_QUERY)));
+
+ env.execute();
+ ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+
+ // validate that all input was correctly written to Cassandra
+ List<Row> input = new ArrayList<>(rowCollection);
+ List<com.datastax.driver.core.Row> output = rs.all();
+ for (com.datastax.driver.core.Row o : output) {
+ Row cmp = new Row(3);
+ cmp.setField(0, o.getString(0));
+ cmp.setField(1, o.getInt(2));
+ cmp.setField(2, o.getInt(1));
+ Assert.assertTrue("Row " + cmp + " was written to Cassandra but not in input.", input.remove(cmp));
+ }
+ Assert.assertTrue("The input data was not completely written to Cassandra", input.isEmpty());
+ }
+
+ @Test
public void testCassandraBatchFormats() throws Exception {
OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
sink.configure(new Configuration());
@@ -465,10 +524,10 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
sink.close();
ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
- List<Row> rows = rs.all();
+ List<com.datastax.driver.core.Row> rows = rs.all();
Assert.assertEquals(scalaTupleCollection.size(), rows.size());
- for (Row row : rows) {
+ for (com.datastax.driver.core.Row row : rows) {
scalaTupleCollection.remove(new scala.Tuple3<>(row.getString("id"), row.getInt("counter"), row.getInt("batch_id")));
}
Assert.assertEquals(0, scalaTupleCollection.size());
http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index bd08b04..7f9cc21 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -50,11 +50,14 @@ public final class RowSerializer extends TypeSerializer<Row> {
private final TypeSerializer<Object>[] fieldSerializers;
+ private final int arity;
+
private transient boolean[] nullMask;
@SuppressWarnings("unchecked")
public RowSerializer(TypeSerializer<?>[] fieldSerializers) {
this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
+ this.arity = fieldSerializers.length;
this.nullMask = new boolean[fieldSerializers.length];
}
@@ -135,6 +138,10 @@ public final class RowSerializer extends TypeSerializer<Row> {
return -1;
}
+ public int getArity() {
+ return arity;
+ }
+
@Override
public void serialize(Row record, DataOutputView target) throws IOException {
int len = fieldSerializers.length;
http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index d897137..5d5c455 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -30,7 +30,7 @@ under the License.
<!-- Cassandra connectors have to use guava directly -->
<suppress
- files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormat.java|CassandraSinkBase.java|CassandraPojoSink.java|CassandraTupleWriteAheadSink.java"
+ files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormat.java|CassandraSinkBase.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java"
checks="IllegalImport"/>
<!-- Kinesis producer has to use guava directly -->
<suppress