You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/25 18:49:45 UTC

[4/5] flink git commit: [FLINK-4497] [cassandra] Scala Case Classes / Tuple support

[FLINK-4497] [cassandra] Scala Case Classes / Tuple support

This closes #2633.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8dfc9f9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8dfc9f9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8dfc9f9d

Branch: refs/heads/master
Commit: 8dfc9f9d0f06e4ea2376b0a58efd623e58735ae5
Parents: d4fba3b
Author: zentol <ch...@apache.org>
Authored: Mon Oct 10 15:30:48 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 25 15:58:51 2017 +0200

----------------------------------------------------------------------
 .../flink-connector-cassandra/pom.xml           | 11 +++
 .../cassandra/AbstractCassandraTupleSink.java   | 52 +++++++++++
 .../cassandra/CassandraScalaProductSink.java    | 41 +++++++++
 .../connectors/cassandra/CassandraSink.java     | 91 ++++++++++++++++----
 .../connectors/cassandra/CassandraSinkBase.java | 15 ++--
 .../cassandra/CassandraTupleSink.java           | 27 +-----
 .../cassandra/CassandraConnectorITCase.java     | 55 ++++++++++++
 7 files changed, 245 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index 2722c30..4a720c4 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -99,6 +99,12 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
 			<groupId>com.datastax.cassandra</groupId>
 			<artifactId>cassandra-driver-core</artifactId>
 			<version>${driver.version}</version>
@@ -129,6 +135,11 @@ under the License.
 			</exclusions>
 		</dependency>
 		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-library</artifactId>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
new file mode 100644
index 0000000..7a8d097
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Base sink that binds the values returned by {@link #extract} to a prepared insert query and executes it asynchronously.
+ *
+ * @param <IN> Type of the elements written by this sink; subclasses decide how a record maps to bind values
+ */
+public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> {
+	private final String insertQuery; // query text; prepared against the session in open()
+	private transient PreparedStatement ps; // transient: assigned in open() instead of being serialized with the sink
+
+	public AbstractCassandraTupleSink(String insertQuery, ClusterBuilder builder) {
+		super(builder);
+		this.insertQuery = insertQuery;
+	}
+
+	@Override
+	public void open(Configuration configuration) {
+		super.open(configuration); // NOTE(review): presumably initializes `session` - confirm in CassandraSinkBase
+		this.ps = session.prepare(insertQuery); // prepared once here so send() only has to bind values
+	}
+
+	@Override
+	public ListenableFuture<ResultSet> send(IN value) {
+		Object[] fields = extract(value); // subclass-specific conversion of the record into bind values
+		return session.executeAsync(ps.bind(fields)); // asynchronous: the caller receives the future, not the result
+	}
+
+	protected abstract Object[] extract(IN record); // returns the values handed to ps.bind(...), in that order
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
new file mode 100644
index 0000000..a975985
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+
+import scala.Product;
+
+/**
+ * Sink to write Scala tuples and case classes (any {@link Product}) into a Cassandra cluster.
+ *
+ * @param <IN> Type of the elements written by this sink, it must extend {@link Product}
+ */
+public class CassandraScalaProductSink<IN extends Product> extends AbstractCassandraTupleSink<IN> {
+	public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder) {
+		super(insertQuery, builder);
+	}
+
+	@Override
+	protected Object[] extract(IN record) {
+		Object[] al = new Object[record.productArity()]; // one slot per product element
+		for (int i = 0; i < record.productArity(); i++) {
+			al[i] = record.productElement(i); // copied in positional order
+		}
+		return al;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 9f0079f..6a33601 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -22,7 +22,9 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -30,6 +32,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import scala.Product;
 
 /**
  * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
@@ -190,13 +193,31 @@ public class CassandraSink<IN> {
 	 * @param <IN>  input type
 	 * @return CassandraSinkBuilder, to further configure the sink
 	 */
+	public static <IN> CassandraSinkBuilder<IN> addSink(org.apache.flink.streaming.api.scala.DataStream<IN> input) {
+		return addSink(input.javaStream());
+	}
+
+	/**
+	 * Writes a DataStream into a Cassandra database.
+	 *
+	 * @param input input DataStream
+	 * @param <IN>  input type
+	 * @return CassandraSinkBuilder, to further configure the sink
+	 */
 	public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
-		if (input.getType() instanceof TupleTypeInfo) {
+		TypeInformation<IN> typeInfo = input.getType();
+		if (typeInfo instanceof TupleTypeInfo) {
 			DataStream<T> tupleInput = (DataStream<T>) input;
 			return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
-		} else {
+		}
+		if (typeInfo instanceof PojoTypeInfo) {
 			return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
 		}
+		if (typeInfo instanceof CaseClassTypeInfo) {
+			DataStream<Product> productInput = (DataStream<Product>) input;
+			return (CassandraSinkBuilder<IN>) new CassandraScalaProductSinkBuilder<>(productInput, productInput.getType(), productInput.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
+		}
+		throw new IllegalArgumentException("No support for the type of the given DataStream: " + input.getType());
 	}
 
 	public abstract static class CassandraSinkBuilder<IN> {
@@ -300,7 +321,16 @@ public class CassandraSink<IN> {
 		 * @return finalized sink
 		 * @throws Exception
 		 */
-		public abstract CassandraSink<IN> build() throws Exception;
+		public CassandraSink<IN> build() throws Exception {
+			sanityCheck();
+			return isWriteAheadLogEnabled
+				? createWriteAheadSink()
+				: createSink();
+		}
+		
+		protected abstract CassandraSink<IN> createSink() throws Exception;
+
+		protected abstract CassandraSink<IN> createWriteAheadSink() throws Exception;
 
 		protected void sanityCheck() {
 			if (builder == null) {
@@ -323,15 +353,15 @@ public class CassandraSink<IN> {
 		}
 
 		@Override
-		public CassandraSink<IN> build() throws Exception {
-			sanityCheck();
-			if (isWriteAheadLogEnabled) {
-				return committer == null
-					? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, new CassandraCommitter(builder))))
-					: new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, committer)));
-			} else {
-				return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink"));
-			}
+		public CassandraSink<IN> createSink() throws Exception {
+			return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink"));
+		}
+
+		@Override
+		protected CassandraSink<IN> createWriteAheadSink() throws Exception {
+			return committer == null
+				? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, new CassandraCommitter(builder))))
+				: new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, committer)));
 		}
 	}
 
@@ -349,13 +379,38 @@ public class CassandraSink<IN> {
 		}
 
 		@Override
-		public CassandraSink<IN> build() throws Exception {
-			sanityCheck();
-			if (isWriteAheadLogEnabled) {
-				throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
-			} else {
-				return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink"));
+		public CassandraSink<IN> createSink() throws Exception {
+			return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink"));
+		}
+
+		@Override
+		protected CassandraSink<IN> createWriteAheadSink() throws Exception {
+			throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
+		}
+	}
+
+	public static class CassandraScalaProductSinkBuilder<IN extends Product> extends CassandraSinkBuilder<IN> {
+
+		public CassandraScalaProductSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+			super(input, typeInfo, serializer);
+		}
+
+		@Override
+		protected void sanityCheck() {
+			super.sanityCheck();
+			if (query == null || query.length() == 0) {
+				throw new IllegalArgumentException("Query must not be null or empty.");
 			}
 		}
+
+		@Override
+		public CassandraSink<IN> createSink() throws Exception {
+			return new CassandraSink<>(input.addSink(new CassandraScalaProductSink<IN>(query, builder)).name("Cassandra Sink"));
+		}
+
+		@Override
+		protected CassandraSink<IN> createWriteAheadSink() throws Exception {
+			throw new IllegalArgumentException("Exactly-once guarantees can only be provided for flink tuple types.");
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index b281525..b1b261e 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * @param <IN> Type of the elements emitted by this sink
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
-	protected static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBase.class);
+	protected final Logger log = LoggerFactory.getLogger(getClass());
 	protected transient Cluster cluster;
 	protected transient Session session;
 
@@ -48,7 +48,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 
 	private final AtomicInteger updatesPending = new AtomicInteger();
 
-	protected CassandraSinkBase(ClusterBuilder builder) {
+	CassandraSinkBase(ClusterBuilder builder) {
 		this.builder = builder;
 		ClosureCleaner.clean(builder, true);
 	}
@@ -76,7 +76,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 				}
 				exception = t;
 				
-				LOG.error("Error while sending value.", t);
+				log.error("Error while sending value.", t);
 			}
 		};
 		this.cluster = builder.getCluster();
@@ -107,21 +107,24 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 					updatesPending.wait();
 				}
 			}
-			
+
+			if (exception != null) {
+				throw new IOException("Error while sending value.", exception);
+			}
 		} finally {
 			try {
 				if (session != null) {
 					session.close();
 				}
 			} catch (Exception e) {
-				LOG.error("Error while closing session.", e);
+				log.error("Error while closing session.", e);
 			}
 			try {
 				if (cluster != null) {
 					cluster.close();
 				}
 			} catch (Exception e) {
-				LOG.error("Error while closing cluster.", e);
+				log.error("Error while closing cluster.", e);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
index 0a9ef06..a7ec1df 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
@@ -17,39 +17,20 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
 
 /**
- * Flink Sink to save data into a Cassandra cluster.
+ * Sink to write Flink {@link Tuple}s into a Cassandra cluster.
  *
  * @param <IN> Type of the elements emitted by this sink, it must extend {@link Tuple}
  */
-public class CassandraTupleSink<IN extends Tuple> extends CassandraSinkBase<IN, ResultSet> {
-	private final String insertQuery;
-	private transient PreparedStatement ps;
-
+public class CassandraTupleSink<IN extends Tuple> extends AbstractCassandraTupleSink<IN> {
 	public CassandraTupleSink(String insertQuery, ClusterBuilder builder) {
-		super(builder);
-		this.insertQuery = insertQuery;
-	}
-
-	@Override
-	public void open(Configuration configuration) {
-		super.open(configuration);
-		this.ps = session.prepare(insertQuery);
+		super(insertQuery, builder);
 	}
 
 	@Override
-	public ListenableFuture<ResultSet> send(IN value) {
-		Object[] fields = extract(value);
-		return session.executeAsync(ps.bind(fields));
-	}
-
-	private Object[] extract(IN record) {
+	protected Object[] extract(IN record) {
 		Object[] al = new Object[record.getArity()];
 		for (int i = 0; i < record.getArity(); i++) {
 			al[i] = record.getField(i);

http://git-wip-us.apache.org/repos/asf/flink/blob/8dfc9f9d/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 06f3c35..95cd86c 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -31,14 +31,20 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
 import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 
 import org.junit.AfterClass;
@@ -49,6 +55,8 @@ import org.junit.Test;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -420,4 +428,51 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	private String injectTableName(String target) {
 		return target.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID);
 	}
+
+	@Test
+	public void testCassandraScalaTupleAtLeastOnceSinkBuilderDetection() throws Exception { // checks that addSink picks the Scala product builder
+		Class<scala.Tuple1<String>> c = (Class<scala.Tuple1<String>>) new scala.Tuple1<>("hello").getClass();
+		Seq<TypeInformation<?>> typeInfos = JavaConverters.asScalaBufferConverter(
+			Collections.<TypeInformation<?>>singletonList(BasicTypeInfo.STRING_TYPE_INFO)).asScala();
+		Seq<String> fieldNames = JavaConverters.asScalaBufferConverter(
+			Collections.singletonList("_1")).asScala();
+
+		CaseClassTypeInfo<scala.Tuple1<String>> typeInfo = new CaseClassTypeInfo<scala.Tuple1<String>>(c, null, typeInfos, fieldNames) {
+			@Override
+			public TypeSerializer<scala.Tuple1<String>> createSerializer(ExecutionConfig config) {
+				return null; // stubbed: this test only asserts which builder type is returned
+			}
+		};
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStream<scala.Tuple1<String>> input = env.fromElements(new scala.Tuple1<>("hello")).returns(typeInfo); // attach the hand-built type info; it was previously created but never used
+
+		CassandraSink.CassandraSinkBuilder<scala.Tuple1<String>> sinkBuilder = CassandraSink.addSink(input);
+		assertTrue(sinkBuilder instanceof CassandraSink.CassandraScalaProductSinkBuilder);
+	}
+
+	@Test
+	public void testCassandraScalaTupleAtLeastSink() throws Exception { // writes Scala tuples through the sink and reads them back
+		CassandraScalaProductSink<scala.Tuple3<String, Integer, Integer>> sink = new CassandraScalaProductSink<>(injectTableName(INSERT_DATA_QUERY), builder); // NOTE(review): assumes the query constant carries the table-name placeholder; injectTableName is a no-op otherwise
+
+		List<scala.Tuple3<String, Integer, Integer>> scalaTupleCollection = new ArrayList<>(20);
+		for (int i = 0; i < 20; i++) {
+			scalaTupleCollection.add(new scala.Tuple3<>(UUID.randomUUID().toString(), i, 0));
+		}
+
+		sink.open(new Configuration());
+		for (scala.Tuple3<String, Integer, Integer> value : scalaTupleCollection) {
+			sink.invoke(value);
+		}
+		sink.close(); // NOTE(review): appears to wait for pending async writes before the read-back - confirm in CassandraSinkBase.close()
+
+		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY)); // same table-name substitution as the insert above
+		List<Row> rows = rs.all();
+		assertEquals(scalaTupleCollection.size(), rows.size());
+
+		for (Row row : rows) {
+			scalaTupleCollection.remove(new scala.Tuple3<>(row.getString("id"), row.getInt("counter"), row.getInt("batch_id"))); // every row read back must match one written tuple
+		}
+		assertEquals(0, scalaTupleCollection.size()); // nothing left over: every written tuple was found
+	}
 }