You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/05/29 11:19:14 UTC
[3/6] flink git commit: [FLINK-8655][cassandra] Support default
keyspace for POJOs
[FLINK-8655][cassandra] Support default keyspace for POJOs
This closes #5538.
This closes #5964.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0861a784
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0861a784
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0861a784
Branch: refs/heads/master
Commit: 0861a784469662840fc599361b6e3910bb30de38
Parents: 7c90447
Author: Clément Tamisier <cl...@gmail.com>
Authored: Sun May 6 17:04:18 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue May 29 09:33:50 2018 +0200
----------------------------------------------------------------------
.../connectors/cassandra/CassandraPojoSink.java | 18 ++++-
.../connectors/cassandra/CassandraSink.java | 23 ++++++-
.../connectors/cassandra/CassandraSinkBase.java | 6 +-
.../cassandra/CassandraConnectorITCase.java | 19 ++++++
.../cassandra/PojoNoAnnotatedKeyspace.java | 69 ++++++++++++++++++++
5 files changed, 132 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0861a784/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index e060ce0..da70da1 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.cassandra;
import org.apache.flink.configuration.Configuration;
import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.google.common.util.concurrent.ListenableFuture;
@@ -41,6 +42,7 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
protected final Class<IN> clazz;
private final MapperOptions options;
+ private final String keyspace;
protected transient Mapper<IN> mapper;
protected transient MappingManager mappingManager;
@@ -50,13 +52,22 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
* @param clazz Class instance
*/
public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) {
- this(clazz, builder, null);
+ this(clazz, builder, null, null);
}
public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, @Nullable MapperOptions options) {
+ this(clazz, builder, options, null);
+ }
+
+ public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, String keyspace) {
+ this(clazz, builder, null, keyspace);
+ }
+
+ public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, @Nullable MapperOptions options, String keyspace) {
super(builder);
this.clazz = clazz;
this.options = options;
+ this.keyspace = keyspace;
}
@Override
@@ -77,6 +88,11 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
}
@Override
+ protected Session createSession() {
+ return cluster.connect(keyspace);
+ }
+
+ @Override
public ListenableFuture<ResultSet> send(IN value) {
return session.executeAsync(mapper.saveQuery(value));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0861a784/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 3543378..5d5e9ef 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -236,6 +236,7 @@ public class CassandraSink<IN> {
protected final TypeSerializer<IN> serializer;
protected final TypeInformation<IN> typeInfo;
protected ClusterBuilder builder;
+ protected String keyspace;
protected MapperOptions mapperOptions;
protected String query;
protected CheckpointCommitter committer;
@@ -259,6 +260,17 @@ public class CassandraSink<IN> {
}
/**
+ * Sets the keyspace to be used.
+ *
+ * @param keyspace keyspace to use
+ * @return this builder
+ */
+ public CassandraSinkBuilder<IN> setDefaultKeyspace(String keyspace) {
+ this.keyspace = keyspace;
+ return this;
+ }
+
+ /**
* Sets the cassandra host to connect to.
*
* @param host host to connect to
@@ -381,6 +393,9 @@ public class CassandraSink<IN> {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
+ if (keyspace != null) {
+ throw new IllegalArgumentException("Specifying a default keyspace is only allowed when using a Pojo-Stream as input.");
+ }
}
@Override
@@ -410,6 +425,9 @@ public class CassandraSink<IN> {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
+ if (keyspace != null) {
+ throw new IllegalArgumentException("Specifying a default keyspace is only allowed when using a Pojo-Stream as input.");
+ }
}
@Override
@@ -445,7 +463,7 @@ public class CassandraSink<IN> {
@Override
public CassandraSink<IN> createSink() throws Exception {
- return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder, mapperOptions)).name("Cassandra Sink"));
+ return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder, mapperOptions, keyspace)).name("Cassandra Sink"));
}
@Override
@@ -470,6 +488,9 @@ public class CassandraSink<IN> {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
+ if (keyspace != null) {
+ throw new IllegalArgumentException("Specifying a default keyspace is only allowed when using a Pojo-Stream as input.");
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0861a784/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 6d1b095..d2ba8e5 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -85,7 +85,11 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
}
};
this.cluster = builder.getCluster();
- this.session = cluster.connect();
+ this.session = createSession();
+ }
+
+ protected Session createSession() {
+ return cluster.connect();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0861a784/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 4444fa6..cb5767a 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -430,6 +430,25 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
}
@Test
+ public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception {
+ session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "testPojoNoAnnotatedKeyspace"));
+
+ CassandraPojoSink<PojoNoAnnotatedKeyspace> sink = new CassandraPojoSink<>(PojoNoAnnotatedKeyspace.class, builder, "flink");
+
+ Configuration configuration = new Configuration();
+ sink.open(configuration);
+
+ for (int x = 0; x < 20; x++) {
+ sink.send(new PojoNoAnnotatedKeyspace(UUID.randomUUID().toString(), x, 0));
+ }
+
+ sink.close();
+
+ ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "testPojoNoAnnotatedKeyspace"));
+ Assert.assertEquals(20, rs.all().size());
+ }
+
+ @Test
public void testCassandraTableSink() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
http://git-wip-us.apache.org/repos/asf/flink/blob/0861a784/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java
new file mode 100644
index 0000000..ab8844d
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+
+import java.io.Serializable;
+
+/**
+ * Test Pojo with DataStax annotations used (no keyspace).
+ */
+@Table(name = "testPojoNoAnnotatedKeyspace")
+public class PojoNoAnnotatedKeyspace implements Serializable {
+
+ private static final long serialVersionUID = 1038054554690916991L;
+
+ @Column(name = "id")
+ private String id;
+ @Column(name = "counter")
+ private int counter;
+ @Column(name = "batch_id")
+ private int batchID;
+
+ public PojoNoAnnotatedKeyspace(String id, int counter, int batchID) {
+ this.id = id;
+ this.counter = counter;
+ this.batchID = batchID;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public int getCounter() {
+ return counter;
+ }
+
+ public void setCounter(int counter) {
+ this.counter = counter;
+ }
+
+ public int getBatchID() {
+ return batchID;
+ }
+
+ public void setBatchID(int batchId) {
+ this.batchID = batchId;
+ }
+}