You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:49 UTC
[19/21] flink git commit: [FLINK-6711] Activate strict checkstyle for
flink-connector-cassandra
[FLINK-6711] Activate strict checkstyle for flink-connector-cassandra
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a3a5b6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a3a5b6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a3a5b6e
Branch: refs/heads/master
Commit: 1a3a5b6e976d18c49a99870c0f71ebf615a862d3
Parents: 7292c87
Author: zentol <ch...@apache.org>
Authored: Wed May 24 23:57:46 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:32 2017 +0200
----------------------------------------------------------------------
.../cassandra/CassandraInputFormat.java | 14 +--
.../cassandra/CassandraOutputFormat.java | 14 +--
.../cassandra/AbstractCassandraTupleSink.java | 105 ++++++++++---------
.../cassandra/CassandraCommitter.java | 10 +-
.../connectors/cassandra/CassandraPojoSink.java | 9 +-
.../cassandra/CassandraScalaProductSink.java | 82 +++++++--------
.../connectors/cassandra/CassandraSink.java | 39 +++++--
.../connectors/cassandra/CassandraSinkBase.java | 9 +-
.../cassandra/CassandraTupleWriteAheadSink.java | 14 +--
.../connectors/cassandra/ClusterBuilder.java | 1 +
.../cassandra/example/BatchExample.java | 10 +-
.../cassandra/CassandraConnectorITCase.java | 34 +++---
.../CassandraTupleWriteAheadSinkTest.java | 17 +--
.../streaming/connectors/cassandra/Pojo.java | 18 ++--
.../example/CassandraPojoSinkExample.java | 11 +-
.../example/CassandraTupleSinkExample.java | 8 +-
.../CassandraTupleWriteAheadSinkExample.java | 12 ++-
.../connectors/cassandra/example/Message.java | 4 +
.../src/test/resources/log4j-test.properties | 1 -
19 files changed, 230 insertions(+), 182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
index 849e023..e0806fe 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
@@ -14,13 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.batch.connectors.cassandra;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.google.common.base.Strings;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.NonParallelInput;
import org.apache.flink.api.common.io.RichInputFormat;
@@ -32,6 +28,12 @@ import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.util.Preconditions;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,7 +123,7 @@ public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT
}
try {
- if (cluster != null ) {
+ if (cluster != null) {
cluster.close();
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
index 15d8fb3..c81391d 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
@@ -14,8 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.batch.connectors.cassandra;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
@@ -24,11 +31,6 @@ import com.datastax.driver.core.Session;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,7 +117,7 @@ public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<O
}
try {
- if (cluster != null ) {
+ if (cluster != null) {
cluster.close();
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
index 7a8d097..fda739e 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
@@ -1,52 +1,53 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Abstract sink to write tuple-like values into a Cassandra cluster.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> {
- private final String insertQuery;
- private transient PreparedStatement ps;
-
- public AbstractCassandraTupleSink(String insertQuery, ClusterBuilder builder) {
- super(builder);
- this.insertQuery = insertQuery;
- }
-
- @Override
- public void open(Configuration configuration) {
- super.open(configuration);
- this.ps = session.prepare(insertQuery);
- }
-
- @Override
- public ListenableFuture<ResultSet> send(IN value) {
- Object[] fields = extract(value);
- return session.executeAsync(ps.bind(fields));
- }
-
- protected abstract Object[] extract(IN record);
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Abstract sink to write tuple-like values into a Cassandra cluster.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> {
+ private final String insertQuery;
+ private transient PreparedStatement ps;
+
+ public AbstractCassandraTupleSink(String insertQuery, ClusterBuilder builder) {
+ super(builder);
+ this.insertQuery = insertQuery;
+ }
+
+ @Override
+ public void open(Configuration configuration) {
+ super.open(configuration);
+ this.ps = session.prepare(insertQuery);
+ }
+
+ @Override
+ public ListenableFuture<ResultSet> send(IN value) {
+ Object[] fields = extract(value);
+ return session.executeAsync(ps.bind(fields));
+ }
+
+ protected abstract Object[] extract(IN record);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
index 63b76da..b3948b2 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
@@ -15,13 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.cassandra;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import java.util.HashMap;
import java.util.Iterator;
@@ -30,13 +32,13 @@ import java.util.Map;
/**
* CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
* database.
- *
+ *
* <p>Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
*/
public class CassandraCommitter extends CheckpointCommitter {
private static final long serialVersionUID = 1L;
-
+
private final ClusterBuilder builder;
private transient Cluster cluster;
private transient Session session;
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index 9cfb2f8..c9b29b8 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -17,14 +17,15 @@
package org.apache.flink.streaming.connectors.cassandra;
+import org.apache.flink.configuration.Configuration;
+
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.configuration.Configuration;
/**
- * Flink Sink to save data into a Cassandra cluster using
+ * Flink Sink to save data into a Cassandra cluster using
* <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html">Mapper</a>,
* which uses annotations from
* <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/annotations/package-summary.html">
@@ -41,9 +42,9 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
protected transient MappingManager mappingManager;
/**
- * The main constructor for creating CassandraPojoSink
+ * The main constructor for creating CassandraPojoSink.
*
- * @param clazz Class<IN> instance
+ * @param clazz Class instance
*/
public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) {
super(builder);
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
index a975985..1d1b634 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
@@ -1,41 +1,41 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-
-import scala.Product;
-
-/**
- * Sink to write scala tuples and case classes into a Cassandra cluster.
- *
- * @param <IN> Type of the elements emitted by this sink, it must extend {@link Product}
- */
-public class CassandraScalaProductSink<IN extends Product> extends AbstractCassandraTupleSink<IN> {
- public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder) {
- super(insertQuery, builder);
- }
-
- @Override
- protected Object[] extract(IN record) {
- Object[] al = new Object[record.productArity()];
- for (int i = 0; i < record.productArity(); i++) {
- al[i] = record.productElement(i);
- }
- return al;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import scala.Product;
+
+/**
+ * Sink to write scala tuples and case classes into a Cassandra cluster.
+ *
+ * @param <IN> Type of the elements emitted by this sink, it must extend {@link Product}
+ */
+public class CassandraScalaProductSink<IN extends Product> extends AbstractCassandraTupleSink<IN> {
+ public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder) {
+ super(insertQuery, builder);
+ }
+
+ @Override
+ protected Object[] extract(IN record) {
+ Object[] al = new Object[record.productArity()];
+ for (int i = 0; i < record.productArity(); i++) {
+ al[i] = record.productElement(i);
+ }
+ return al;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 6a33601..af138c5 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -15,9 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.cassandra;
-import com.datastax.driver.core.Cluster;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -32,6 +32,9 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import com.datastax.driver.core.Cluster;
+
import scala.Product;
/**
@@ -79,10 +82,10 @@ public class CassandraSink<IN> {
/**
* Sets an ID for this operator.
- * <p/>
+ *
* <p>The specified ID is used to assign the same operator ID across job
* submissions (for example when starting a job from a savepoint).
- * <p/>
+ *
* <p><strong>Important</strong>: this ID needs to be unique per
* transformation and job. Otherwise, job submission will fail.
*
@@ -101,19 +104,17 @@ public class CassandraSink<IN> {
/**
* Sets a user-provided hash for this operator. This will be used AS IS to create the JobVertexID.
- * <p/>
+ *
* <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an
* operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
- * <p/>
+ *
* <p><strong>Important</strong>: this should be used as a workaround or for trouble shooting. The provided hash
* needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
* assign user-specified hash to intermediate nodes in an operator chain and trying so will let your job fail.
*
- * <p>
- * A use case for this is in migration between Flink versions or changing the jobs in a way that changes the
+ * <p>A use case for this is in migration between Flink versions or changing the jobs in a way that changes the
* automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
* obtained from old logs) can help to reestablish a lost mapping from states to their target operator.
- * <p/>
*
* @param uidHash The user provided hash for this operator. This will become the JobVertexID, which is shown in the
* logs and web ui.
@@ -168,10 +169,10 @@ public class CassandraSink<IN> {
* Sets the slot sharing group of this operation. Parallel instances of
* operations that are in the same slot sharing group will be co-located in the same
* TaskManager slot, if possible.
- * <p/>
+ *
* <p>Operations inherit the slot sharing group of input operations if all input operations
* are in the same slot sharing group and no slot sharing group was explicitly specified.
- * <p/>
+ *
* <p>Initially an operation is in the default slot sharing group. An operation can be put into
* the default group explicitly by setting the slot sharing group to {@code "default"}.
*
@@ -220,6 +221,10 @@ public class CassandraSink<IN> {
throw new IllegalArgumentException("No support for the type of the given DataStream: " + input.getType());
}
+ /**
+ * Builder for a {@link CassandraSink}.
+ * @param <IN>
+ */
public abstract static class CassandraSinkBuilder<IN> {
protected final DataStream<IN> input;
protected final TypeSerializer<IN> serializer;
@@ -327,7 +332,7 @@ public class CassandraSink<IN> {
? createWriteAheadSink()
: createSink();
}
-
+
protected abstract CassandraSink<IN> createSink() throws Exception;
protected abstract CassandraSink<IN> createWriteAheadSink() throws Exception;
@@ -339,6 +344,10 @@ public class CassandraSink<IN> {
}
}
+ /**
+ * Builder for a {@link CassandraTupleSink}.
+ * @param <IN>
+ */
public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
super(input, typeInfo, serializer);
@@ -365,6 +374,10 @@ public class CassandraSink<IN> {
}
}
+ /**
+ * Builder for a {@link CassandraPojoSink}.
+ * @param <IN>
+ */
public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
super(input, typeInfo, serializer);
@@ -389,6 +402,10 @@ public class CassandraSink<IN> {
}
}
+ /**
+ * Builder for a {@link CassandraScalaProductSink}.
+ * @param <IN>
+ */
public static class CassandraScalaProductSinkBuilder<IN extends Product> extends CassandraSinkBuilder<IN> {
public CassandraScalaProductSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index b1b261e..5da1f57 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -17,14 +17,15 @@
package org.apache.flink.streaming.connectors.cassandra;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +76,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
}
}
exception = t;
-
+
log.error("Error while sending value.", t);
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
index a3d002e..fac7b8b 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
@@ -15,8 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.cassandra;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
@@ -25,12 +33,6 @@ import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
index 9fd3b4e..4dedda4 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.cassandra;
import com.datastax.driver.core.Cluster;
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
index e66b8b3..af21f2d 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
@@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.batch.connectors.cassandra.example;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -27,12 +26,15 @@ import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+
import java.util.ArrayList;
/**
* This is an example showing how to use the Cassandra Input-/OutputFormats in the Batch API.
- *
- * The example assumes that a table exists in a local cassandra database, according to the following query:
+ *
+ * <p>The example assumes that a table exists in a local cassandra database, according to the following query:
* CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));
*/
public class BatchExample {
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index e6924a3..fe538a8 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -18,15 +18,6 @@
package org.apache.flink.streaming.connectors.cassandra;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.QueryOptions;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-
-import org.apache.cassandra.service.CassandraDaemon;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.io.InputFormat;
@@ -47,16 +38,20 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.service.CassandraDaemon;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
import java.io.BufferedWriter;
import java.io.File;
@@ -69,8 +64,14 @@ import java.util.Random;
import java.util.Scanner;
import java.util.UUID;
-import static org.junit.Assert.*;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import static org.junit.Assert.assertTrue;
+/**
+ * IT cases for all cassandra sinks.
+ */
@SuppressWarnings("serial")
public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
@@ -138,7 +139,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader();
File file = new File(classLoader.getResource("cassandra.yaml").getFile());
File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
-
+
assertTrue(tmp.createNewFile());
try (
@@ -155,7 +156,6 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
}
}
-
// Tell cassandra where the configuration files are.
// Use the test configuration file.
System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
@@ -468,11 +468,11 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
List<Row> rows = rs.all();
- assertEquals(scalaTupleCollection.size(), rows.size());
+ Assert.assertEquals(scalaTupleCollection.size(), rows.size());
for (Row row : rows) {
scalaTupleCollection.remove(new scala.Tuple3<>(row.getString("id"), row.getInt("counter"), row.getInt("batch_id")));
}
- assertEquals(0, scalaTupleCollection.size());
+ Assert.assertEquals(0, scalaTupleCollection.size());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
index 847d1a0..06a9335 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
@@ -15,18 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.cassandra;
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
@@ -43,9 +45,12 @@ import static org.powermock.api.mockito.PowerMockito.doAnswer;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
+/**
+ * Tests for the {@link CassandraTupleWriteAheadSink}.
+ */
public class CassandraTupleWriteAheadSinkTest {
- @Test(timeout=20000)
+ @Test(timeout = 20000)
public void testAckLoopExitOnException() throws Exception {
final AtomicReference<Runnable> runnableFuture = new AtomicReference<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
index 9b331d6..226043f 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.cassandra;
import com.datastax.driver.mapping.annotations.Column;
@@ -21,6 +22,9 @@ import com.datastax.driver.mapping.annotations.Table;
import java.io.Serializable;
+/**
+ * Test Pojo with DataStax annotations used.
+ */
@Table(keyspace = "flink", name = "test")
public class Pojo implements Serializable {
@@ -31,12 +35,12 @@ public class Pojo implements Serializable {
@Column(name = "counter")
private int counter;
@Column(name = "batch_id")
- private int batch_id;
+ private int batchID;
- public Pojo(String id, int counter, int batch_id) {
+ public Pojo(String id, int counter, int batchID) {
this.id = id;
this.counter = counter;
- this.batch_id = batch_id;
+ this.batchID = batchID;
}
public String getId() {
@@ -55,11 +59,11 @@ public class Pojo implements Serializable {
this.counter = counter;
}
- public int getBatch_id() {
- return batch_id;
+ public int getBatchID() {
+ return batchID;
}
- public void setBatch_id(int batch_id) {
- this.batch_id = batch_id;
+ public void setBatchID(int batchId) {
+ this.batchID = batchId;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
index e1bcea9..a38b73b 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
@@ -17,21 +17,22 @@
package org.apache.flink.streaming.connectors.cassandra.example;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+
import java.util.ArrayList;
/**
* This is an example showing how to use the Pojo Cassandra Sink in the Streaming API.
- *
- * Pojo's have to be annotated with datastax annotations to work with this sink.
*
- * The example assumes that a table exists in a local cassandra database, according to the following query:
+ * <p>Pojo's have to be annotated with datastax annotations to work with this sink.
+ *
+ * <p>The example assumes that a table exists in a local cassandra database, according to the following query:
* CREATE TABLE IF NOT EXISTS test.message(body text PRIMARY KEY)
*/
public class CassandraPojoSinkExample {
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
index c6345df..ce2326f 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
@@ -14,22 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.cassandra.example;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+
import java.util.ArrayList;
/**
* This is an example showing how to use the Tuple Cassandra Sink in the Streaming API.
*
- * The example assumes that a table exists in a local cassandra database, according to the following query:
+ * <p>The example assumes that a table exists in a local cassandra database, according to the following query:
* CREATE TABLE IF NOT EXISTS test.writetuple(element1 text PRIMARY KEY, element2 int)
*/
public class CassandraTupleSinkExample {
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
index 23de949..38618fe 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
@@ -15,9 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.cassandra.example;
-import com.datastax.driver.core.Cluster;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@@ -27,6 +27,8 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import com.datastax.driver.core.Cluster;
+
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -34,10 +36,10 @@ import java.util.UUID;
/**
* This is an example showing how to use the Cassandra Sink (with write-ahead log) in the Streaming API.
*
- * The example assumes that a table exists in a local cassandra database, according to the following query:
+ * <p>The example assumes that a table exists in a local cassandra database, according to the following query:
* CREATE TABLE example.values (id text, count int, PRIMARY KEY(id));
- *
- * Important things to note are that checkpointing is enabled, a StateBackend is set and the enableWriteAheadLog() call
+ *
+ * <p>Important things to note are that checkpointing is enabled, a StateBackend is set and the enableWriteAheadLog() call
* when creating the CassandraSink.
*/
public class CassandraTupleWriteAheadSinkExample {
@@ -67,7 +69,7 @@ public class CassandraTupleWriteAheadSinkExample {
env.execute();
}
- public static class MySource implements SourceFunction<Tuple2<String, Integer>>, ListCheckpointed<Integer> {
+ private static class MySource implements SourceFunction<Tuple2<String, Integer>>, ListCheckpointed<Integer> {
private static final long serialVersionUID = 4022367939215095610L;
private int counter = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
index 7524d95..512d0ea 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.cassandra.example;
import com.datastax.driver.mapping.annotations.Column;
@@ -21,6 +22,9 @@ import com.datastax.driver.mapping.annotations.Table;
import java.io.Serializable;
+/**
+ * Pojo with DataStax annotations.
+ */
@Table(keyspace = "test", name = "message")
public class Message implements Serializable {
http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
index a43d556..c1d3cca 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
@@ -26,4 +26,3 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-