You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:49 UTC

[19/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-connector-cassandra

[FLINK-6711] Activate strict checkstyle for flink-connector-cassandra


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a3a5b6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a3a5b6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a3a5b6e

Branch: refs/heads/master
Commit: 1a3a5b6e976d18c49a99870c0f71ebf615a862d3
Parents: 7292c87
Author: zentol <ch...@apache.org>
Authored: Wed May 24 23:57:46 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:32 2017 +0200

----------------------------------------------------------------------
 .../cassandra/CassandraInputFormat.java         |  14 +--
 .../cassandra/CassandraOutputFormat.java        |  14 +--
 .../cassandra/AbstractCassandraTupleSink.java   | 105 ++++++++++---------
 .../cassandra/CassandraCommitter.java           |  10 +-
 .../connectors/cassandra/CassandraPojoSink.java |   9 +-
 .../cassandra/CassandraScalaProductSink.java    |  82 +++++++--------
 .../connectors/cassandra/CassandraSink.java     |  39 +++++--
 .../connectors/cassandra/CassandraSinkBase.java |   9 +-
 .../cassandra/CassandraTupleWriteAheadSink.java |  14 +--
 .../connectors/cassandra/ClusterBuilder.java    |   1 +
 .../cassandra/example/BatchExample.java         |  10 +-
 .../cassandra/CassandraConnectorITCase.java     |  34 +++---
 .../CassandraTupleWriteAheadSinkTest.java       |  17 +--
 .../streaming/connectors/cassandra/Pojo.java    |  18 ++--
 .../example/CassandraPojoSinkExample.java       |  11 +-
 .../example/CassandraTupleSinkExample.java      |   8 +-
 .../CassandraTupleWriteAheadSinkExample.java    |  12 ++-
 .../connectors/cassandra/example/Message.java   |   4 +
 .../src/test/resources/log4j-test.properties    |   1 -
 19 files changed, 230 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
index 849e023..e0806fe 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
@@ -14,13 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.batch.connectors.cassandra;
 
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.google.common.base.Strings;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.NonParallelInput;
 import org.apache.flink.api.common.io.RichInputFormat;
@@ -32,6 +28,12 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 import org.apache.flink.util.Preconditions;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.google.common.base.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,7 +123,7 @@ public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT
 		}
 
 		try {
-			if (cluster != null ) {
+			if (cluster != null) {
 				cluster.close();
 			}
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
index 15d8fb3..c81391d 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
@@ -14,8 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.batch.connectors.cassandra;
 
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
@@ -24,11 +31,6 @@ import com.datastax.driver.core.Session;
 import com.google.common.base.Strings;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -115,7 +117,7 @@ public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<O
 		}
 
 		try {
-			if (cluster != null ) {
+			if (cluster != null) {
 				cluster.close();
 			}
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
index 7a8d097..fda739e 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
@@ -1,52 +1,53 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Abstract sink to write tuple-like values into a Cassandra cluster.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> {
-	private final String insertQuery;
-	private transient PreparedStatement ps;
-
-	public AbstractCassandraTupleSink(String insertQuery, ClusterBuilder builder) {
-		super(builder);
-		this.insertQuery = insertQuery;
-	}
-
-	@Override
-	public void open(Configuration configuration) {
-		super.open(configuration);
-		this.ps = session.prepare(insertQuery);
-	}
-
-	@Override
-	public ListenableFuture<ResultSet> send(IN value) {
-		Object[] fields = extract(value);
-		return session.executeAsync(ps.bind(fields));
-	}
-
-	protected abstract Object[] extract(IN record);
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Abstract sink to write tuple-like values into a Cassandra cluster.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> {
+	private final String insertQuery;
+	private transient PreparedStatement ps;
+
+	public AbstractCassandraTupleSink(String insertQuery, ClusterBuilder builder) {
+		super(builder);
+		this.insertQuery = insertQuery;
+	}
+
+	@Override
+	public void open(Configuration configuration) {
+		super.open(configuration);
+		this.ps = session.prepare(insertQuery);
+	}
+
+	@Override
+	public ListenableFuture<ResultSet> send(IN value) {
+		Object[] fields = extract(value);
+		return session.executeAsync(ps.bind(fields));
+	}
+
+	protected abstract Object[] extract(IN record);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
index 63b76da..b3948b2 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.cassandra;
 
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
 
 import java.util.HashMap;
 import java.util.Iterator;
@@ -30,13 +32,13 @@ import java.util.Map;
 /**
  * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
  * database.
- * 
+ *
  * <p>Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
  */
 public class CassandraCommitter extends CheckpointCommitter {
 
 	private static final long serialVersionUID = 1L;
-	
+
 	private final ClusterBuilder builder;
 	private transient Cluster cluster;
 	private transient Session session;

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index 9cfb2f8..c9b29b8 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -17,14 +17,15 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
+import org.apache.flink.configuration.Configuration;
+
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.mapping.Mapper;
 import com.datastax.driver.mapping.MappingManager;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.configuration.Configuration;
 
 /**
- * Flink Sink to save data into a Cassandra cluster using 
+ * Flink Sink to save data into a Cassandra cluster using
  * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html">Mapper</a>,
  * which it uses annotations from
  * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/annotations/package-summary.html">
@@ -41,9 +42,9 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
 	protected transient MappingManager mappingManager;
 
 	/**
-	 * The main constructor for creating CassandraPojoSink
+	 * The main constructor for creating CassandraPojoSink.
 	 *
-	 * @param clazz Class<IN> instance
+	 * @param clazz Class instance
 	 */
 	public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) {
 		super(builder);

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
index a975985..1d1b634 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
@@ -1,41 +1,41 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-
-import scala.Product;
-
-/**
- * Sink to write scala tuples and case classes into a Cassandra cluster.
- *
- * @param <IN> Type of the elements emitted by this sink, it must extend {@link Product}
- */
-public class CassandraScalaProductSink<IN extends Product> extends AbstractCassandraTupleSink<IN> {
-	public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder) {
-		super(insertQuery, builder);
-	}
-
-	@Override
-	protected Object[] extract(IN record) {
-		Object[] al = new Object[record.productArity()];
-		for (int i = 0; i < record.productArity(); i++) {
-			al[i] = record.productElement(i);
-		}
-		return al;
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import scala.Product;
+
+/**
+ * Sink to write scala tuples and case classes into a Cassandra cluster.
+ *
+ * @param <IN> Type of the elements emitted by this sink, it must extend {@link Product}
+ */
+public class CassandraScalaProductSink<IN extends Product> extends AbstractCassandraTupleSink<IN> {
+	public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder) {
+		super(insertQuery, builder);
+	}
+
+	@Override
+	protected Object[] extract(IN record) {
+		Object[] al = new Object[record.productArity()];
+		for (int i = 0; i < record.productArity(); i++) {
+			al[i] = record.productElement(i);
+		}
+		return al;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 6a33601..af138c5 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.cassandra;
 
-import com.datastax.driver.core.Cluster;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -32,6 +32,9 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import com.datastax.driver.core.Cluster;
+
 import scala.Product;
 
 /**
@@ -79,10 +82,10 @@ public class CassandraSink<IN> {
 
 	/**
 	 * Sets an ID for this operator.
-	 * <p/>
+	 *
 	 * <p>The specified ID is used to assign the same operator ID across job
 	 * submissions (for example when starting a job from a savepoint).
-	 * <p/>
+	 *
 	 * <p><strong>Important</strong>: this ID needs to be unique per
 	 * transformation and job. Otherwise, job submission will fail.
 	 *
@@ -101,19 +104,17 @@ public class CassandraSink<IN> {
 
 	/**
 	 * Sets a user provided hash for this operator. This will be used AS IS to create the JobVertexID.
-	 * <p/>
+	 *
 	 * <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an
 	 * operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
-	 * <p/>
+	 *
 	 * <p><strong>Important</strong>: this should be used as a workaround or for trouble shooting. The provided hash
 	 * needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
 	 * assign user-specified hash to intermediate nodes in an operator chain and trying so will let your job fail.
 	 *
-	 * <p>
-	 * A use case for this is in migration between Flink versions or changing the jobs in a way that changes the
+	 * <p>A use case for this is in migration between Flink versions or changing the jobs in a way that changes the
 	 * automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
 	 * obtained from old logs) can help to reestablish a lost mapping from states to their target operator.
-	 * <p/>
 	 *
 	 * @param uidHash The user provided hash for this operator. This will become the JobVertexID, which is shown in the
 	 *                 logs and web ui.
@@ -168,10 +169,10 @@ public class CassandraSink<IN> {
 	 * Sets the slot sharing group of this operation. Parallel instances of
 	 * operations that are in the same slot sharing group will be co-located in the same
 	 * TaskManager slot, if possible.
-	 * <p/>
+	 *
 	 * <p>Operations inherit the slot sharing group of input operations if all input operations
 	 * are in the same slot sharing group and no slot sharing group was explicitly specified.
-	 * <p/>
+	 *
 	 * <p>Initially an operation is in the default slot sharing group. An operation can be put into
 	 * the default group explicitly by setting the slot sharing group to {@code "default"}.
 	 *
@@ -220,6 +221,10 @@ public class CassandraSink<IN> {
 		throw new IllegalArgumentException("No support for the type of the given DataStream: " + input.getType());
 	}
 
+	/**
+	 * Builder for a {@link CassandraSink}.
+	 * @param <IN>
+	 */
 	public abstract static class CassandraSinkBuilder<IN> {
 		protected final DataStream<IN> input;
 		protected final TypeSerializer<IN> serializer;
@@ -327,7 +332,7 @@ public class CassandraSink<IN> {
 				? createWriteAheadSink()
 				: createSink();
 		}
-		
+
 		protected abstract CassandraSink<IN> createSink() throws Exception;
 
 		protected abstract CassandraSink<IN> createWriteAheadSink() throws Exception;
@@ -339,6 +344,10 @@ public class CassandraSink<IN> {
 		}
 	}
 
+	/**
+	 * Builder for a {@link CassandraTupleSink}.
+	 * @param <IN>
+	 */
 	public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
 		public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
 			super(input, typeInfo, serializer);
@@ -365,6 +374,10 @@ public class CassandraSink<IN> {
 		}
 	}
 
+	/**
+	 * Builder for a {@link CassandraPojoSink}.
+	 * @param <IN>
+	 */
 	public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
 		public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
 			super(input, typeInfo, serializer);
@@ -389,6 +402,10 @@ public class CassandraSink<IN> {
 		}
 	}
 
+	/**
+	 * Builder for a {@link CassandraScalaProductSink}.
+	 * @param <IN>
+	 */
 	public static class CassandraScalaProductSinkBuilder<IN extends Product> extends CassandraSinkBuilder<IN> {
 
 		public CassandraScalaProductSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index b1b261e..5da1f57 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -17,14 +17,15 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Session;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +76,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 					}
 				}
 				exception = t;
-				
+
 				log.error("Error while sending value.", t);
 			}
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
index a3d002e..fac7b8b 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
@@ -15,8 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.cassandra;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.PreparedStatement;
@@ -25,12 +33,6 @@ import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Session;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
 
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
index 9fd3b4e..4dedda4 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.cassandra;
 
 import com.datastax.driver.core.Cluster;

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
index e66b8b3..af21f2d 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
@@ -14,10 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.batch.connectors.cassandra.example;
 
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -27,12 +26,15 @@ import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+
 import java.util.ArrayList;
 
 /**
  * This is an example showing how to use the Cassandra Input-/OutputFormats in the Batch API.
- * 
- * The example assumes that a table exists in a local cassandra database, according to the following query: 
+ *
+ * <p>The example assumes that a table exists in a local cassandra database, according to the following query:
  * CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));
  */
 public class BatchExample {

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index e6924a3..fe538a8 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -18,15 +18,6 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.QueryOptions;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-
-import org.apache.cassandra.service.CassandraDaemon;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.io.InputFormat;
@@ -47,16 +38,20 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.service.CassandraDaemon;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -69,8 +64,14 @@ import java.util.Random;
 import java.util.Scanner;
 import java.util.UUID;
 
-import static org.junit.Assert.*;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import static org.junit.Assert.assertTrue;
 
+/**
+ * IT cases for all cassandra sinks.
+ */
 @SuppressWarnings("serial")
 public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
 
@@ -138,7 +139,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader();
 		File file = new File(classLoader.getResource("cassandra.yaml").getFile());
 		File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
-		
+
 		assertTrue(tmp.createNewFile());
 
 		try (
@@ -155,7 +156,6 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 			}
 		}
 
-
 		// Tell cassandra where the configuration files are.
 		// Use the test configuration file.
 		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
@@ -468,11 +468,11 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 
 		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
 		List<Row> rows = rs.all();
-		assertEquals(scalaTupleCollection.size(), rows.size());
+		Assert.assertEquals(scalaTupleCollection.size(), rows.size());
 
 		for (Row row : rows) {
 			scalaTupleCollection.remove(new scala.Tuple3<>(row.getString("id"), row.getInt("counter"), row.getInt("batch_id")));
 		}
-		assertEquals(0, scalaTupleCollection.size());
+		Assert.assertEquals(0, scalaTupleCollection.size());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
index 847d1a0..06a9335 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
@@ -15,18 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.cassandra;
 
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
@@ -43,9 +45,12 @@ import static org.powermock.api.mockito.PowerMockito.doAnswer;
 import static org.powermock.api.mockito.PowerMockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 
+/**
+ * Tests for the {@link CassandraTupleWriteAheadSink}.
+ */
 public class CassandraTupleWriteAheadSinkTest {
 
-	@Test(timeout=20000)
+	@Test(timeout = 20000)
 	public void testAckLoopExitOnException() throws Exception {
 		final AtomicReference<Runnable> runnableFuture = new AtomicReference<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
index 9b331d6..226043f 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.cassandra;
 
 import com.datastax.driver.mapping.annotations.Column;
@@ -21,6 +22,9 @@ import com.datastax.driver.mapping.annotations.Table;
 
 import java.io.Serializable;
 
+/**
+ * Test Pojo with DataStax annotations used.
+ */
 @Table(keyspace = "flink", name = "test")
 public class Pojo implements Serializable {
 
@@ -31,12 +35,12 @@ public class Pojo implements Serializable {
 	@Column(name = "counter")
 	private int counter;
 	@Column(name = "batch_id")
-	private int batch_id;
+	private int batchID;
 
-	public Pojo(String id, int counter, int batch_id) {
+	public Pojo(String id, int counter, int batchID) {
 		this.id = id;
 		this.counter = counter;
-		this.batch_id = batch_id;
+		this.batchID = batchID;
 	}
 
 	public String getId() {
@@ -55,11 +59,11 @@ public class Pojo implements Serializable {
 		this.counter = counter;
 	}
 
-	public int getBatch_id() {
-		return batch_id;
+	public int getBatchID() {
+		return batchID;
 	}
 
-	public void setBatch_id(int batch_id) {
-		this.batch_id = batch_id;
+	public void setBatchID(int batchId) {
+		this.batchID = batchId;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
index e1bcea9..a38b73b 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
@@ -17,21 +17,22 @@
 
 package org.apache.flink.streaming.connectors.cassandra.example;
 
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+
 import java.util.ArrayList;
 
 /**
  * This is an example showing how to use the Pojo Cassandra Sink in the Streaming API.
- * 
- * Pojo's have to be annotated with datastax annotations to work with this sink.
  *
- * The example assumes that a table exists in a local cassandra database, according to the following query:
+ * <p>Pojo's have to be annotated with datastax annotations to work with this sink.
+ *
+ * <p>The example assumes that a table exists in a local cassandra database, according to the following query:
  * CREATE TABLE IF NOT EXISTS test.message(body text PRIMARY KEY)
  */
 public class CassandraPojoSinkExample {

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
index c6345df..ce2326f 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
@@ -14,22 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.cassandra.example;
 
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+
 import java.util.ArrayList;
 
 /**
  * This is an example showing how to use the Tuple Cassandra Sink in the Streaming API.
  *
- * The example assumes that a table exists in a local cassandra database, according to the following query:
+ * <p>The example assumes that a table exists in a local cassandra database, according to the following query:
  * CREATE TABLE IF NOT EXISTS test.writetuple(element1 text PRIMARY KEY, element2 int)
  */
 public class CassandraTupleSinkExample {

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
index 23de949..38618fe 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.cassandra.example;
 
-import com.datastax.driver.core.Cluster;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@@ -27,6 +27,8 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 
+import com.datastax.driver.core.Cluster;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -34,10 +36,10 @@ import java.util.UUID;
 /**
  * This is an example showing how to use the Cassandra Sink (with write-ahead log) in the Streaming API.
  *
- * The example assumes that a table exists in a local cassandra database, according to the following query:
+ * <p>The example assumes that a table exists in a local cassandra database, according to the following query:
  * CREATE TABLE example.values (id text, count int, PRIMARY KEY(id));
- * 
- * Important things to note are that checkpointing is enabled, a StateBackend is set and the enableWriteAheadLog() call
+ *
+ * <p>Important things to note are that checkpointing is enabled, a StateBackend is set and the enableWriteAheadLog() call
  * when creating the CassandraSink.
  */
 public class CassandraTupleWriteAheadSinkExample {
@@ -67,7 +69,7 @@ public class CassandraTupleWriteAheadSinkExample {
 		env.execute();
 	}
 
-	public static class MySource implements SourceFunction<Tuple2<String, Integer>>, ListCheckpointed<Integer> {
+	private static class MySource implements SourceFunction<Tuple2<String, Integer>>, ListCheckpointed<Integer> {
 		private static final long serialVersionUID = 4022367939215095610L;
 
 		private int counter = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
index 7524d95..512d0ea 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.cassandra.example;
 
 import com.datastax.driver.mapping.annotations.Column;
@@ -21,6 +22,9 @@ import com.datastax.driver.mapping.annotations.Table;
 
 import java.io.Serializable;
 
+/**
+ * Pojo with DataStax annotations.
+ */
 @Table(keyspace = "test", name = "message")
 public class Message implements Serializable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a3a5b6e/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
index a43d556..c1d3cca 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
@@ -26,4 +26,3 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
 
-