You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/01 13:53:01 UTC

flink git commit: [FLINK-6225] [cassandra] Add a CassandraAppendTableSink.

Repository: flink
Updated Branches:
  refs/heads/master 0df8e0797 -> 1809cad6d


[FLINK-6225] [cassandra] Add a CassandraAppendTableSink.

This closes #3748.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1809cad6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1809cad6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1809cad6

Branch: refs/heads/master
Commit: 1809cad6d05522d6185a69ca14ddc275d5ebbbf1
Parents: 0df8e07
Author: Jing Fan <ji...@uber.com>
Authored: Thu Apr 20 17:54:36 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 1 14:53:09 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   |  44 ++++-
 .../flink-connector-cassandra/pom.xml           |   6 +
 .../cassandra/CassandraAppendTableSink.java     |  88 ++++++++++
 .../connectors/cassandra/CassandraRowSink.java  |  42 +++++
 .../cassandra/CassandraRowWriteAheadSink.java   | 162 +++++++++++++++++++
 .../connectors/cassandra/CassandraSink.java     |  40 ++++-
 .../cassandra/CassandraConnectorITCase.java     |  75 ++++++++-
 .../java/typeutils/runtime/RowSerializer.java   |   7 +
 tools/maven/suppressions.xml                    |   2 +-
 9 files changed, 454 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 43542f3..dfa7954 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -492,7 +492,8 @@ The following table lists the `TableSink`s which are provided with Flink.
 
 | **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description**
 | `CsvTableSink` | `flink-table` | Y | Append | A simple sink for CSV files.
-| `JDBCAppendTableSink` | `flink-jdbc` | Y | Append | Writes tables to a JDBC database.
+| `JDBCAppendTableSink` | `flink-jdbc` | Y | Append | Writes a Table to a JDBC table.
+| `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table. 
 | `Kafka08JsonTableSink` | `flink-connector-kafka-0.8` | N | Append | A Kafka 0.8 sink with JSON encoding.
 | `Kafka09JsonTableSink` | `flink-connector-kafka-0.9` | N | Append | A Kafka 0.9 sink with JSON encoding.
 
@@ -583,6 +584,47 @@ Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify t
 
 {% top %}
 
+### CassandraAppendTableSink
+
+The `CassandraAppendTableSink` emits a `Table` to a Cassandra table. The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming.html#table-to-stream-conversion) for details. 
+
+The `CassandraAppendTableSink` inserts all rows at least once into the Cassandra table if checkpointing is enabled. However, you can specify the query as upsert query.
+
+To use the `CassandraAppendTableSink`, you have to add the Cassandra connector dependency (<code>flink-connector-cassandra</code>) to your project. The example below shows how to use the `CassandraAppendTableSink`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+ClusterBuilder builder = ... // configure Cassandra cluster connection
+
+CassandraAppendTableSink sink = new CassandraAppendTableSink(
+  builder, 
+  // the query must match the schema of the table
+  INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?));
+
+Table table = ...
+table.writeToSink(sink);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val builder: ClusterBuilder = ... // configure Cassandra cluster connection
+
+val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
+  builder, 
+  // the query must match the schema of the table
+  INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?))
+
+val table: Table = ???
+table.writeToSink(sink)
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
 Define a TableSource
 --------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index c97b43f..3c1d3e1 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -197,5 +197,11 @@ under the License.
 				</exclusion>
 			</exclusions>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
 	</dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
new file mode 100644
index 0000000..395ff9a
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * An {@link AppendStreamTableSink} to write an append stream Table to a Cassandra table.
+ */
+public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
+
+	private final ClusterBuilder builder;
+	private final String cql;
+	private String[] fieldNames;
+	private TypeInformation[] fieldTypes;
+	private final Properties properties;
+
+	public CassandraAppendTableSink(ClusterBuilder builder, String cql) {
+		this.builder = Preconditions.checkNotNull(builder, "ClusterBuilder must not be null.");
+		this.cql = Preconditions.checkNotNull(cql, "CQL query must not be null.");
+		this.properties = new Properties();
+	}
+
+	public CassandraAppendTableSink(ClusterBuilder builder, String cql, Properties properties) {
+		this.builder = Preconditions.checkNotNull(builder, "ClusterBuilder must not be null.");
+		this.cql = Preconditions.checkNotNull(cql, "CQL query must not be null.");
+		this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+	}
+
+	@Override
+	public TypeInformation<Row> getOutputType() {
+		return new RowTypeInfo(fieldTypes);
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return this.fieldNames;
+	}
+
+	@Override
+	public TypeInformation<?>[] getFieldTypes() {
+		return this.fieldTypes;
+	}
+
+	@Override
+	public CassandraAppendTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		CassandraAppendTableSink cassandraTableSink = new CassandraAppendTableSink(this.builder, this.cql, this.properties);
+		cassandraTableSink.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names must not be null.");
+		cassandraTableSink.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types must not be null.");
+		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+			"Number of provided field names and types does not match.");
+		return cassandraTableSink;
+	}
+
+	@Override
+	public void emitDataStream(DataStream<Row> dataStream) {
+		try {
+			CassandraSink.addSink(dataStream)
+				.setClusterBuilder(this.builder)
+				.setQuery(this.cql)
+				.build();
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
new file mode 100644
index 0000000..fbbeb96
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.types.Row;
+
+/**
+ * A SinkFunction to write Row records into a Cassandra table.
+ */
+public class CassandraRowSink extends AbstractCassandraTupleSink<Row> {
+
+	private final int rowArity;
+
+	public CassandraRowSink(int rowArity, String insertQuery, ClusterBuilder builder) {
+		super(insertQuery, builder);
+		this.rowArity = rowArity;
+	}
+
+	@Override
+	protected Object[] extract(Row record) {
+		Object[] al = new Object[rowArity];
+		for (int i = 0; i < rowArity; i++) {
+			al[i] = record.getField(i);
+		}
+		return al;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
new file mode 100644
index 0000000..6b3d418
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.types.Row;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Sink that emits its input elements into a Cassandra table. This sink stores incoming records within a
+ * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to Cassandra
+ * if a checkpoint is completed.
+ *
+ */
+public class CassandraRowWriteAheadSink extends GenericWriteAheadSink<Row> {
+	private static final long serialVersionUID = 1L;
+
+	protected transient Cluster cluster;
+	protected transient Session session;
+
+	private final String insertQuery;
+	private transient PreparedStatement preparedStatement;
+
+	private ClusterBuilder builder;
+
+	private int arity;
+	private transient Object[] fields;
+
+	protected CassandraRowWriteAheadSink(String insertQuery, TypeSerializer<Row> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception {
+		super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
+		this.insertQuery = insertQuery;
+		this.builder = builder;
+		ClosureCleaner.clean(builder, true);
+	}
+
+	public void open() throws Exception {
+		super.open();
+		if (!getRuntimeContext().isCheckpointingEnabled()) {
+			throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled.");
+		}
+		cluster = builder.getCluster();
+		session = cluster.connect();
+		preparedStatement = session.prepare(insertQuery);
+
+		arity = ((RowSerializer) serializer).getArity();
+		fields = new Object[arity];
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		try {
+			if (session != null) {
+				session.close();
+			}
+		} catch (Exception e) {
+			LOG.error("Error while closing session.", e);
+		}
+		try {
+			if (cluster != null) {
+				cluster.close();
+			}
+		} catch (Exception e) {
+			LOG.error("Error while closing cluster.", e);
+		}
+	}
+
+	@Override
+	protected boolean sendValues(Iterable<Row> values, long checkpointId, long timestamp) throws Exception {
+		final AtomicInteger updatesCount = new AtomicInteger(0);
+		final AtomicInteger updatesConfirmed = new AtomicInteger(0);
+
+		final AtomicReference<Throwable> exception = new AtomicReference<>();
+
+		FutureCallback<ResultSet> callback = new FutureCallback<ResultSet>() {
+			@Override
+			public void onSuccess(ResultSet resultSet) {
+				updatesConfirmed.incrementAndGet();
+				if (updatesCount.get() > 0) { // only set if all updates have been sent
+					if (updatesCount.get() == updatesConfirmed.get()) {
+						synchronized (updatesConfirmed) {
+							updatesConfirmed.notifyAll();
+						}
+					}
+				}
+			}
+
+			@Override
+			public void onFailure(Throwable throwable) {
+				if (exception.compareAndSet(null, throwable)) {
+					LOG.error("Error while sending value.", throwable);
+					synchronized (updatesConfirmed) {
+						updatesConfirmed.notifyAll();
+					}
+				}
+			}
+		};
+
+		//set values for prepared statement
+		int updatesSent = 0;
+		for (Row value : values) {
+			for (int x = 0; x < arity; x++) {
+				fields[x] = value.getField(x);
+			}
+			//insert values and send to cassandra
+			BoundStatement s = preparedStatement.bind(fields);
+			s.setDefaultTimestamp(timestamp);
+			ResultSetFuture result = session.executeAsync(s);
+			updatesSent++;
+			if (result != null) {
+				//add callback to detect errors
+				Futures.addCallback(result, callback);
+			}
+		}
+		updatesCount.set(updatesSent);
+
+		synchronized (updatesConfirmed) {
+			while (exception.get() == null && updatesSent != updatesConfirmed.get()) {
+				updatesConfirmed.wait();
+			}
+		}
+
+		if (exception.get() != null) {
+			LOG.warn("Sending a value failed.", exception.get());
+			return false;
+		} else {
+			return true;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 29c4b21..3543378 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -32,6 +33,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.types.Row;
 
 import com.datastax.driver.core.Cluster;
 
@@ -205,12 +207,16 @@ public class CassandraSink<IN> {
 	 * @param <IN>  input type
 	 * @return CassandraSinkBuilder, to further configure the sink
 	 */
-	public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
+	public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
 		TypeInformation<IN> typeInfo = input.getType();
 		if (typeInfo instanceof TupleTypeInfo) {
-			DataStream<T> tupleInput = (DataStream<T>) input;
+			DataStream<Tuple> tupleInput = (DataStream<Tuple>) input;
 			return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
 		}
+		if (typeInfo instanceof RowTypeInfo) {
+			DataStream<Row> rowInput = (DataStream<Row>) input;
+			return (CassandraSinkBuilder<IN>) new CassandraRowSinkBuilder(rowInput, rowInput.getType(), rowInput.getType().createSerializer(rowInput.getExecutionEnvironment().getConfig()));
+		}
 		if (typeInfo instanceof PojoTypeInfo) {
 			return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
 		}
@@ -391,6 +397,36 @@ public class CassandraSink<IN> {
 	}
 
 	/**
+	 * Builder for a {@link CassandraRowSink}.
+	 */
+	public static class CassandraRowSinkBuilder extends CassandraSinkBuilder<Row> {
+		public CassandraRowSinkBuilder(DataStream<Row> input, TypeInformation<Row> typeInfo, TypeSerializer<Row> serializer) {
+			super(input, typeInfo, serializer);
+		}
+
+		@Override
+		protected void sanityCheck() {
+			super.sanityCheck();
+			if (query == null || query.length() == 0) {
+				throw new IllegalArgumentException("Query must not be null or empty.");
+			}
+		}
+
+		@Override
+		protected CassandraSink<Row> createSink() throws Exception {
+			return new CassandraSink<>(input.addSink(new CassandraRowSink(typeInfo.getArity(), query, builder)).name("Cassandra Sink"));
+
+		}
+
+		@Override
+		protected CassandraSink<Row> createWriteAheadSink() throws Exception {
+			return committer == null
+				? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraRowWriteAheadSink(query, serializer, builder, new CassandraCommitter(builder))))
+				: new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraRowWriteAheadSink(query, serializer, builder, committer)));
+		}
+	}
+
+	/**
 	 * Builder for a {@link CassandraPojoSink}.
 	 * @param <IN>
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index f52a42c..f1b598f 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -35,15 +35,17 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
+import org.apache.flink.table.api.StreamTableEnvironment;
+import org.apache.flink.types.Row;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import org.apache.cassandra.service.CassandraDaemon;
 import org.junit.AfterClass;
@@ -58,6 +60,7 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -83,11 +86,15 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 
 	private static EmbeddedCassandraService cassandra;
 
+	private static final String HOST = "127.0.0.1";
+
+	private static final int PORT = 9042;
+
 	private static ClusterBuilder builder = new ClusterBuilder() {
 		@Override
 		protected Cluster buildCluster(Cluster.Builder builder) {
 			return builder
-				.addContactPoint("127.0.0.1")
+				.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT))
 				.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
 				.withoutJMXReporting()
 				.withoutMetrics().build();
@@ -108,10 +115,16 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	private int tableID;
 
 	private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
+	private static final ArrayList<Row> rowCollection = new ArrayList<>(20);
+
+	private static final String[] FIELD_NAMES = {"id", "counter", "batch_id"};
+	private static final TypeInformation[] FIELD_TYPES = {
+		BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
 
 	static {
 		for (int i = 0; i < 20; i++) {
 			collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
+			rowCollection.add(Row.of(UUID.randomUUID().toString(), i, 0));
 		}
 	}
 
@@ -245,7 +258,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 			list.add(x);
 		}
 
-		for (Row s : result) {
+		for (com.datastax.driver.core.Row s : result) {
 			list.remove(new Integer(s.getInt("counter")));
 		}
 		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
@@ -260,7 +273,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 			list.add(x);
 		}
 
-		for (Row s : result) {
+		for (com.datastax.driver.core.Row s : result) {
 			list.remove(new Integer(s.getInt("counter")));
 		}
 		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
@@ -278,7 +291,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 			list.add(x);
 		}
 
-		for (Row s : result) {
+		for (com.datastax.driver.core.Row s : result) {
 			list.remove(new Integer(s.getInt("counter")));
 		}
 		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
@@ -300,7 +313,8 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 
 		ArrayList<Integer> actual = new ArrayList<>();
 		ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
-		for (Row s : result) {
+
+		for (com.datastax.driver.core.Row s : result) {
 			actual.add(s.getInt("counter"));
 		}
 
@@ -380,6 +394,22 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	}
 
 	@Test
+	public void testCassandraRowAtLeastOnceSink() throws Exception {
+		CassandraRowSink sink = new CassandraRowSink(FIELD_TYPES.length, injectTableName(INSERT_DATA_QUERY), builder);
+
+		sink.open(new Configuration());
+
+		for (Row value : rowCollection) {
+			sink.send(value);
+		}
+
+		sink.close();
+
+		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+		Assert.assertEquals(20, rs.all().size());
+	}
+
+	@Test
 	public void testCassandraPojoAtLeastOnceSink() throws Exception {
 		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
 
@@ -398,6 +428,35 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	}
 
 	@Test
+	public void testCassandraTableSink() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+		StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
+
+		DataStreamSource<Row> source = env.fromCollection(rowCollection);
+
+		tEnv.registerDataStreamInternal("testFlinkTable", source);
+
+		tEnv.sql("select * from testFlinkTable").writeToSink(
+			new CassandraAppendTableSink(builder, injectTableName(INSERT_DATA_QUERY)));
+
+		env.execute();
+		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+
+		// validate that all input was correctly written to Cassandra
+		List<Row> input = new ArrayList<>(rowCollection);
+		List<com.datastax.driver.core.Row> output = rs.all();
+		for (com.datastax.driver.core.Row o : output) {
+			Row cmp = new Row(3);
+			cmp.setField(0, o.getString(0));
+			cmp.setField(1, o.getInt(2));
+			cmp.setField(2, o.getInt(1));
+			Assert.assertTrue("Row " + cmp + " was written to Cassandra but not in input.", input.remove(cmp));
+		}
+		Assert.assertTrue("The input data was not completely written to Cassandra", input.isEmpty());
+	}
+
+	@Test
 	public void testCassandraBatchFormats() throws Exception {
 		OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
 		sink.configure(new Configuration());
@@ -465,10 +524,10 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		sink.close();
 
 		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
-		List<Row> rows = rs.all();
+		List<com.datastax.driver.core.Row> rows = rs.all();
 		Assert.assertEquals(scalaTupleCollection.size(), rows.size());
 
-		for (Row row : rows) {
+		for (com.datastax.driver.core.Row row : rows) {
 			scalaTupleCollection.remove(new scala.Tuple3<>(row.getString("id"), row.getInt("counter"), row.getInt("batch_id")));
 		}
 		Assert.assertEquals(0, scalaTupleCollection.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index bd08b04..7f9cc21 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -50,11 +50,14 @@ public final class RowSerializer extends TypeSerializer<Row> {
 
 	private final TypeSerializer<Object>[] fieldSerializers;
 
+	private final int arity;
+
 	private transient boolean[] nullMask;
 
 	@SuppressWarnings("unchecked")
 	public RowSerializer(TypeSerializer<?>[] fieldSerializers) {
 		this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
+		this.arity = fieldSerializers.length;
 		this.nullMask = new boolean[fieldSerializers.length];
 	}
 
@@ -135,6 +138,10 @@ public final class RowSerializer extends TypeSerializer<Row> {
 		return -1;
 	}
 
+	public int getArity() {
+		return arity;
+	}
+
 	@Override
 	public void serialize(Row record, DataOutputView target) throws IOException {
 		int len = fieldSerializers.length;

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index d897137..5d5c455 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -30,7 +30,7 @@ under the License.
 
 		<!-- Cassandra connectors have to use guava directly -->
 		<suppress
-			files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormat.java|CassandraSinkBase.java|CassandraPojoSink.java|CassandraTupleWriteAheadSink.java"
+			files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormat.java|CassandraSinkBase.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java"
 			checks="IllegalImport"/>
 		<!-- Kinesis producer has to use guava directly -->
 		<suppress