Posted to commits@flink.apache.org by fh...@apache.org on 2017/02/15 09:41:26 UTC
[2/2] flink git commit: [FLINK-2168] Refactor HBaseTableSource and extend tests.
[FLINK-2168] Refactor HBaseTableSource and extend tests.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87d09342
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87d09342
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87d09342
Branch: refs/heads/master
Commit: 87d09342bf5eef24ec22197284b16879c83e34dd
Parents: 58d4513
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Feb 6 23:36:05 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Feb 15 10:24:14 2017 +0100
----------------------------------------------------------------------
flink-connectors/flink-hbase/pom.xml | 1 +
.../addons/hbase/AbstractTableInputFormat.java | 92 ++---
.../flink/addons/hbase/HBaseRowInputFormat.java | 263 +++++++++++++++
.../flink/addons/hbase/HBaseTableSchema.java | 174 ++++++----
.../flink/addons/hbase/HBaseTableSource.java | 44 ++-
.../hbase/HBaseTableSourceInputFormat.java | 144 --------
.../flink/addons/hbase/TableInputFormat.java | 6 +-
.../addons/hbase/HBaseConnectorITCase.java | 338 +++++++++++++++++++
.../addons/hbase/TableInputFormatITCase.java | 122 -------
.../addons/hbase/example/HBaseReadExample.java | 2 +-
.../hbase/example/HBaseTableSourceITCase.java | 196 -----------
11 files changed, 803 insertions(+), 579 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/pom.xml b/flink-connectors/flink-hbase/pom.xml
index 14e2b59..96688f3 100644
--- a/flink-connectors/flink-hbase/pom.xml
+++ b/flink-connectors/flink-hbase/pom.xml
@@ -44,6 +44,7 @@ under the License.
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19.1</version>
<configuration>
+ <argLine>-XX:MaxPermSize=128m</argLine>
<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
<forkCount>1</forkCount>
</configuration>
http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
index d7f6ce9..59ba5b1f 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
@@ -38,12 +38,13 @@ import java.util.ArrayList;
import java.util.List;
/**
- * {@link InputFormat} subclass that wraps the access for HTables.
+ * Abstract {@link InputFormat} to read data from HBase tables.
*/
public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInputSplit> {
protected static Logger LOG = LoggerFactory.getLogger(AbstractTableInputFormat.class);
- /** helper variable to decide whether the input is exhausted or not */
+
+ // helper variable to decide whether the input is exhausted or not
protected boolean endReached = false;
protected transient HTable table = null;
@@ -52,34 +53,40 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
/** HBase iterator wrapper */
protected ResultScanner resultScanner = null;
- protected byte[] lastRow;
- protected int scannedRows;
+ protected byte[] currentRow;
+ protected long scannedRows;
+
/**
* Returns an instance of Scan that retrieves the required subset of records from the HBase table.
- * @return The appropriate instance of Scan for this usecase.
+ *
+ * @return The appropriate instance of Scan for this use case.
*/
protected abstract Scan getScanner();
/**
* What table is to be read.
- * Per instance of a TableInputFormat derivative only a single tablename is possible.
+ * Per instance of a TableInputFormat derivative only a single table name is possible.
+ *
* @return The name of the table
*/
protected abstract String getTableName();
/**
- * The output from HBase is always an instance of {@link Result}.
- * This method is to copy the data in the Result instance into the required {@link T}
+ * HBase returns an instance of {@link Result}.
+ *
+ * This method maps the returned {@link Result} instance into the output type {@link T}.
+ *
* @param r The Result instance from HBase that needs to be converted
- * @return The approriate instance of {@link T} that contains the needed information.
+ * @return The appropriate instance of {@link T} that contains the data of Result.
*/
- protected abstract T mapResultToType(Result r);
+ protected abstract T mapResultToOutType(Result r);
/**
* Creates a {@link Scan} object and opens the {@link HTable} connection.
* These are opened here because they are needed in the createInputSplits
* which is called before the openInputFormat method.
- * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
+ *
+ * The connection is opened in this method and closed in {@link #closeInputFormat()}.
*
* @param parameters The configuration that is to be used
* @see Configuration
@@ -89,19 +96,23 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
@Override
public void open(TableInputSplit split) throws IOException {
if (table == null) {
- throw new IOException("The HBase table has not been opened!");
+ throw new IOException("The HBase table has not been opened! " +
+ "This needs to be done in configure().");
}
if (scan == null) {
- throw new IOException("getScanner returned null");
+ throw new IOException("Scan has not been initialized! " +
+ "This needs to be done in configure().");
}
if (split == null) {
throw new IOException("Input split is null!");
}
logSplitInfo("opening", split);
- scan.setStartRow(split.getStartRow());
- lastRow = split.getEndRow();
- scan.setStopRow(lastRow);
+
+ // set scan range
+ currentRow = split.getStartRow();
+ scan.setStartRow(currentRow);
+ scan.setStopRow(split.getEndRow());
resultScanner = table.getScanner(scan);
endReached = false;
@@ -116,20 +127,20 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
Result res = resultScanner.next();
if (res != null) {
scannedRows++;
- lastRow = res.getRow();
- return mapResultToType(res);
+ currentRow = res.getRow();
+ return mapResultToOutType(res);
}
} catch (Exception e) {
resultScanner.close();
//workaround for timeout on scan
LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
- scan.setStartRow(lastRow);
+ scan.setStartRow(currentRow);
resultScanner = table.getScanner(scan);
Result res = resultScanner.next();
if (res != null) {
scannedRows++;
- lastRow = res.getRow();
- return mapResultToType(res);
+ currentRow = res.getRow();
+ return mapResultToOutType(res);
}
}
@@ -146,6 +157,7 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
String[] hostnames = split.getHostnames();
LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey);
}
+
@Override
public boolean reachedEnd() throws IOException {
return endReached;
@@ -154,7 +166,7 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
@Override
public void close() throws IOException {
LOG.info("Closing split (scanned {} rows)", scannedRows);
- lastRow = null;
+ currentRow = null;
try {
if (resultScanner != null) {
resultScanner.close();
@@ -178,14 +190,16 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
@Override
public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
if (table == null) {
- throw new IOException("The HBase table has not been opened!");
+ throw new IOException("The HBase table has not been opened! " +
+ "This needs to be done in configure().");
}
if (scan == null) {
- throw new IOException("getScanner returned null");
+ throw new IOException("Scan has not been initialized! " +
+ "This needs to be done in configure().");
}
- //Gets the starting and ending row keys for every region in the currently open table
- final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+ // Get the starting and ending row keys for every region in the currently open table
+ final Pair<byte[][], byte[][]> keys = table.getRegionLocator().getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
throw new IOException("Expecting at least one region.");
}
@@ -198,15 +212,15 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
for (int i = 0; i < keys.getFirst().length; i++) {
final byte[] startKey = keys.getFirst()[i];
final byte[] endKey = keys.getSecond()[i];
- final String regionLocation = table.getRegionLocation(startKey, false).getHostnamePort();
- //Test if the given region is to be included in the InputSplit while splitting the regions of a table
- if (!includeRegionInSplit(startKey, endKey)) {
+ final String regionLocation = table.getRegionLocator().getRegionLocation(startKey, false).getHostnamePort();
+ // Test if the given region is to be included in the InputSplit while splitting the regions of a table
+ if (!includeRegionInScan(startKey, endKey)) {
continue;
}
- //Finds the region on which the given row is being served
+ // Find the region on which the given row is being served
final String[] hosts = new String[]{regionLocation};
- // determine if regions contains keys used by the scan
+ // Determine if the region contains keys used by the scan
boolean isLastRegion = endKey.length == 0;
if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
(scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
@@ -223,27 +237,17 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
for (TableInputSplit split : splits) {
logSplitInfo("created", split);
}
- return splits.toArray(new TableInputSplit[0]);
+ return splits.toArray(new TableInputSplit[splits.size()]);
}
/**
- * Test if the given region is to be included in the InputSplit while splitting the regions of a table.
- * <p>
- * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
- * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br>
- * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R
- * processing, continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due
- * to the ordering of the keys. <br>
- * <br>
- * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region. <br>
- * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded(
- * i.e. all regions are included).
+ * Test if the given region is to be included in the scan while splitting the regions of a table.
*
* @param startKey Start key of the region
* @param endKey End key of the region
* @return true, if this region needs to be included as part of the input (default).
*/
- protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
+ protected boolean includeRegionInScan(final byte[] startKey, final byte[] endKey) {
return true;
}
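
A hedged illustration of the renamed extension point: includeRegionInSplit(...) is now includeRegionInScan(...), and subclasses can still override it to drop whole regions from the scan. The prefix criterion and the class name below are made up for the example and are not part of this commit:

import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical subclass: exclude every region whose start key carries a "tmp" prefix.
public abstract class PrefixFilteringInputFormat<T> extends AbstractTableInputFormat<T> {

    @Override
    protected boolean includeRegionInScan(final byte[] startKey, final byte[] endKey) {
        byte[] prefix = Bytes.toBytes("tmp");
        // keep the region unless its start key begins with the prefix
        boolean startsWithPrefix = startKey.length >= prefix.length
            && Bytes.compareTo(startKey, 0, prefix.length, prefix, 0, prefix.length) == 0;
        return !startsWithPrefix;
    }
}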
http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
new file mode 100644
index 0000000..fff2a9e
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}.
+ */
+public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseRowInputFormat.class);
+ private String tableName;
+ private transient org.apache.hadoop.conf.Configuration conf;
+ private HBaseTableSchema schema;
+ private transient Charset stringCharset;
+
+ // family keys
+ private byte[][] families;
+ // qualifier keys
+ private byte[][][] qualifiers;
+ // qualifier types
+ private int[][] types;
+
+ // row which is returned
+ private Row resultRow;
+ // nested family rows
+ private Row[] familyRows;
+
+ public HBaseRowInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
+ this.tableName = tableName;
+ this.conf = conf;
+ this.schema = schema;
+
+ // set families, qualifiers, and types
+ String[] familyNames = schema.getFamilyNames();
+ this.families = schema.getFamilyKeys();
+ this.qualifiers = new byte[this.families.length][][];
+ this.types = new int[this.families.length][];
+ for (int f = 0; f < families.length; f++) {
+ this.qualifiers[f] = schema.getQualifierKeys(familyNames[f]);
+ TypeInformation[] typeInfos = schema.getQualifierTypes(familyNames[f]);
+ this.types[f] = new int[typeInfos.length];
+ for (int i = 0; i < typeInfos.length; i++) {
+ int typeIdx = getTypeIndex(typeInfos[i].getTypeClass());
+ if (typeIdx >= 0) {
+ types[f][i] = typeIdx;
+ } else {
+ throw new IllegalArgumentException("Unsupported type: " + typeInfos[i]);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ LOG.info("Initializing HBase configuration.");
+ connectToTable();
+ if(table != null) {
+ scan = getScanner();
+ }
+
+ // prepare output rows
+ this.resultRow = new Row(families.length);
+ this.familyRows = new Row[families.length];
+ for (int f = 0; f < families.length; f++) {
+ this.familyRows[f] = new Row(qualifiers[f].length);
+ this.resultRow.setField(f, this.familyRows[f]);
+ }
+
+ this.stringCharset = Charset.forName(schema.getStringCharset());
+ }
+
+ @Override
+ protected Scan getScanner() {
+ Scan scan = new Scan();
+ for (int f = 0; f < families.length; f++) {
+ byte[] family = families[f];
+ for (int q = 0; q < qualifiers[f].length; q++) {
+ byte[] qualifier = qualifiers[f][q];
+ scan.addColumn(family, qualifier);
+ }
+ }
+ return scan;
+ }
+
+ @Override
+ public String getTableName() {
+ return tableName;
+ }
+
+ @Override
+ protected Row mapResultToOutType(Result res) {
+ for (int f = 0; f < this.families.length; f++) {
+ // get family key
+ byte[] familyKey = families[f];
+ Row familyRow = familyRows[f];
+ for (int q = 0; q < this.qualifiers[f].length; q++) {
+ // get qualifier key
+ byte[] qualifier = qualifiers[f][q];
+ // get qualifier type index
+ int typeIdx = types[f][q];
+ // read value
+ byte[] value = res.getValue(familyKey, qualifier);
+ if(value != null) {
+ familyRow.setField(q, deserialize(value, typeIdx));
+ } else {
+ familyRow.setField(q, null);
+ }
+ }
+ resultRow.setField(f, familyRow);
+ }
+ return resultRow;
+ }
+
+ private void connectToTable() {
+
+ if (this.conf == null) {
+ this.conf = HBaseConfiguration.create();
+ }
+
+ try {
+ Connection conn = ConnectionFactory.createConnection(conf);
+ super.table = (HTable) conn.getTable(TableName.valueOf(tableName));
+ } catch(TableNotFoundException tnfe) {
+ LOG.error("The table " + tableName + " not found ", tnfe);
+ throw new RuntimeException("HBase table '" + tableName + "' not found.", tnfe);
+ } catch(IOException ioe) {
+ LOG.error("Exception while creating connection to HBase.", ioe);
+ throw new RuntimeException("Cannot create connection to HBase.", ioe);
+ }
+ }
+
+ @Override
+ public TypeInformation<Row> getProducedType() {
+ // build a nested row type per column family
+ String[] famNames = schema.getFamilyNames();
+ TypeInformation<?>[] typeInfos = new TypeInformation[famNames.length];
+ int i = 0;
+ for (String family : famNames) {
+ typeInfos[i] = new RowTypeInfo(schema.getQualifierTypes(family), schema.getQualifierNames(family));
+ i++;
+ }
+ return new RowTypeInfo(typeInfos, famNames);
+ }
+
+ private Object deserialize(byte[] value, int typeIdx) {
+ switch (typeIdx) {
+ case 0: // byte[]
+ return value;
+ case 1:
+ return new String(value, stringCharset);
+ case 2: // byte
+ return value[0];
+ case 3:
+ return Bytes.toShort(value);
+ case 4:
+ return Bytes.toInt(value);
+ case 5:
+ return Bytes.toLong(value);
+ case 6:
+ return Bytes.toFloat(value);
+ case 7:
+ return Bytes.toDouble(value);
+ case 8:
+ return Bytes.toBoolean(value);
+ case 9: // sql.Timestamp encoded as long
+ return new Timestamp(Bytes.toLong(value));
+ case 10: // sql.Date encoded as long
+ return new Date(Bytes.toLong(value));
+ case 11: // sql.Time encoded as long
+ return new Time(Bytes.toLong(value));
+ case 12:
+ return Bytes.toBigDecimal(value);
+ case 13:
+ return new BigInteger(value);
+
+ default:
+ throw new IllegalArgumentException("Unknown type index " + typeIdx);
+ }
+ }
+
+ private static int getTypeIndex(Class<?> clazz) {
+ if (byte[].class.equals(clazz)) {
+ return 0;
+ } else if (String.class.equals(clazz)) {
+ return 1;
+ } else if (Byte.class.equals(clazz)) {
+ return 2;
+ } else if (Short.class.equals(clazz)) {
+ return 3;
+ } else if (Integer.class.equals(clazz)) {
+ return 4;
+ } else if (Long.class.equals(clazz)) {
+ return 5;
+ } else if (Float.class.equals(clazz)) {
+ return 6;
+ } else if (Double.class.equals(clazz)) {
+ return 7;
+ } else if (Boolean.class.equals(clazz)) {
+ return 8;
+ } else if (Timestamp.class.equals(clazz)) {
+ return 9;
+ } else if (Date.class.equals(clazz)) {
+ return 10;
+ } else if (Time.class.equals(clazz)) {
+ return 11;
+ } else if (BigDecimal.class.equals(clazz)) {
+ return 12;
+ } else if (BigInteger.class.equals(clazz)) {
+ return 13;
+ } else {
+ return -1;
+ }
+ }
+
+ static boolean isSupportedType(Class<?> clazz) {
+ return getTypeIndex(clazz) != -1;
+ }
+
+}
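
To make the produced type concrete: getProducedType() wraps one nested row type per column family inside a top-level row keyed by family name. A hedged sketch with a made-up schema of family "f1" (Integer column "a") and family "f2" (String column "b"):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class ProducedTypeSketch {
    public static void main(String[] args) {
        // nested row type per family (family and column names are illustrative)
        TypeInformation<?> f1 = new RowTypeInfo(
            new TypeInformation<?>[]{BasicTypeInfo.INT_TYPE_INFO}, new String[]{"a"});
        TypeInformation<?> f2 = new RowTypeInfo(
            new TypeInformation<?>[]{BasicTypeInfo.STRING_TYPE_INFO}, new String[]{"b"});
        // top-level row keyed by family name
        RowTypeInfo produced = new RowTypeInfo(
            new TypeInformation<?>[]{f1, f2}, new String[]{"f1", "f2"});
        // prints something like: Row(f1: Row(a: Integer), f2: Row(b: String))
        System.out.println(produced);
    }
}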
http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
index d3c9fb8..b6b3916 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
@@ -17,20 +17,14 @@
*/
package org.apache.flink.addons.hbase;
-import com.google.common.collect.ImmutableCollection;
-import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.hbase.util.Bytes;
import java.io.Serializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.sql.Time;
+import java.nio.charset.Charset;
+import java.util.LinkedHashMap;
import java.util.Map;
-import java.sql.Date;
-import java.util.TreeMap;
/**
* Helps to specify an HBase Table's schema
@@ -38,17 +32,13 @@ import java.util.TreeMap;
public class HBaseTableSchema implements Serializable {
// A Map with key as column family.
- // Guarantees natural ordering
- private final Map<String, Map<String, TypeInformation<?>>> familyMap =
- new TreeMap<>();
+ private final Map<String, Map<String, TypeInformation<?>>> familyMap = new LinkedHashMap<>();
- // Allowed types. This may change.
- private static ImmutableCollection<Class<?>> CLASS_TYPES = ImmutableList.<Class<?>>of(
- Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class, Time.class, byte[].class
- );
+ // charset to parse HBase keys and strings. UTF-8 by default.
+ private String charset = "UTF-8";
/**
- * Allows specifying the family and qualifier name along with the data type of the qualifier for an HBase table
+ * Adds a column defined by family, qualifier, and type to the table schema.
*
* @param family the family name
* @param qualifier the qualifier name
@@ -58,76 +48,138 @@ public class HBaseTableSchema implements Serializable {
Preconditions.checkNotNull(family, "family name");
Preconditions.checkNotNull(qualifier, "qualifier name");
Preconditions.checkNotNull(clazz, "class type");
- Map<String, TypeInformation<?>> map = this.familyMap.get(family);
- if (map == null) {
- map = new TreeMap<>();
- }
- if (!CLASS_TYPES.contains(clazz)) {
+ Map<String, TypeInformation<?>> qualifierMap = this.familyMap.get(family);
+
+ if (!HBaseRowInputFormat.isSupportedType(clazz)) {
// throw exception
- throw new IllegalArgumentException("Unsupported class type found " + clazz+". Better to use byte[].class and deserialize using user defined scalar functions");
+ throw new IllegalArgumentException("Unsupported class type found " + clazz+". " +
+ "Better to use byte[].class and deserialize using user defined scalar functions");
+ }
+
+ if (qualifierMap == null) {
+ qualifierMap = new LinkedHashMap<>();
}
- map.put(qualifier, TypeExtractor.getForClass(clazz));
- familyMap.put(family, map);
+ qualifierMap.put(qualifier, TypeExtractor.getForClass(clazz));
+ familyMap.put(family, qualifierMap);
}
+ /**
+ * Sets the charset for value strings and HBase identifiers.
+ *
+ * @param charset the charset for value strings and HBase identifiers.
+ */
+ void setCharset(String charset) {
+ this.charset = charset;
+ }
+
+ /**
+ * Returns the names of all registered column families.
+ *
+ * @return The names of all registered column families.
+ */
String[] getFamilyNames() {
return this.familyMap.keySet().toArray(new String[this.familyMap.size()]);
}
+ /**
+ * Returns the HBase identifiers of all registered column families.
+ *
+ * @return The HBase identifiers of all registered column families.
+ */
+ byte[][] getFamilyKeys() {
+ Charset c = Charset.forName(charset);
+
+ byte[][] familyKeys = new byte[this.familyMap.size()][];
+ int i = 0;
+ for(String name : this.familyMap.keySet()) {
+ familyKeys[i++] = name.getBytes(c);
+ }
+ return familyKeys;
+ }
+
+ /**
+ * Returns the names of all registered column qualifiers of a specific column family.
+ *
+ * @param family The name of the column family for which the column qualifier names are returned.
+ * @return The names of all registered column qualifiers of a specific column family.
+ */
String[] getQualifierNames(String family) {
- Map<String, TypeInformation<?>> colDetails = familyMap.get(family);
- String[] qualifierNames = new String[colDetails.size()];
+ Map<String, TypeInformation<?>> qualifierMap = familyMap.get(family);
+
+ if (qualifierMap == null) {
+ throw new IllegalArgumentException("Family " + family + " does not exist in schema.");
+ }
+
+ String[] qualifierNames = new String[qualifierMap.size()];
int i = 0;
- for (String qualifier: colDetails.keySet()) {
+ for (String qualifier: qualifierMap.keySet()) {
qualifierNames[i] = qualifier;
i++;
}
return qualifierNames;
}
+ /**
+ * Returns the HBase identifiers of all registered column qualifiers for a specific column family.
+ *
+ * @param family The name of the column family for which the column qualifier identifiers are returned.
+ * @return The HBase identifiers of all registered column qualifiers for a specific column family.
+ */
+ byte[][] getQualifierKeys(String family) {
+ Map<String, TypeInformation<?>> qualifierMap = familyMap.get(family);
+
+ if (qualifierMap == null) {
+ throw new IllegalArgumentException("Family " + family + " does not exist in schema.");
+ }
+ Charset c = Charset.forName(charset);
+
+ byte[][] qualifierKeys = new byte[qualifierMap.size()][];
+ int i = 0;
+ for(String name : qualifierMap.keySet()) {
+ qualifierKeys[i++] = name.getBytes(c);
+ }
+ return qualifierKeys;
+ }
+
+ /**
+ * Returns the types of all registered column qualifiers of a specific column family.
+ *
+ * @param family The name of the column family for which the column qualifier types are returned.
+ * @return The types of all registered column qualifiers of a specific column family.
+ */
TypeInformation<?>[] getQualifierTypes(String family) {
- Map<String, TypeInformation<?>> colDetails = familyMap.get(family);
- TypeInformation<?>[] typeInformations = new TypeInformation[colDetails.size()];
+ Map<String, TypeInformation<?>> qualifierMap = familyMap.get(family);
+
+ if (qualifierMap == null) {
+ throw new IllegalArgumentException("Family " + family + " does not exist in schema.");
+ }
+
+ TypeInformation<?>[] typeInformation = new TypeInformation[qualifierMap.size()];
int i = 0;
- for (TypeInformation<?> typeInfo : colDetails.values()) {
- typeInformations[i] = typeInfo;
+ for (TypeInformation<?> typeInfo : qualifierMap.values()) {
+ typeInformation[i] = typeInfo;
i++;
}
- return typeInformations;
+ return typeInformation;
}
+ /**
+ * Returns the names and types of all registered column qualifiers of a specific column family.
+ *
+ * @param family The name of the column family for which the column qualifier names and types are returned.
+ * @return The names and types of all registered column qualifiers of a specific column family.
+ */
Map<String, TypeInformation<?>> getFamilyInfo(String family) {
return familyMap.get(family);
}
- Object deserialize(byte[] value, TypeInformation<?> typeInfo) {
- if (typeInfo.isBasicType()) {
- if (typeInfo.getTypeClass() == Integer.class) {
- return Bytes.toInt(value);
- } else if (typeInfo.getTypeClass() == Short.class) {
- return Bytes.toShort(value);
- } else if (typeInfo.getTypeClass() == Float.class) {
- return Bytes.toFloat(value);
- } else if (typeInfo.getTypeClass() == Long.class) {
- return Bytes.toLong(value);
- } else if (typeInfo.getTypeClass() == String.class) {
- return Bytes.toString(value);
- } else if (typeInfo.getTypeClass() == Byte.class) {
- return value[0];
- } else if (typeInfo.getTypeClass() == Boolean.class) {
- return Bytes.toBoolean(value);
- } else if (typeInfo.getTypeClass() == Double.class) {
- return Bytes.toDouble(value);
- } else if (typeInfo.getTypeClass() == BigInteger.class) {
- return new BigInteger(value);
- } else if (typeInfo.getTypeClass() == BigDecimal.class) {
- return Bytes.toBigDecimal(value);
- } else if (typeInfo.getTypeClass() == Date.class) {
- return new Date(Bytes.toLong(value));
- } else if (typeInfo.getTypeClass() == Time.class) {
- return new Time(Bytes.toLong(value));
- }
- }
- return value;
+ /**
+ * Returns the charset for value strings and HBase identifiers.
+ *
+ * @return The charset for value strings and HBase identifiers.
+ */
+ String getStringCharset() {
+ return this.charset;
}
+
}
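
A hedged sketch of how the reworked schema exposes HBase identifiers in the configured charset. It assumes same-package access, since the accessor methods are package-private, and the family and qualifier names are made up:

package org.apache.flink.addons.hbase; // same package: the accessors are package-private

import org.apache.flink.api.common.typeinfo.TypeInformation;

class SchemaSketch {
    static void sketch() {
        HBaseTableSchema schema = new HBaseTableSchema();
        // illustrative family/qualifier names; registration order is preserved (LinkedHashMap)
        schema.addColumn("person", "name", String.class);
        schema.addColumn("person", "id", Integer.class);
        schema.setCharset("UTF-8"); // the default, set here only for illustration

        byte[][] familyKeys = schema.getFamilyKeys();               // {"person"} as UTF-8 bytes
        byte[][] qualifierKeys = schema.getQualifierKeys("person"); // {"name", "id"} as UTF-8 bytes
        TypeInformation<?>[] types = schema.getQualifierTypes("person"); // {String, Integer}
    }
}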
http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index c484bd7..a1be23f 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -30,11 +30,27 @@ import org.apache.hadoop.conf.Configuration;
import java.util.Map;
/**
- * Creates a table source that helps to scan data from an hbase table
- * The table name is passed during {@link HBaseTableSource} construction along with the required hbase configurations.
- * Use {@link #addColumn(String, String, Class)} to specify the family name, qualifier and the type of the qualifier that needs to be scanned
- * This supports nested schema, for eg: if we have a column family 'Person' and two qualifiers 'Name' (type String) and 'Unique_Id' (Type Integer),
- * then we represent this schema as Row<Person : Row<Name: String, Unique_Id : Integer>>
+ * Creates a TableSource to scan an HBase table.
+ *
+ * The table name and required HBase configuration is passed during {@link HBaseTableSource} construction.
+ * Use {@link #addColumn(String, String, Class)} to specify the family, qualifier, and type of columns to scan.
+ *
+ * The TableSource returns {@link Row} with nested Rows for each column family.
+ *
+ * The HBaseTableSource is used as shown in the example below.
+ *
+ * <pre>
+ * {@code
+ * HBaseTableSource hSrc = new HBaseTableSource(conf, "hTable");
+ * hSrc.addColumn("fam1", "col1", byte[].class);
+ * hSrc.addColumn("fam1", "col2", Integer.class);
+ * hSrc.addColumn("fam2", "col1", String.class);
+ *
+ * tableEnv.registerTableSource("hTable", hSrc);
+ * Table res = tableEnv.sql("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1");
+ * }
+ * </pre>
+ *
*/
public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
@@ -43,7 +59,7 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
private HBaseTableSchema schema;
/**
- * The hbase configuration and the table name that acts as the hbase table source
+ * The HBase configuration and the name of the table to read.
*
* @param conf hbase configuration
* @param tableName the tableName
@@ -55,7 +71,7 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
}
/**
- * Allows specifying the family and qualifier name along with the data type of the qualifier for an HBase table
+ * Adds a column defined by family, qualifier, and type to the table schema.
*
* @param family the family name
* @param qualifier the qualifier name
@@ -65,6 +81,15 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
this.schema.addColumn(family, qualifier, clazz);
}
+ /**
+ * Specifies the charset to parse Strings to HBase byte[] keys and String values.
+ *
+ * @param charset Name of the charset to use.
+ */
+ public void setCharset(String charset) {
+ this.schema.setCharset(charset);
+ }
+
@Override
public TypeInformation<Row> getReturnType() {
String[] famNames = schema.getFamilyNames();
@@ -74,13 +99,12 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
typeInfos[i] = new RowTypeInfo(schema.getQualifierTypes(family), schema.getQualifierNames(family));
i++;
}
- RowTypeInfo rowInfo = new RowTypeInfo(typeInfos, famNames);
- return rowInfo;
+ return new RowTypeInfo(typeInfos, famNames);
}
@Override
public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
- return execEnv.createInput(new HBaseTableSourceInputFormat(conf, tableName, schema), getReturnType());
+ return execEnv.createInput(new HBaseRowInputFormat(conf, tableName, schema), getReturnType());
}
@Override
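
Putting the pieces together, a hedged end-to-end sketch in the spirit of the javadoc example above and the new HBaseConnectorITCase below; the table and column names are illustrative:

import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class HBaseTableSourceSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // register the source with a few columns (names are illustrative)
        HBaseTableSource hSrc = new HBaseTableSource(HBaseConfiguration.create(), "hTable");
        hSrc.addColumn("fam1", "col2", Integer.class);
        hSrc.addColumn("fam2", "col1", String.class);
        tableEnv.registerTableSource("hTable", hSrc);

        // query the nested schema with SQL and convert the result to a DataSet<Row>
        Table res = tableEnv.sql(
            "SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1");
        DataSet<Row> rows = tableEnv.toDataSet(res, Row.class);
        rows.print();
    }
}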
http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
deleted file mode 100644
index 6c4d7da..0000000
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Row;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}
- */
-public class HBaseTableSourceInputFormat extends AbstractTableInputFormat<Row> implements ResultTypeQueryable<Row> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
- private String tableName;
- private transient Connection conn;
- private transient org.apache.hadoop.conf.Configuration conf;
- private HBaseTableSchema schema;
-
- public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
- this.tableName = tableName;
- this.conf = conf;
- this.schema = schema;
- }
-
- @Override
- public void configure(Configuration parameters) {
- LOG.info("Initializing HBaseConfiguration");
- connectToTable();
- if(table != null) {
- scan = getScanner();
- }
- }
-
- @Override
- protected Scan getScanner() {
- Scan scan = new Scan();
- for(String family : schema.getFamilyNames()) {
- for(String qualifierName : schema.getQualifierNames(family)) {
- scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifierName));
- }
- }
- return scan;
- }
-
- @Override
- public String getTableName() {
- return tableName;
- }
-
- @Override
- protected Row mapResultToType(Result res) {
- List<Object> values = new ArrayList<Object>();
- int i = 0;
- String[] familyNames = schema.getFamilyNames();
- Object[] rows = new Object[familyNames.length];
- for(String family : familyNames) {
- Map<String, TypeInformation<?>> infos = schema.getFamilyInfo(family);
- for(String qualifier : infos.keySet()) {
- byte[] value = res.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
- if(value != null) {
- values.add(schema.deserialize(value, infos.get(qualifier)));
- } else {
- values.add(null);
- }
- }
- rows[i] = Row.of(values.toArray(new Object[values.size()]));
- values.clear();
- i++;
- }
- return Row.of(rows);
- }
-
- private void connectToTable() {
- //use files found in the classpath
- if(this.conf == null) {
- this.conf = HBaseConfiguration.create();
- }
- try {
- conn = ConnectionFactory.createConnection(conf);
- } catch(IOException ioe) {
- LOG.error("Exception while creating connection to hbase cluster", ioe);
- return;
- }
- try {
- table = (HTable)conn.getTable(TableName.valueOf(tableName));
- } catch(TableNotFoundException tnfe) {
- LOG.error("The table " + tableName + " not found ", tnfe);
- } catch(IOException ioe) {
- LOG.error("Exception while connecting to the table "+tableName+ " ", ioe);
- }
- }
-
- @Override
- public TypeInformation<Row> getProducedType() {
- // split the fieldNames
- String[] famNames = schema.getFamilyNames();
- TypeInformation<?>[] typeInfos = new TypeInformation[famNames.length];
- int i = 0;
- for (String family : famNames) {
- typeInfos[i] = new RowTypeInfo(schema.getQualifierTypes(family), schema.getQualifierNames(family));
- i++;
- }
- RowTypeInfo rowInfo = new RowTypeInfo(typeInfos, famNames);
- return rowInfo;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
index 467652a..6ea2d04 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -52,7 +52,7 @@ public abstract class TableInputFormat<T extends Tuple> extends AbstractTableInp
* @param r The Result instance from HBase that needs to be converted
* @return The appropriate instance of {@link Tuple} that contains the needed information.
*/
- protected abstract T mapResultToType(Result r);
+ protected abstract T mapResultToTuple(Result r);
/**
* Creates a {@link Scan} object and opens the {@link HTable} connection.
@@ -86,4 +86,8 @@ public abstract class TableInputFormat<T extends Tuple> extends AbstractTableInp
}
return null;
}
+
+ protected T mapResultToOutType(Result r) {
+ return mapResultToTuple(r);
+ }
}
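
The new bridge keeps the Tuple-specialized API intact: mapResultToOutType(Result) simply forwards to mapResultToTuple(Result), so existing subclasses only rename their mapping method (see the HBaseReadExample change below). A hedged sketch of a subclass under the renamed contract, with made-up table and column names:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical: read word counts from an HBase table "words", family "counts", qualifier "n".
public class WordCountInputFormat extends TableInputFormat<Tuple2<String, Integer>> {

    @Override
    protected Scan getScanner() {
        return new Scan().addColumn(Bytes.toBytes("counts"), Bytes.toBytes("n"));
    }

    @Override
    protected String getTableName() {
        return "words";
    }

    @Override
    protected Tuple2<String, Integer> mapResultToTuple(Result r) {
        // the base class now routes mapResultToOutType(Result) to this method
        String word = Bytes.toString(r.getRow());
        int count = Bytes.toInt(r.getValue(Bytes.toBytes("counts"), Bytes.toBytes("n")));
        return Tuple2.of(word, count);
    }
}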
http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
new file mode 100644
index 0000000..c1aa9a0
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -0,0 +1,338 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This class contains integration tests for multiple HBase connectors:
+ * - TableInputFormat
+ * - HBaseTableSource
+ *
+ * These tests are located in a single test file to avoid unnecessary initializations of the
+ * HBaseTestingCluster, which takes about half a minute.
+ *
+ */
+public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
+
+ private static final String TEST_TABLE = "testTable";
+
+ private static final String FAMILY1 = "family1";
+ private static final String F1COL1 = "col1";
+
+ private static final String FAMILY2 = "family2";
+ private static final String F2COL1 = "col1";
+ private static final String F2COL2 = "col2";
+
+ private static final String FAMILY3 = "family3";
+ private static final String F3COL1 = "col1";
+ private static final String F3COL2 = "col2";
+ private static final String F3COL3 = "col3";
+
+ @BeforeClass
+ public static void activateHBaseCluster() throws IOException {
+ registerHBaseMiniClusterInClasspath();
+ prepareTable();
+ }
+
+ private static void prepareTable() throws IOException {
+
+ // create a table
+ TableName tableName = TableName.valueOf(TEST_TABLE);
+ // column families
+ byte[][] families = new byte[][]{
+ Bytes.toBytes(FAMILY1),
+ Bytes.toBytes(FAMILY2),
+ Bytes.toBytes(FAMILY3)
+ };
+ // split keys
+ byte[][] splitKeys = new byte[][]{ Bytes.toBytes(4) };
+ createTable(tableName, families, splitKeys);
+
+ // get the HTable instance
+ HTable table = openTable(tableName);
+ List<Put> puts = new ArrayList<>();
+ // add some data
+ puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
+ puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
+ puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
+ puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
+ puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
+ puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
+ puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
+ puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
+
+ // append rows to table
+ table.put(puts);
+ table.close();
+ }
+
+ private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
+ Put put = new Put(Bytes.toBytes(rowKey));
+ // family 1
+ put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1));
+ // family 2
+ if (f2c1 != null) {
+ put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1));
+ }
+ put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), Bytes.toBytes(f2c2));
+ // family 3
+ put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), Bytes.toBytes(f3c1));
+ put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2));
+ put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3));
+
+ return put;
+ }
+
+ // ######## HBaseTableSource tests ############
+
+ @Test
+ public void testTableSourceFullScan() throws Exception {
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
+ HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+ hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
+ hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
+ hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
+ hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
+ hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
+ hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
+ tableEnv.registerTableSource("hTable", hbaseTable);
+
+ Table result = tableEnv.sql(
+ "SELECT " +
+ " h.family1.col1, " +
+ " h.family2.col1, " +
+ " h.family2.col2, " +
+ " h.family3.col1, " +
+ " h.family3.col2, " +
+ " h.family3.col3 " +
+ "FROM hTable AS h"
+ );
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+
+ String expected =
+ "10,Hello-1,100,1.01,false,Welt-1\n" +
+ "20,Hello-2,200,2.02,true,Welt-2\n" +
+ "30,Hello-3,300,3.03,false,Welt-3\n" +
+ "40,null,400,4.04,true,Welt-4\n" +
+ "50,Hello-5,500,5.05,false,Welt-5\n" +
+ "60,Hello-6,600,6.06,true,Welt-6\n" +
+ "70,Hello-7,700,7.07,false,Welt-7\n" +
+ "80,null,800,8.08,true,Welt-8\n";
+
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testTableSourceProjection() throws Exception {
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
+ HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+ hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
+ hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
+ hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
+ hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
+ hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
+ hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
+ tableEnv.registerTableSource("hTable", hbaseTable);
+
+ Table result = tableEnv.sql(
+ "SELECT " +
+ " h.family1.col1, " +
+ " h.family3.col1, " +
+ " h.family3.col2, " +
+ " h.family3.col3 " +
+ "FROM hTable AS h"
+ );
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+
+ String expected =
+ "10,1.01,false,Welt-1\n" +
+ "20,2.02,true,Welt-2\n" +
+ "30,3.03,false,Welt-3\n" +
+ "40,4.04,true,Welt-4\n" +
+ "50,5.05,false,Welt-5\n" +
+ "60,6.06,true,Welt-6\n" +
+ "70,7.07,false,Welt-7\n" +
+ "80,8.08,true,Welt-8\n";
+
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testTableSourceFieldOrder() throws Exception {
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
+ HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+ // shuffle order of column registration
+ hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
+ hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
+ hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
+ hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
+ hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
+ hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
+ tableEnv.registerTableSource("hTable", hbaseTable);
+
+ Table result = tableEnv.sql(
+ "SELECT * FROM hTable AS h"
+ );
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+
+ String expected =
+ "Hello-1,100,1.01,false,Welt-1,10\n" +
+ "Hello-2,200,2.02,true,Welt-2,20\n" +
+ "Hello-3,300,3.03,false,Welt-3,30\n" +
+ "null,400,4.04,true,Welt-4,40\n" +
+ "Hello-5,500,5.05,false,Welt-5,50\n" +
+ "Hello-6,600,6.06,true,Welt-6,60\n" +
+ "Hello-7,700,7.07,false,Welt-7,70\n" +
+ "null,800,8.08,true,Welt-8,80\n";
+
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testTableSourceReadAsByteArray() throws Exception {
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
+ // read the two columns of family2 as raw byte[]
+ HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+ hbaseTable.addColumn(FAMILY2, F2COL1, byte[].class);
+ hbaseTable.addColumn(FAMILY2, F2COL2, byte[].class);
+
+ tableEnv.registerTableSource("hTable", hbaseTable);
+ tableEnv.registerFunction("toUTF8", new ToUTF8());
+ tableEnv.registerFunction("toLong", new ToLong());
+
+ Table result = tableEnv.sql(
+ "SELECT " +
+ " toUTF8(h.family2.col1), " +
+ " toLong(h.family2.col2) " +
+ "FROM hTable AS h"
+ );
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+
+ String expected =
+ "Hello-1,100\n" +
+ "Hello-2,200\n" +
+ "Hello-3,300\n" +
+ "null,400\n" +
+ "Hello-5,500\n" +
+ "Hello-6,600\n" +
+ "Hello-7,700\n" +
+ "null,800\n";
+
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
+ public static class ToUTF8 extends ScalarFunction {
+
+ public String eval(byte[] bytes) {
+ return Bytes.toString(bytes);
+ }
+ }
+
+ public static class ToLong extends ScalarFunction {
+
+ public long eval(byte[] bytes) {
+ return Bytes.toLong(bytes);
+ }
+ }
+
+ // ######## TableInputFormat tests ############
+
+ class InputFormatForTestTable extends TableInputFormat<Tuple1<Integer>> {
+
+ @Override
+ protected Scan getScanner() {
+ return new Scan();
+ }
+
+ @Override
+ protected String getTableName() {
+ return TEST_TABLE;
+ }
+
+ @Override
+ protected Tuple1<Integer> mapResultToTuple(Result r) {
+ return new Tuple1<>(Bytes.toInt(r.getValue(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1))));
+ }
+ }
+
+ @Test
+ public void testTableInputFormat() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+
+ DataSet<Tuple1<Integer>> result = env
+ .createInput(new InputFormatForTestTable())
+ .reduce(new ReduceFunction<Tuple1<Integer>>(){
+
+ @Override
+ public Tuple1<Integer> reduce(Tuple1<Integer> v1, Tuple1<Integer> v2) throws Exception {
+ return Tuple1.of(v1.f0 + v2.f0);
+ }
+ });
+
+ List<Tuple1<Integer>> resultSet = result.collect();
+
+ assertEquals(1, resultSet.size());
+ assertEquals(360, (int)resultSet.get(0).f0);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
deleted file mode 100644
index 57ccafd..0000000
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TableInputFormatITCase extends HBaseTestingClusterAutostarter {
- private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
- private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
- private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
-
- // These are the row ids AND also the values we will put in the test table
- private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
-
- @BeforeClass
- public static void activateHBaseCluster(){
- registerHBaseMiniClusterInClasspath();
- }
-
- @Before
- public void createTestTable() throws IOException {
- TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
- byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
- byte[][] famNames = new byte[1][];
- famNames[0] = TEST_TABLE_FAMILY_NAME;
- createTable(tableName, famNames, splitKeys);
- HTable table = openTable(tableName);
-
- for (String rowId : ROW_IDS) {
- byte[] rowIdBytes = rowId.getBytes();
- Put p = new Put(rowIdBytes);
- // Use the rowId as the value to facilitate the testing better
- p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
- table.put(p);
- }
-
- table.close();
- }
-
- class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
- @Override
- protected Scan getScanner() {
- return new Scan();
- }
-
- @Override
- protected String getTableName() {
- return TEST_TABLE_NAME;
- }
-
- @Override
- protected Tuple1<String> mapResultToType(Result r) {
- return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
- }
- }
-
- @Test
- public void testTableInputFormat() {
- ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
- environment.setParallelism(1);
-
- DataSet<String> resultDataSet =
- environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
- @Override
- public String map(Tuple1<String> value) throws Exception {
- return value.f0;
- }
- });
-
- List<String> resultSet = new ArrayList<>();
- resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet));
-
- try {
- environment.execute("HBase InputFormat Test");
- } catch (Exception e) {
- Assert.fail("HBase InputFormat test failed. " + e.getMessage());
- }
-
- for (String rowId : ROW_IDS) {
- assertTrue("Missing rowId from table: " + rowId, resultSet.contains(rowId));
- }
-
- assertEquals("The number of records is wrong.", ROW_IDS.length, resultSet.size());
- }
-
-}
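
The coverage of this deleted test moves into the consolidated HBaseConnectorITCase added by this commit, and the per-record hook of TableInputFormat is renamed (see the HBaseReadExample hunk below). An illustrative sketch, not part of this diff, of how the deleted subclass would look against the renamed hook:

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;

// illustrative sketch only: the deleted test's input format, updated from
// mapResultToType(Result) to the renamed mapResultToTuple(Result) hook
public class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {

	@Override
	protected Scan getScanner() {
		// full table scan; the test table is small
		return new Scan();
	}

	@Override
	protected String getTableName() {
		return "TableInputFormatTestTable";
	}

	@Override
	protected Tuple1<String> mapResultToTuple(Result r) {
		// the test stores each row id as the cell value, so read it back as a String
		return new Tuple1<>(new String(r.getValue("F".getBytes(), "Col".getBytes())));
	}
}
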
http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
index 5377791..dccf876 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
@@ -63,7 +63,7 @@ public class HBaseReadExample {
private Tuple2<String, String> reuse = new Tuple2<String, String>();
@Override
- protected Tuple2<String, String> mapResultToType(Result r) {
+ protected Tuple2<String, String> mapResultToTuple(Result r) {
String key = Bytes.toString(r.getRow());
String val = Bytes.toString(r.getValue(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME));
reuse.setField(key, 0);
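
Usage from the DataSet API is unaffected by this rename; only subclasses that override the per-Result hook need updating. A hedged usage sketch, assuming an input format like the InputFormatForTestTable sketch given earlier:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// createInput() is called exactly as before; only the overridden hook was renamed
DataSet<Tuple1<String>> rows = env.createInput(new InputFormatForTestTable());
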
http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
deleted file mode 100644
index 8882d5c..0000000
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.addons.hbase.example;
-
-import org.apache.flink.addons.hbase.HBaseTableSchema;
-import org.apache.flink.addons.hbase.HBaseTableSource;
-import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.sources.BatchTableSource;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.types.Row;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter {
-
- private static final byte[] ROW_1 = Bytes.toBytes("row1");
- private static final byte[] ROW_2 = Bytes.toBytes("row2");
- private static final byte[] ROW_3 = Bytes.toBytes("row3");
- private static final byte[] F_1 = Bytes.toBytes("f1");
- private static final byte[] F_2 = Bytes.toBytes("f2");
- private static final byte[] Q_1 = Bytes.toBytes("q1");
- private static final byte[] Q_2 = Bytes.toBytes("q2");
- private static final byte[] Q_3 = Bytes.toBytes("q3");
-
- @BeforeClass
- public static void activateHBaseCluster() {
- registerHBaseMiniClusterInClasspath();
- }
-
- @Test
- public void testHBaseTableSourceWithSingleColumnFamily() throws Exception {
- // create a table with a single region
- TableName tableName = TableName.valueOf("test");
- // no split keys
- byte[][] famNames = new byte[1][];
- famNames[0] = F_1;
- createTable(tableName, famNames, null);
- // get the HTable instance
- HTable table = openTable(tableName);
- List<Put> puts = new ArrayList<Put>();
- // add some data
- Put put = new Put(ROW_1);
- // add 3 qualifiers per row
- // 1st qual is an integer
- put.addColumn(F_1, Q_1, Bytes.toBytes(100));
- // 2nd qual is a String
- put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
- // 3rd qual is a long
- put.addColumn(F_1, Q_3, Bytes.toBytes(19991L));
- puts.add(put);
-
- put = new Put(ROW_2);
- // add 3 qualifiers per row
- // 1st qual is an integer
- put.addColumn(F_1, Q_1, Bytes.toBytes(101));
- // 2nd qual is a String
- put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
- // 3rd qual is a long
- put.addColumn(F_1, Q_3, Bytes.toBytes(19992L));
- puts.add(put);
-
- put = new Put(ROW_3);
- // add 3 qualifiers per row
- // 1st qual is an integer
- put.addColumn(F_1, Q_1, Bytes.toBytes(102));
- // 2nd qual is a String
- put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue2"));
- // 3rd qual is a long
- put.addColumn(F_1, Q_3, Bytes.toBytes(19993L));
- puts.add(put);
- // add the mutations to the table
- table.put(puts);
- table.close();
- // preparation is done
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
- // read the full table with an HBaseTableSource
- HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), tableName.getNameAsString());
- hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_1), Integer.class);
- hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_2), String.class);
- hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_3), Long.class);
- tableEnv.registerTableSource("test", hbaseTable);
- Table result = tableEnv
- .sql("SELECT test.f1.q1, test.f1.q2, test.f1.q3 FROM test");
- DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- List<Row> results = resultSet.collect();
-
- String expected = "100,strvalue,19991\n" +
- "101,strvalue1,19992\n" +
- "102,strvalue2,19993\n";
- TestBaseUtils.compareResultAsText(results, expected);
- }
-
- @Test
- public void testHBaseTableSourceWithTwoColumnFamily() throws Exception {
- // create a table with a single region
- TableName tableName = TableName.valueOf("test1");
- // no split keys
- byte[][] famNames = new byte[2][];
- famNames[0] = F_1;
- famNames[1] = F_2;
- createTable(tableName, famNames, null);
- // get the HTable instance
- HTable table = openTable(tableName);
- List<Put> puts = new ArrayList<Put>();
- // add some data
- Put put = new Put(ROW_1);
- // add 3 qualifiers per row
- // 1st qual is an integer
- put.addColumn(F_1, Q_1, Bytes.toBytes(100));
- // 2nd qual is a String
- put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
- // 3rd qual is a long
- put.addColumn(F_1, Q_3, Bytes.toBytes(19991L));
- puts.add(put);
-
- put = new Put(ROW_2);
- // add 3 qualifiers per row
- // 1st qual is an integer
- put.addColumn(F_2, Q_1, Bytes.toBytes(201));
- // 2nd qual is a String
- put.addColumn(F_2, Q_2, Bytes.toBytes("newvalue1"));
- // 3rd qual is a long
- put.addColumn(F_2, Q_3, Bytes.toBytes(29992L));
- puts.add(put);
-
- put = new Put(ROW_3);
- // add 3 qualifiers per row
- // 1st qual is an integer
- put.addColumn(F_1, Q_1, Bytes.toBytes(102));
- // 2nd qual is a String
- put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue2"));
- // 3rd qual is a long
- put.addColumn(F_1, Q_3, Bytes.toBytes(19993L));
- puts.add(put);
- // add the mutations to the table
- table.put(puts);
- table.close();
- // preparation is done
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
- // read the full table with an HBaseTableSource
- HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), tableName.getNameAsString());
- hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_1), Integer.class);
- hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_2), String.class);
- hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_3), Long.class);
- hbaseTable.addColumn(Bytes.toString(F_2), Bytes.toString(Q_1), Integer.class);
- hbaseTable.addColumn(Bytes.toString(F_2), Bytes.toString(Q_2), String.class);
- hbaseTable.addColumn(Bytes.toString(F_2), Bytes.toString(Q_3), Long.class);
- tableEnv.registerTableSource("test1", hbaseTable);
- Table result = tableEnv
- .sql("SELECT test1.f1.q1, test1.f1.q2, test1.f1.q3, test1.f2.q1, test1.f2.q2, test1.f2.q3 FROM test1");
- DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- List<Row> results = resultSet.collect();
-
- String expected = "100,strvalue,19991,null,null,null\n" +
- "null,null,null,201,newvalue1,29992\n" +
- "102,strvalue2,19993,null,null,null\n";
- TestBaseUtils.compareResultAsText(results, expected);
- }
-}
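
The scenarios deleted above are presumably covered by the consolidated HBaseConnectorITCase added in this commit. For reference, the core HBaseTableSource pattern this test exercised, condensed into a hedged sketch (assuming an ExecutionEnvironment env and a BatchTableEnvironment tableEnv set up as in the deleted code):

// register the schema: each column family becomes a nested row of its qualifiers
HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), "test");
hbaseTable.addColumn("f1", "q1", Integer.class);
hbaseTable.addColumn("f1", "q2", String.class);
hbaseTable.addColumn("f1", "q3", Long.class);
tableEnv.registerTableSource("test", hbaseTable);
// qualifiers are addressed as nested fields of their family in SQL
Table result = tableEnv.sql("SELECT test.f1.q1, test.f1.q2, test.f1.q3 FROM test");
DataSet<Row> rows = tableEnv.toDataSet(result, Row.class);
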