You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/05/29 11:19:14 UTC

[3/6] flink git commit: [FLINK-8655][cassandra] Support default keyspace for POJOs

[FLINK-8655][cassandra] Support default keyspace for POJOs

This closes #5538.
This closes #5964.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0861a784
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0861a784
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0861a784

Branch: refs/heads/master
Commit: 0861a784469662840fc599361b6e3910bb30de38
Parents: 7c90447
Author: Clément Tamisier <cl...@gmail.com>
Authored: Sun May 6 17:04:18 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue May 29 09:33:50 2018 +0200

----------------------------------------------------------------------
 .../connectors/cassandra/CassandraPojoSink.java | 18 ++++-
 .../connectors/cassandra/CassandraSink.java     | 23 ++++++-
 .../connectors/cassandra/CassandraSinkBase.java |  6 +-
 .../cassandra/CassandraConnectorITCase.java     | 19 ++++++
 .../cassandra/PojoNoAnnotatedKeyspace.java      | 69 ++++++++++++++++++++
 5 files changed, 132 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0861a784/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index e060ce0..da70da1 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.cassandra;
 import org.apache.flink.configuration.Configuration;
 
 import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
 import com.datastax.driver.mapping.Mapper;
 import com.datastax.driver.mapping.MappingManager;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -41,6 +42,7 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
 
 	protected final Class<IN> clazz;
 	private final MapperOptions options;
+	private final String keyspace;
 	protected transient Mapper<IN> mapper;
 	protected transient MappingManager mappingManager;
 
@@ -50,13 +52,22 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
 	 * @param clazz Class instance
 	 */
 	public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) {
-		this(clazz, builder, null);
+		this(clazz, builder, null, null);
 	}
 
 	public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, @Nullable MapperOptions options) {
+		this(clazz, builder, options, null);
+	}
+
+	public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, String keyspace) {
+		this(clazz, builder, null, keyspace);
+	}
+
+	public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, @Nullable MapperOptions options, String keyspace) {
 		super(builder);
 		this.clazz = clazz;
 		this.options = options;
+		this.keyspace = keyspace;
 	}
 
 	@Override
@@ -77,6 +88,11 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
 	}
 
 	@Override
+	protected Session createSession() {
+		return cluster.connect(keyspace);
+	}
+
+	@Override
 	public ListenableFuture<ResultSet> send(IN value) {
 		return session.executeAsync(mapper.saveQuery(value));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0861a784/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 3543378..5d5e9ef 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -236,6 +236,7 @@ public class CassandraSink<IN> {
 		protected final TypeSerializer<IN> serializer;
 		protected final TypeInformation<IN> typeInfo;
 		protected ClusterBuilder builder;
+		protected String keyspace;
 		protected MapperOptions mapperOptions;
 		protected String query;
 		protected CheckpointCommitter committer;
@@ -259,6 +260,17 @@ public class CassandraSink<IN> {
 		}
 
 		/**
+		 * Sets the keyspace to be used.
+		 *
+		 * @param keyspace keyspace to use
+		 * @return this builder
+		 */
+		public CassandraSinkBuilder<IN> setDefaultKeyspace(String keyspace) {
+			this.keyspace = keyspace;
+			return this;
+		}
+
+		/**
 		 * Sets the cassandra host to connect to.
 		 *
 		 * @param host host to connect to
@@ -381,6 +393,9 @@ public class CassandraSink<IN> {
 			if (query == null || query.length() == 0) {
 				throw new IllegalArgumentException("Query must not be null or empty.");
 			}
+			if (keyspace != null) {
+				throw new IllegalArgumentException("Specifying a default keyspace is only allowed when using a Pojo-Stream as input.");
+			}
 		}
 
 		@Override
@@ -410,6 +425,9 @@ public class CassandraSink<IN> {
 			if (query == null || query.length() == 0) {
 				throw new IllegalArgumentException("Query must not be null or empty.");
 			}
+			if (keyspace != null) {
+				throw new IllegalArgumentException("Specifying a default keyspace is only allowed when using a Pojo-Stream as input.");
+			}
 		}
 
 		@Override
@@ -445,7 +463,7 @@ public class CassandraSink<IN> {
 
 		@Override
 		public CassandraSink<IN> createSink() throws Exception {
-			return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder, mapperOptions)).name("Cassandra Sink"));
+			return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder, mapperOptions, keyspace)).name("Cassandra Sink"));
 		}
 
 		@Override
@@ -470,6 +488,9 @@ public class CassandraSink<IN> {
 			if (query == null || query.length() == 0) {
 				throw new IllegalArgumentException("Query must not be null or empty.");
 			}
+			if (keyspace != null) {
+				throw new IllegalArgumentException("Specifying a default keyspace is only allowed when using a Pojo-Stream as input.");
+			}
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0861a784/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 6d1b095..d2ba8e5 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -85,7 +85,11 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
 			}
 		};
 		this.cluster = builder.getCluster();
-		this.session = cluster.connect();
+		this.session = createSession();
+	}
+
+	protected Session createSession() {
+		return cluster.connect();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0861a784/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 4444fa6..cb5767a 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -430,6 +430,25 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	}
 
 	@Test
+	public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception {
+		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "testPojoNoAnnotatedKeyspace"));
+
+		CassandraPojoSink<PojoNoAnnotatedKeyspace> sink = new CassandraPojoSink<>(PojoNoAnnotatedKeyspace.class, builder, "flink");
+
+		Configuration configuration = new Configuration();
+		sink.open(configuration);
+
+		for (int x = 0; x < 20; x++) {
+			sink.send(new PojoNoAnnotatedKeyspace(UUID.randomUUID().toString(), x, 0));
+		}
+
+		sink.close();
+
+		ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "testPojoNoAnnotatedKeyspace"));
+		Assert.assertEquals(20, rs.all().size());
+	}
+
+	@Test
 	public void testCassandraTableSink() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(4);

http://git-wip-us.apache.org/repos/asf/flink/blob/0861a784/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java
new file mode 100644
index 0000000..ab8844d
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+
+import java.io.Serializable;
+
+/**
+ * Test Pojo with DataStax annotations used (no keyspace).
+ */
+@Table(name = "testPojoNoAnnotatedKeyspace")
+public class PojoNoAnnotatedKeyspace implements Serializable {
+
+	private static final long serialVersionUID = 1038054554690916991L;
+
+	@Column(name = "id")
+	private String id;
+	@Column(name = "counter")
+	private int counter;
+	@Column(name = "batch_id")
+	private int batchID;
+
+	public PojoNoAnnotatedKeyspace(String id, int counter, int batchID) {
+		this.id = id;
+		this.counter = counter;
+		this.batchID = batchID;
+	}
+
+	public String getId() {
+		return id;
+	}
+
+	public void setId(String id) {
+		this.id = id;
+	}
+
+	public int getCounter() {
+		return counter;
+	}
+
+	public void setCounter(int counter) {
+		this.counter = counter;
+	}
+
+	public int getBatchID() {
+		return batchID;
+	}
+
+	public void setBatchID(int batchId) {
+		this.batchID = batchId;
+	}
+}