Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:35 UTC
[05/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-hbase
[FLINK-6711] Activate strict checkstyle for flink-hbase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/43183ad2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/43183ad2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/43183ad2
Branch: refs/heads/master
Commit: 43183ad2ca6326dc6021d4f880f5095454c6952d
Parents: 23920bb
Author: zentol <ch...@apache.org>
Authored: Wed May 24 23:12:36 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:09 2017 +0200
----------------------------------------------------------------------
.../addons/hbase/AbstractTableInputFormat.java | 17 +++---
.../flink/addons/hbase/HBaseRowInputFormat.java | 11 ++--
.../flink/addons/hbase/HBaseTableSchema.java | 9 +--
.../flink/addons/hbase/HBaseTableSource.java | 12 ++--
.../flink/addons/hbase/TableInputFormat.java | 5 +-
.../flink/addons/hbase/TableInputSplit.java | 12 ++--
.../addons/hbase/HBaseConnectorITCase.java | 15 +++--
.../hbase/HBaseTestingClusterAutostarter.java | 6 +-
.../hbase/example/HBaseFlinkTestConstants.java | 14 ++---
.../addons/hbase/example/HBaseReadExample.java | 30 +++++-----
.../addons/hbase/example/HBaseWriteExample.java | 61 ++++++++++----------
.../hbase/example/HBaseWriteStreamExample.java | 11 ++--
.../src/test/resources/log4j-test.properties | 4 +-
13 files changed, 113 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
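The hunks below all apply the same handful of strict-checkstyle rules: final static loggers, javadoc summaries ending in a period with <p> tags on follow-up paragraphs, a space between control-flow keywords and their parenthesis, braces on single-statement bodies, grouped import order (Flink first, then other third-party, then java.*), and 'abstract' before 'static' in modifier lists. A minimal sketch, not taken from the commit (class and member names are illustrative), collecting those patterns in one place:

	package org.apache.flink.addons.hbase;

	import org.apache.flink.configuration.Configuration;

	import org.slf4j.Logger;
	import org.slf4j.LoggerFactory;

	import java.io.IOException;

	/**
	 * Summary sentences end with a period.
	 *
	 * <p>Every paragraph after the first must start with a <p> tag.
	 */
	public abstract class StyleSketch {

		/** Never-reassigned static fields must be final. */
		private static final Logger LOG = LoggerFactory.getLogger(StyleSketch.class);

		void demo(Configuration conf) throws IOException {
			// a space is required between a control-flow keyword and '(',
			// and single-statement bodies must be braced
			if (conf != null) {
				LOG.info("got a configuration");
			}
		}

		// modifier order: 'abstract' comes before 'static'
		abstract static class Inner {
		}
	}

Running 'mvn checkstyle:check' in flink-connectors/flink-hbase should flag exactly these patterns when violated; the concrete module configuration lives in the repository's checkstyle.xml (at the time, under tools/maven/) and is not reproduced here.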
http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
index 59ba5b1f..73a21b3 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.addons.hbase;
import org.apache.flink.api.common.io.InputFormat;
@@ -23,6 +24,7 @@ import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
+
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -32,7 +34,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -42,7 +43,7 @@ import java.util.List;
*/
public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInputSplit> {
- protected static Logger LOG = LoggerFactory.getLogger(AbstractTableInputFormat.class);
+ protected static final Logger LOG = LoggerFactory.getLogger(AbstractTableInputFormat.class);
// helper variable to decide whether the input is exhausted or not
protected boolean endReached = false;
@@ -50,7 +51,7 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
protected transient HTable table = null;
protected transient Scan scan = null;
- /** HBase iterator wrapper */
+ /** HBase iterator wrapper. */
protected ResultScanner resultScanner = null;
protected byte[] currentRow;
@@ -65,7 +66,8 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
/**
* What table is to be read.
- * Per instance of a TableInputFormat derivative only a single table name is possible.
+ *
+ * <p>Per instance of a TableInputFormat derivative only a single table name is possible.
*
* @return The name of the table
*/
@@ -74,7 +76,7 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
/**
* HBase returns an instance of {@link Result}.
*
- * This method maps the returned {@link Result} instance into the output type {@link T}.
+ * <p>This method maps the returned {@link Result} instance into the output type {@link T}.
*
* @param r The Result instance from HBase that needs to be converted
* @return The appropriate instance of {@link T} that contains the data of Result.
@@ -83,10 +85,11 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
/**
* Creates a {@link Scan} object and opens the {@link HTable} connection.
- * These are opened here because they are needed in the createInputSplits
+ *
+ * <p>These are opened here because they are needed in the createInputSplits
* which is called before the openInputFormat method.
*
- * The connection is opened in this method and closed in {@link #closeInputFormat()}.
+ * <p>The connection is opened in this method and closed in {@link #closeInputFormat()}.
*
* @param parameters The configuration that is to be used
* @see Configuration
http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
index fff2a9e..dde24f0 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
@@ -24,12 +24,13 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
+
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
@@ -98,7 +99,7 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
public void configure(Configuration parameters) {
LOG.info("Initializing HBase configuration.");
connectToTable();
- if(table != null) {
+ if (table != null) {
scan = getScanner();
}
@@ -144,7 +145,7 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
int typeIdx = types[f][q];
// read value
byte[] value = res.getValue(familyKey, qualifier);
- if(value != null) {
+ if (value != null) {
familyRow.setField(q, deserialize(value, typeIdx));
} else {
familyRow.setField(q, null);
@@ -164,10 +165,10 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
try {
Connection conn = ConnectionFactory.createConnection(conf);
super.table = (HTable) conn.getTable(TableName.valueOf(tableName));
- } catch(TableNotFoundException tnfe) {
+ } catch (TableNotFoundException tnfe) {
LOG.error("The table " + tableName + " not found ", tnfe);
throw new RuntimeException("HBase table '" + tableName + "' not found.", tnfe);
- } catch(IOException ioe) {
+ } catch (IOException ioe) {
LOG.error("Exception while creating connection to HBase.", ioe);
throw new RuntimeException("Cannot create connection to HBase.", ioe);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
index b6b3916..fee9fa9 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.addons.hbase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -27,7 +28,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
/**
- * Helps to specify an HBase Table's schema
+ * Helps to specify an HBase Table's schema.
*/
public class HBaseTableSchema implements Serializable {
@@ -52,7 +53,7 @@ public class HBaseTableSchema implements Serializable {
if (!HBaseRowInputFormat.isSupportedType(clazz)) {
// throw exception
- throw new IllegalArgumentException("Unsupported class type found " + clazz+". " +
+ throw new IllegalArgumentException("Unsupported class type found " + clazz + ". " +
"Better to use byte[].class and deserialize using user defined scalar functions");
}
@@ -91,7 +92,7 @@ public class HBaseTableSchema implements Serializable {
byte[][] familyKeys = new byte[this.familyMap.size()][];
int i = 0;
- for(String name : this.familyMap.keySet()) {
+ for (String name : this.familyMap.keySet()) {
familyKeys[i++] = name.getBytes(c);
}
return familyKeys;
@@ -135,7 +136,7 @@ public class HBaseTableSchema implements Serializable {
byte[][] qualifierKeys = new byte[qualifierMap.size()][];
int i = 0;
- for(String name : qualifierMap.keySet()) {
+ for (String name : qualifierMap.keySet()) {
qualifierKeys[i++] = name.getBytes(c);
}
return qualifierKeys;
http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index f709212..cc7e602 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.addons.hbase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -25,6 +26,7 @@ import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
+
import org.apache.hadoop.conf.Configuration;
import java.util.Map;
@@ -32,12 +34,12 @@ import java.util.Map;
/**
* Creates a TableSource to scan an HBase table.
*
- * The table name and required HBase configuration is passed during {@link HBaseTableSource} construction.
+ * <p>The table name and required HBase configuration is passed during {@link HBaseTableSource} construction.
* Use {@link #addColumn(String, String, Class)} to specify the family, qualifier, and type of columns to scan.
*
- * The TableSource returns {@link Row} with nested Rows for each column family.
+ * <p>The TableSource returns {@link Row} with nested Rows for each column family.
*
- * The HBaseTableSource is used as shown in the example below.
+ * <p>The HBaseTableSource is used as shown in the example below.
*
* <pre>
* {@code
@@ -112,10 +114,10 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
String[] famNames = schema.getFamilyNames();
HBaseTableSource newTableSource = new HBaseTableSource(this.conf, tableName);
// Extract the family from the given fields
- for(int field : fields) {
+ for (int field : fields) {
String family = famNames[field];
Map<String, TypeInformation<?>> familyInfo = schema.getFamilyInfo(family);
- for(String qualifier : familyInfo.keySet()) {
+ for (String qualifier : familyInfo.keySet()) {
// create the newSchema
newTableSource.addColumn(family, qualifier, familyInfo.get(qualifier).getTypeClass());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
index 6ea2d04..52fd012 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -15,17 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.addons.hbase;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
+
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
-
/**
* {@link InputFormat} subclass that wraps the access for HTables.
*/
@@ -72,7 +73,7 @@ public abstract class TableInputFormat<T extends Tuple> extends AbstractTableInp
}
/**
- * Create an {@link HTable} instance and set it into this format
+ * Create an {@link HTable} instance and set it into this format.
*/
private HTable createTable() {
LOG.info("Initializing HBaseConfiguration");
http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
index 75f0b9b..d265bd4 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
@@ -28,7 +28,7 @@ public class TableInputSplit extends LocatableInputSplit {
private static final long serialVersionUID = 1L;
- /** The name of the table to retrieve data from */
+ /** The name of the table to retrieve data from. */
private final byte[] tableName;
/** The start row of the split. */
@@ -38,8 +38,8 @@ public class TableInputSplit extends LocatableInputSplit {
private final byte[] endRow;
/**
- * Creates a new table input split
- *
+ * Creates a new table input split.
+ *
* @param splitNumber
* the number of the input split
* @param hostnames
@@ -62,7 +62,7 @@ public class TableInputSplit extends LocatableInputSplit {
/**
* Returns the table name.
- *
+ *
* @return The table name.
*/
public byte[] getTableName() {
@@ -71,7 +71,7 @@ public class TableInputSplit extends LocatableInputSplit {
/**
* Returns the start row.
- *
+ *
* @return The start row.
*/
public byte[] getStartRow() {
@@ -80,7 +80,7 @@ public class TableInputSplit extends LocatableInputSplit {
/**
* Returns the end row.
- *
+ *
* @return The end row.
*/
public byte[] getEndRow() {
http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index 33bbe12..5d71ca5 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -17,6 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.addons.hbase;
import org.apache.flink.api.common.functions.ReduceFunction;
@@ -34,6 +35,7 @@ import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
+
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@@ -55,7 +57,7 @@ import static org.junit.Assert.assertEquals;
* - TableInputFormat
* - HBaseTableSource
*
- * These tests are located in a single test file to avoided unnecessary initializations of the
+ * <p>These tests are located in a single test file to avoided unnecessary initializations of the
* HBaseTestingCluster which takes about half a minute.
*
*/
@@ -290,6 +292,9 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
TestBaseUtils.compareResultAsText(results, expected);
}
+ /**
+ * A {@link ScalarFunction} that maps byte arrays to UTF-8 strings.
+ */
public static class ToUTF8 extends ScalarFunction {
public String eval(byte[] bytes) {
@@ -297,6 +302,9 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
}
}
+ /**
+ * A {@link ScalarFunction} that maps byte array to longs.
+ */
public static class ToLong extends ScalarFunction {
public long eval(byte[] bytes) {
@@ -342,16 +350,15 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
List<Tuple1<Integer>> resultSet = result.collect();
assertEquals(1, resultSet.size());
- assertEquals(360, (int)resultSet.get(0).f0);
+ assertEquals(360, (int) resultSet.get(0).f0);
}
-
/**
* Allows the tests to use {@link ExecutionEnvironment#getExecutionEnvironment()} but with a
* configuration that limits the maximum memory used for network buffers since the current
* defaults are too high for Travis-CI.
*/
- private static abstract class LimitNetworkBuffersTestEnvironment extends ExecutionEnvironment {
+ private abstract static class LimitNetworkBuffersTestEnvironment extends ExecutionEnvironment {
public static void setAsContext() {
Configuration config = new Configuration();
http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
index 727a5b1..e4b2bd2 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
@@ -20,10 +20,11 @@
package org.apache.flink.addons.hbase;
+import org.apache.flink.util.TestLogger;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.flink.util.TestLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -95,7 +96,7 @@ public class HBaseTestingClusterAutostarter extends TestLogger implements Serial
assertNotNull("HBaseAdmin is not initialized successfully.", admin);
HTableDescriptor desc = new HTableDescriptor(tableName);
- for(byte[] fam : columnFamilyName) {
+ for (byte[] fam : columnFamilyName) {
HColumnDescriptor colDef = new HColumnDescriptor(fam);
desc.addFamily(colDef);
}
@@ -195,6 +196,7 @@ public class HBaseTestingClusterAutostarter extends TestLogger implements Serial
public static Configuration getConf() {
return conf;
}
+
private static void createHBaseSiteXml(File hbaseSiteXmlDirectory, String zookeeperQuorum) {
hbaseSiteXmlFile = new File(hbaseSiteXmlDirectory, "hbase-site.xml");
// Create the hbase-site.xml file for this run.
http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
index f56295e..57224c2 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
@@ -20,11 +20,11 @@ package org.apache.flink.addons.hbase.example;
import org.apache.flink.configuration.ConfigConstants;
-public class HBaseFlinkTestConstants {
-
- public static final byte[] CF_SOME = "someCf".getBytes(ConfigConstants.DEFAULT_CHARSET);
- public static final byte[] Q_SOME = "someQual".getBytes(ConfigConstants.DEFAULT_CHARSET);
- public static final String TEST_TABLE_NAME = "test-table";
- public static final String TMP_DIR = "/tmp/test";
-
+class HBaseFlinkTestConstants {
+
+ static final byte[] CF_SOME = "someCf".getBytes(ConfigConstants.DEFAULT_CHARSET);
+ static final byte[] Q_SOME = "someQual".getBytes(ConfigConstants.DEFAULT_CHARSET);
+ static final String TEST_TABLE_NAME = "test-table";
+ static final String TMP_DIR = "/tmp/test";
+
}
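The visibility reduction above (public to package-private) is the smaller of two possible fixes: the strict profile requires javadoc on public classes, which is presumably why the ITCase hunks earlier add class-level comments. Keeping the constants public would instead have looked roughly like this (illustrative, not in the commit):

	/**
	 * Constants shared by the flink-hbase example jobs.
	 */
	public class HBaseFlinkTestConstants {
		public static final String TEST_TABLE_NAME = "test-table";
		// ...remaining constants as before
	}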
http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
index dccf876..817ae09 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
@@ -23,31 +23,32 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
+
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Simple stub for HBase DataSet read
- *
- * To run the test first create the test table with hbase shell.
- *
- * Use the following commands:
+ *
+ * <p>To run the test first create the test table with hbase shell.
+ *
+ * <p>Use the following commands:
* <ul>
* <li>create 'test-table', 'someCf'</li>
* <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li>
* <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li>
* </ul>
- *
- * The test should return just the first entry.
- *
+ *
+ * <p>The test should return just the first entry.
+ *
*/
public class HBaseReadExample {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("serial")
DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() {
-
+
@Override
public String getTableName() {
return HBaseFlinkTestConstants.TEST_TABLE_NAME;
@@ -61,7 +62,7 @@ public class HBaseReadExample {
}
private Tuple2<String, String> reuse = new Tuple2<String, String>();
-
+
@Override
protected Tuple2<String, String> mapResultToTuple(Result r) {
String key = Bytes.toString(r.getRow());
@@ -71,22 +72,23 @@ public class HBaseReadExample {
return reuse;
}
})
- .filter(new FilterFunction<Tuple2<String,String>>() {
+ .filter(new FilterFunction<Tuple2<String, String>>() {
@Override
public boolean filter(Tuple2<String, String> t) throws Exception {
String val = t.getField(1);
- if(val.startsWith("someStr"))
+ if (val.startsWith("someStr")) {
return true;
+ }
return false;
}
});
-
+
hbaseDs.print();
-
+
// kick off execution.
env.execute();
-
+
}
}
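A side note on the filter hunk above: checkstyle only mandates the added braces; the if/return-true/return-false shape itself is untouched by the commit. A possible further simplification, not part of this change:

	@Override
	public boolean filter(Tuple2<String, String> t) throws Exception {
		String val = t.getField(1);
		return val.startsWith("someStr");
	}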
http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
index 64d20c3..ca82392 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
+
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
@@ -36,36 +37,36 @@ import org.apache.hadoop.mapreduce.Job;
/**
* Simple stub for HBase DataSet write
- *
- * To run the test first create the test table with hbase shell.
- *
- * Use the following commands:
+ *
+ * <p>To run the test first create the test table with hbase shell.
+ *
+ * <p>Use the following commands:
* <ul>
* <li>create 'test-table', 'someCf'</li>
* </ul>
- *
+ *
*/
@SuppressWarnings("serial")
public class HBaseWriteExample {
-
+
// *************************************************************************
// PROGRAM
// *************************************************************************
-
+
public static void main(String[] args) throws Exception {
- if(!parseParameters(args)) {
+ if (!parseParameters(args)) {
return;
}
-
+
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
// get input data
DataSet<String> text = getTextDataSet(env);
-
- DataSet<Tuple2<String, Integer>> counts =
- // split up the lines in pairs (2-tuples) containing: (word,1)
+
+ DataSet<Tuple2<String, Integer>> counts =
+ // split up the lines in pairs (2-tuples) containing: (word, 1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
@@ -75,8 +76,8 @@ public class HBaseWriteExample {
Job job = Job.getInstance();
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
// TODO is "mapred.output.dir" really useful?
- job.getConfiguration().set("mapred.output.dir",HBaseFlinkTestConstants.TMP_DIR);
- counts.map(new RichMapFunction <Tuple2<String,Integer>, Tuple2<Text,Mutation>>() {
+ job.getConfiguration().set("mapred.output.dir", HBaseFlinkTestConstants.TMP_DIR);
+ counts.map(new RichMapFunction <Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {
private transient Tuple2<Text, Mutation> reuse;
@Override
@@ -89,24 +90,24 @@ public class HBaseWriteExample {
public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
reuse.f0 = new Text(t.f0);
Put put = new Put(t.f0.getBytes(ConfigConstants.DEFAULT_CHARSET));
- put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
+ put.add(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
reuse.f1 = put;
return reuse;
}
}).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
-
+
// execute program
env.execute("WordCount (HBase sink) Example");
}
-
+
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
-
+
/**
* Implements the string tokenizer that splits sentences into words as a user-defined
- * FlatMapFunction. The function takes a line (String) and splits it into
- * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+ * FlatMapFunction. The function takes a line (String) and splits it into
+ * multiple pairs in the form of "(word, 1)" (Tuple2<String, Integer>).
*/
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@@ -114,7 +115,7 @@ public class HBaseWriteExample {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
-
+
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
@@ -123,20 +124,20 @@ public class HBaseWriteExample {
}
}
}
-
+
// *************************************************************************
// UTIL METHODS
// *************************************************************************
private static boolean fileOutput = false;
private static String textPath;
private static String outputTableName = HBaseFlinkTestConstants.TEST_TABLE_NAME;
-
+
private static boolean parseParameters(String[] args) {
-
- if(args.length > 0) {
+
+ if (args.length > 0) {
// parse input arguments
fileOutput = true;
- if(args.length == 2) {
+ if (args.length == 2) {
textPath = args[0];
outputTableName = args[1];
} else {
@@ -150,9 +151,9 @@ public class HBaseWriteExample {
}
return true;
}
-
+
private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
- if(fileOutput) {
+ if (fileOutput) {
// read the text file from given input path
return env.readTextFile(textPath);
} else {
@@ -160,9 +161,11 @@ public class HBaseWriteExample {
return getDefaultTextLineDataSet(env);
}
}
+
private static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
return env.fromElements(WORDS);
}
+
private static final String[] WORDS = new String[] {
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
index 05398db..1ed471d 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
@@ -17,27 +17,26 @@
package org.apache.flink.addons.hbase.example;
-import java.io.IOException;
-
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
+import java.io.IOException;
+
/**
- *
* This is an example how to write streams into HBase. In this example the
* stream will be written into a local Hbase but it is possible to adapt this
* example for an HBase running in a cloud. You need a running local HBase with a
* table "flinkExample" and a column "entry". If your HBase configuration does
* not fit the hbase-site.xml in the resource folder then you gave to delete temporary this
* hbase-site.xml to execute the example properly.
- *
*/
public class HBaseWriteStreamExample {
@@ -70,9 +69,7 @@ public class HBaseWriteStreamExample {
}
/**
- *
- * This class implements an OutputFormat for HBase
- *
+ * This class implements an OutputFormat for HBase.
*/
private static class HBaseOutputFormat implements OutputFormat<String> {
http://git-wip-us.apache.org/repos/asf/flink/blob/43183ad2/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties b/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
index 804ff45..25dd575 100644
--- a/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
@@ -5,9 +5,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY