You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:35 UTC

[05/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-hbase

[FLINK-6711] Activate strict checkstyle for flink-hbase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/43183ad2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/43183ad2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/43183ad2

Branch: refs/heads/master
Commit: 43183ad2ca6326dc6021d4f880f5095454c6952d
Parents: 23920bb
Author: zentol <ch...@apache.org>
Authored: Wed May 24 23:12:36 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:09 2017 +0200

----------------------------------------------------------------------
 .../addons/hbase/AbstractTableInputFormat.java  | 17 +++---
 .../flink/addons/hbase/HBaseRowInputFormat.java | 11 ++--
 .../flink/addons/hbase/HBaseTableSchema.java    |  9 +--
 .../flink/addons/hbase/HBaseTableSource.java    | 12 ++--
 .../flink/addons/hbase/TableInputFormat.java    |  5 +-
 .../flink/addons/hbase/TableInputSplit.java     | 12 ++--
 .../addons/hbase/HBaseConnectorITCase.java      | 15 +++--
 .../hbase/HBaseTestingClusterAutostarter.java   |  6 +-
 .../hbase/example/HBaseFlinkTestConstants.java  | 14 ++---
 .../addons/hbase/example/HBaseReadExample.java  | 30 +++++-----
 .../addons/hbase/example/HBaseWriteExample.java | 61 ++++++++++----------
 .../hbase/example/HBaseWriteStreamExample.java  | 11 ++--
 .../src/test/resources/log4j-test.properties    |  4 +-
 13 files changed, 113 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
index 59ba5b1f..73a21b3 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.addons.hbase;
 
 import org.apache.flink.api.common.io.InputFormat;
@@ -23,6 +24,7 @@ import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitAssigner;
+
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -32,7 +34,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,7 +43,7 @@ import java.util.List;
  */
 public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInputSplit> {
 
-	protected static Logger LOG = LoggerFactory.getLogger(AbstractTableInputFormat.class);
+	protected static final Logger LOG = LoggerFactory.getLogger(AbstractTableInputFormat.class);
 
 	// helper variable to decide whether the input is exhausted or not
 	protected boolean endReached = false;
@@ -50,7 +51,7 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
 	protected transient HTable table = null;
 	protected transient Scan scan = null;
 
-	/** HBase iterator wrapper */
+	/** HBase iterator wrapper. */
 	protected ResultScanner resultScanner = null;
 
 	protected byte[] currentRow;
@@ -65,7 +66,8 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
 
 	/**
 	 * What table is to be read.
-	 * Per instance of a TableInputFormat derivative only a single table name is possible.
+	 *
+	 * <p>Per instance of a TableInputFormat derivative only a single table name is possible.
 	 *
 	 * @return The name of the table
 	 */
@@ -74,7 +76,7 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
 	/**
 	 * HBase returns an instance of {@link Result}.
 	 *
-	 * This method maps the returned {@link Result} instance into the output type {@link T}.
+	 * <p>This method maps the returned {@link Result} instance into the output type {@link T}.
 	 *
 	 * @param r The Result instance from HBase that needs to be converted
 	 * @return The appropriate instance of {@link T} that contains the data of Result.
@@ -83,10 +85,11 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
 
 	/**
 	 * Creates a {@link Scan} object and opens the {@link HTable} connection.
-	 * These are opened here because they are needed in the createInputSplits
+	 *
+	 * <p>These are opened here because they are needed in the createInputSplits
 	 * which is called before the openInputFormat method.
 	 *
-	 * The connection is opened in this method and closed in {@link #closeInputFormat()}.
+	 * <p>The connection is opened in this method and closed in {@link #closeInputFormat()}.
 	 *
 	 * @param parameters The configuration that is to be used
 	 * @see Configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
index fff2a9e..dde24f0 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
@@ -24,12 +24,13 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Row;
+
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -98,7 +99,7 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
 	public void configure(Configuration parameters) {
 		LOG.info("Initializing HBase configuration.");
 		connectToTable();
-		if(table != null) {
+		if (table != null) {
 			scan = getScanner();
 		}
 
@@ -144,7 +145,7 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
 				int typeIdx = types[f][q];
 				// read value
 				byte[] value = res.getValue(familyKey, qualifier);
-				if(value != null) {
+				if (value != null) {
 					familyRow.setField(q, deserialize(value, typeIdx));
 				} else {
 					familyRow.setField(q, null);
@@ -164,10 +165,10 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
 		try {
 			Connection conn = ConnectionFactory.createConnection(conf);
 			super.table = (HTable) conn.getTable(TableName.valueOf(tableName));
-		} catch(TableNotFoundException tnfe) {
+		} catch (TableNotFoundException tnfe) {
 			LOG.error("The table " + tableName + " not found ", tnfe);
 			throw new RuntimeException("HBase table '" + tableName + "' not found.", tnfe);
-		} catch(IOException ioe) {
+		} catch (IOException ioe) {
 			LOG.error("Exception while creating connection to HBase.", ioe);
 			throw new RuntimeException("Cannot create connection to HBase.", ioe);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
index b6b3916..fee9fa9 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.addons.hbase;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -27,7 +28,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
- * Helps to specify an HBase Table's schema
+ * Helps to specify an HBase Table's schema.
  */
 public class HBaseTableSchema implements Serializable {
 
@@ -52,7 +53,7 @@ public class HBaseTableSchema implements Serializable {
 
 		if (!HBaseRowInputFormat.isSupportedType(clazz)) {
 			// throw exception
-			throw new IllegalArgumentException("Unsupported class type found " + clazz+". " +
+			throw new IllegalArgumentException("Unsupported class type found " + clazz + ". " +
 				"Better to use byte[].class and deserialize using user defined scalar functions");
 		}
 
@@ -91,7 +92,7 @@ public class HBaseTableSchema implements Serializable {
 
 		byte[][] familyKeys = new byte[this.familyMap.size()][];
 		int i = 0;
-		for(String name : this.familyMap.keySet()) {
+		for (String name : this.familyMap.keySet()) {
 			familyKeys[i++] = name.getBytes(c);
 		}
 		return familyKeys;
@@ -135,7 +136,7 @@ public class HBaseTableSchema implements Serializable {
 
 		byte[][] qualifierKeys = new byte[qualifierMap.size()][];
 		int i = 0;
-		for(String name : qualifierMap.keySet()) {
+		for (String name : qualifierMap.keySet()) {
 			qualifierKeys[i++] = name.getBytes(c);
 		}
 		return qualifierKeys;

http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index f709212..cc7e602 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.addons.hbase;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -25,6 +26,7 @@ import org.apache.flink.table.sources.BatchTableSource;
 import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
+
 import org.apache.hadoop.conf.Configuration;
 
 import java.util.Map;
@@ -32,12 +34,12 @@ import java.util.Map;
 /**
  * Creates a TableSource to scan an HBase table.
  *
- * The table name and required HBase configuration is passed during {@link HBaseTableSource} construction.
+ * <p>The table name and required HBase configuration is passed during {@link HBaseTableSource} construction.
  * Use {@link #addColumn(String, String, Class)} to specify the family, qualifier, and type of columns to scan.
  *
- * The TableSource returns {@link Row} with nested Rows for each column family.
+ * <p>The TableSource returns {@link Row} with nested Rows for each column family.
  *
- * The HBaseTableSource is used as shown in the example below.
+ * <p>The HBaseTableSource is used as shown in the example below.
  *
  * <pre>
  * {@code
@@ -112,10 +114,10 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
 		String[] famNames = schema.getFamilyNames();
 		HBaseTableSource newTableSource = new HBaseTableSource(this.conf, tableName);
 		// Extract the family from the given fields
-		for(int field : fields) {
+		for (int field : fields) {
 			String family = famNames[field];
 			Map<String, TypeInformation<?>> familyInfo = schema.getFamilyInfo(family);
-			for(String qualifier : familyInfo.keySet()) {
+			for (String qualifier : familyInfo.keySet()) {
 				// create the newSchema
 				newTableSource.addColumn(family, qualifier, familyInfo.get(qualifier).getTypeClass());
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
index 6ea2d04..52fd012 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -15,17 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.addons.hbase;
 
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
+
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 
-
 /**
  * {@link InputFormat} subclass that wraps the access for HTables.
  */
@@ -72,7 +73,7 @@ public abstract class TableInputFormat<T extends Tuple> extends AbstractTableInp
 	}
 
 	/**
-	 * Create an {@link HTable} instance and set it into this format
+	 * Create an {@link HTable} instance and set it into this format.
 	 */
 	private HTable createTable() {
 		LOG.info("Initializing HBaseConfiguration");

http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
index 75f0b9b..d265bd4 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
@@ -28,7 +28,7 @@ public class TableInputSplit extends LocatableInputSplit {
 
 	private static final long serialVersionUID = 1L;
 
-	/** The name of the table to retrieve data from */
+	/** The name of the table to retrieve data from. */
 	private final byte[] tableName;
 
 	/** The start row of the split. */
@@ -38,8 +38,8 @@ public class TableInputSplit extends LocatableInputSplit {
 	private final byte[] endRow;
 
 	/**
-	 * Creates a new table input split
-	 * 
+	 * Creates a new table input split.
+	 *
 	 * @param splitNumber
 	 *        the number of the input split
 	 * @param hostnames
@@ -62,7 +62,7 @@ public class TableInputSplit extends LocatableInputSplit {
 
 	/**
 	 * Returns the table name.
-	 * 
+	 *
 	 * @return The table name.
 	 */
 	public byte[] getTableName() {
@@ -71,7 +71,7 @@ public class TableInputSplit extends LocatableInputSplit {
 
 	/**
 	 * Returns the start row.
-	 * 
+	 *
 	 * @return The start row.
 	 */
 	public byte[] getStartRow() {
@@ -80,7 +80,7 @@ public class TableInputSplit extends LocatableInputSplit {
 
 	/**
 	 * Returns the end row.
-	 * 
+	 *
 	 * @return The end row.
 	 */
 	public byte[] getEndRow() {

http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index 33bbe12..5d71ca5 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -17,6 +17,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.addons.hbase;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -34,6 +35,7 @@ import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
+
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
@@ -55,7 +57,7 @@ import static org.junit.Assert.assertEquals;
  * - TableInputFormat
  * - HBaseTableSource
  *
- * These tests are located in a single test file to avoided unnecessary initializations of the
+ * <p>These tests are located in a single test file to avoid unnecessary initializations of the
  * HBaseTestingCluster which takes about half a minute.
  *
  */
@@ -290,6 +292,9 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 		TestBaseUtils.compareResultAsText(results, expected);
 	}
 
+	/**
+	 * A {@link ScalarFunction} that maps byte arrays to UTF-8 strings.
+	 */
 	public static class ToUTF8 extends ScalarFunction {
 
 		public String eval(byte[] bytes) {
@@ -297,6 +302,9 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 		}
 	}
 
+	/**
+	 * A {@link ScalarFunction} that maps byte arrays to longs.
+	 */
 	public static class ToLong extends ScalarFunction {
 
 		public long eval(byte[] bytes) {
@@ -342,16 +350,15 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 		List<Tuple1<Integer>> resultSet = result.collect();
 
 		assertEquals(1, resultSet.size());
-		assertEquals(360, (int)resultSet.get(0).f0);
+		assertEquals(360, (int) resultSet.get(0).f0);
 	}
 
-
 	/**
 	 * Allows the tests to use {@link ExecutionEnvironment#getExecutionEnvironment()} but with a
 	 * configuration that limits the maximum memory used for network buffers since the current
 	 * defaults are too high for Travis-CI.
 	 */
-	private static abstract class LimitNetworkBuffersTestEnvironment extends ExecutionEnvironment {
+	private abstract static class LimitNetworkBuffersTestEnvironment extends ExecutionEnvironment {
 
 		public static void setAsContext() {
 			Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
index 727a5b1..e4b2bd2 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
@@ -20,10 +20,11 @@
 
 package org.apache.flink.addons.hbase;
 
+import org.apache.flink.util.TestLogger;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.flink.util.TestLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -95,7 +96,7 @@ public class HBaseTestingClusterAutostarter extends TestLogger implements Serial
 
 		assertNotNull("HBaseAdmin is not initialized successfully.", admin);
 		HTableDescriptor desc = new HTableDescriptor(tableName);
-		for(byte[] fam : columnFamilyName) {
+		for (byte[] fam : columnFamilyName) {
 			HColumnDescriptor colDef = new HColumnDescriptor(fam);
 			desc.addFamily(colDef);
 		}
@@ -195,6 +196,7 @@ public class HBaseTestingClusterAutostarter extends TestLogger implements Serial
 	public static Configuration getConf() {
 		return conf;
 	}
+
 	private static void createHBaseSiteXml(File hbaseSiteXmlDirectory, String zookeeperQuorum) {
 		hbaseSiteXmlFile = new File(hbaseSiteXmlDirectory, "hbase-site.xml");
 		// Create the hbase-site.xml file for this run.

http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
index f56295e..57224c2 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
@@ -20,11 +20,11 @@ package org.apache.flink.addons.hbase.example;
 
 import org.apache.flink.configuration.ConfigConstants;
 
-public class HBaseFlinkTestConstants {
-	
-	public static final byte[] CF_SOME = "someCf".getBytes(ConfigConstants.DEFAULT_CHARSET);
-	public static final byte[] Q_SOME = "someQual".getBytes(ConfigConstants.DEFAULT_CHARSET);
-	public static final String TEST_TABLE_NAME = "test-table";
-	public static final String TMP_DIR = "/tmp/test";
-	
+class HBaseFlinkTestConstants {
+
+	static final byte[] CF_SOME = "someCf".getBytes(ConfigConstants.DEFAULT_CHARSET);
+	static final byte[] Q_SOME = "someQual".getBytes(ConfigConstants.DEFAULT_CHARSET);
+	static final String TEST_TABLE_NAME = "test-table";
+	static final String TMP_DIR = "/tmp/test";
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
index dccf876..817ae09 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
@@ -23,31 +23,32 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
+
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * Simple stub for HBase DataSet read
- * 
- * To run the test first create the test table with hbase shell.
- * 
- * Use the following commands:
+ *
+ * <p>To run the test first create the test table with hbase shell.
+ *
+ * <p>Use the following commands:
  * <ul>
  *     <li>create 'test-table', 'someCf'</li>
  *     <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li>
  *     <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li>
  * </ul>
- * 
- * The test should return just the first entry.
- * 
+ *
+ * <p>The test should return just the first entry.
+ *
  */
 public class HBaseReadExample {
 	public static void main(String[] args) throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		@SuppressWarnings("serial")
 		DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() {
-			
+
 				@Override
 				public String getTableName() {
 					return HBaseFlinkTestConstants.TEST_TABLE_NAME;
@@ -61,7 +62,7 @@ public class HBaseReadExample {
 				}
 
 				private Tuple2<String, String> reuse = new Tuple2<String, String>();
-				
+
 				@Override
 				protected Tuple2<String, String> mapResultToTuple(Result r) {
 					String key = Bytes.toString(r.getRow());
@@ -71,22 +72,23 @@ public class HBaseReadExample {
 					return reuse;
 				}
 		})
-		.filter(new FilterFunction<Tuple2<String,String>>() {
+		.filter(new FilterFunction<Tuple2<String, String>>() {
 
 			@Override
 			public boolean filter(Tuple2<String, String> t) throws Exception {
 				String val = t.getField(1);
-				if(val.startsWith("someStr"))
+				if (val.startsWith("someStr")) {
 					return true;
+				}
 				return false;
 			}
 		});
-		
+
 		hbaseDs.print();
-		
+
 		// kick off execution.
 		env.execute();
-				
+
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
index 64d20c3..ca82392 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
+
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
@@ -36,36 +37,36 @@ import org.apache.hadoop.mapreduce.Job;
 
 /**
  * Simple stub for HBase DataSet write
- * 
- * To run the test first create the test table with hbase shell.
- * 
- * Use the following commands:
+ *
+ * <p>To run the test first create the test table with hbase shell.
+ *
+ * <p>Use the following commands:
  * <ul>
  *     <li>create 'test-table', 'someCf'</li>
  * </ul>
- * 
+ *
  */
 @SuppressWarnings("serial")
 public class HBaseWriteExample {
-	
+
 	// *************************************************************************
 	//     PROGRAM
 	// *************************************************************************
-	
+
 	public static void main(String[] args) throws Exception {
 
-		if(!parseParameters(args)) {
+		if (!parseParameters(args)) {
 			return;
 		}
-		
+
 		// set up the execution environment
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		// get input data
 		DataSet<String> text = getTextDataSet(env);
-		
-		DataSet<Tuple2<String, Integer>> counts = 
-				// split up the lines in pairs (2-tuples) containing: (word,1)
+
+		DataSet<Tuple2<String, Integer>> counts =
+				// split up the lines in pairs (2-tuples) containing: (word, 1)
 				text.flatMap(new Tokenizer())
 				// group by the tuple field "0" and sum up tuple field "1"
 				.groupBy(0)
@@ -75,8 +76,8 @@ public class HBaseWriteExample {
 		Job job = Job.getInstance();
 		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
 		// TODO is "mapred.output.dir" really useful?
-		job.getConfiguration().set("mapred.output.dir",HBaseFlinkTestConstants.TMP_DIR);
-		counts.map(new RichMapFunction <Tuple2<String,Integer>, Tuple2<Text,Mutation>>() {
+		job.getConfiguration().set("mapred.output.dir", HBaseFlinkTestConstants.TMP_DIR);
+		counts.map(new RichMapFunction <Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {
 			private transient Tuple2<Text, Mutation> reuse;
 
 			@Override
@@ -89,24 +90,24 @@ public class HBaseWriteExample {
 			public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
 				reuse.f0 = new Text(t.f0);
 				Put put = new Put(t.f0.getBytes(ConfigConstants.DEFAULT_CHARSET));
-				put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
+				put.add(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
 				reuse.f1 = put;
 				return reuse;
 			}
 		}).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
-		
+
 		// execute program
 		env.execute("WordCount (HBase sink) Example");
 	}
-	
+
 	// *************************************************************************
 	//     USER FUNCTIONS
 	// *************************************************************************
-	
+
 	/**
 	 * Implements the string tokenizer that splits sentences into words as a user-defined
-	 * FlatMapFunction. The function takes a line (String) and splits it into 
-	 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+	 * FlatMapFunction. The function takes a line (String) and splits it into
+	 * multiple pairs in the form of "(word, 1)" (Tuple2&lt;String, Integer&gt;).
 	 */
 	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
 
@@ -114,7 +115,7 @@ public class HBaseWriteExample {
 		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
 			// normalize and split the line
 			String[] tokens = value.toLowerCase().split("\\W+");
-			
+
 			// emit the pairs
 			for (String token : tokens) {
 				if (token.length() > 0) {
@@ -123,20 +124,20 @@ public class HBaseWriteExample {
 			}
 		}
 	}
-	
+
 	// *************************************************************************
 	//     UTIL METHODS
 	// *************************************************************************
 	private static boolean fileOutput = false;
 	private static String textPath;
 	private static String outputTableName = HBaseFlinkTestConstants.TEST_TABLE_NAME;
-	
+
 	private static boolean parseParameters(String[] args) {
-		
-		if(args.length > 0) {
+
+		if (args.length > 0) {
 			// parse input arguments
 			fileOutput = true;
-			if(args.length == 2) {
+			if (args.length == 2) {
 				textPath = args[0];
 				outputTableName = args[1];
 			} else {
@@ -150,9 +151,9 @@ public class HBaseWriteExample {
 		}
 		return true;
 	}
-	
+
 	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
+		if (fileOutput) {
 			// read the text file from given input path
 			return env.readTextFile(textPath);
 		} else {
@@ -160,9 +161,11 @@ public class HBaseWriteExample {
 			return getDefaultTextLineDataSet(env);
 		}
 	}
+
 	private static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
 		return env.fromElements(WORDS);
 	}
+
 	private static final String[] WORDS = new String[] {
 		"To be, or not to be,--that is the question:--",
 		"Whether 'tis nobler in the mind to suffer",

http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
index 05398db..1ed471d 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
@@ -17,27 +17,26 @@
 
 package org.apache.flink.addons.hbase.example;
 
-import java.io.IOException;
-
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import java.io.IOException;
+
 /**
- * 
  * This is an example how to write streams into HBase. In this example the
  * stream will be written into a local Hbase but it is possible to adapt this
  * example for an HBase running in a cloud. You need a running local HBase with a
  * table "flinkExample" and a column "entry". If your HBase configuration does
  * not fit the hbase-site.xml in the resource folder then you gave to delete temporary this
  * hbase-site.xml to execute the example properly.
- * 
  */
 public class HBaseWriteStreamExample {
 
@@ -70,9 +69,7 @@ public class HBaseWriteStreamExample {
 	}
 
 	/**
-	 * 
-	 * This class implements an OutputFormat for HBase
-	 *
+	 * This class implements an OutputFormat for HBase.
 	 */
 	private static class HBaseOutputFormat implements OutputFormat<String> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties b/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
index 804ff45..25dd575 100644
--- a/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
@@ -5,9 +5,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY