You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/02/15 09:41:26 UTC

[2/2] flink git commit: [FLINK-2168] Refactor HBaseTableSource and extend tests.

[FLINK-2168] Refactor HBaseTableSource and extend tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87d09342
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87d09342
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87d09342

Branch: refs/heads/master
Commit: 87d09342bf5eef24ec22197284b16879c83e34dd
Parents: 58d4513
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Feb 6 23:36:05 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Feb 15 10:24:14 2017 +0100

----------------------------------------------------------------------
 flink-connectors/flink-hbase/pom.xml            |   1 +
 .../addons/hbase/AbstractTableInputFormat.java  |  92 ++---
 .../flink/addons/hbase/HBaseRowInputFormat.java | 263 +++++++++++++++
 .../flink/addons/hbase/HBaseTableSchema.java    | 174 ++++++----
 .../flink/addons/hbase/HBaseTableSource.java    |  44 ++-
 .../hbase/HBaseTableSourceInputFormat.java      | 144 --------
 .../flink/addons/hbase/TableInputFormat.java    |   6 +-
 .../addons/hbase/HBaseConnectorITCase.java      | 338 +++++++++++++++++++
 .../addons/hbase/TableInputFormatITCase.java    | 122 -------
 .../addons/hbase/example/HBaseReadExample.java  |   2 +-
 .../hbase/example/HBaseTableSourceITCase.java   | 196 -----------
 11 files changed, 803 insertions(+), 579 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/pom.xml b/flink-connectors/flink-hbase/pom.xml
index 14e2b59..96688f3 100644
--- a/flink-connectors/flink-hbase/pom.xml
+++ b/flink-connectors/flink-hbase/pom.xml
@@ -44,6 +44,7 @@ under the License.
 				<artifactId>maven-surefire-plugin</artifactId>
 				<version>2.19.1</version>
 				<configuration>
+					<argLine>-XX:MaxPermSize=128m</argLine>
 					<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
 					<forkCount>1</forkCount>
 				</configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
index d7f6ce9..59ba5b1f 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
@@ -38,12 +38,13 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * {@link InputFormat} subclass that wraps the access for HTables.
+ * Abstract {@link InputFormat} to read data from HBase tables.
  */
 public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInputSplit> {
 
 	protected static Logger LOG = LoggerFactory.getLogger(AbstractTableInputFormat.class);
-	/** helper variable to decide whether the input is exhausted or not */
+
+	// helper variable to decide whether the input is exhausted or not
 	protected boolean endReached = false;
 
 	protected transient HTable table = null;
@@ -52,34 +53,40 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
 	/** HBase iterator wrapper */
 	protected ResultScanner resultScanner = null;
 
-	protected byte[] lastRow;
-	protected int scannedRows;
+	protected byte[] currentRow;
+	protected long scannedRows;
+
 	/**
 	 * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
-	 * @return The appropriate instance of Scan for this usecase.
+	 *
+	 * @return The appropriate instance of Scan for this use case.
 	 */
 	protected abstract Scan getScanner();
 
 	/**
 	 * What table is to be read.
-	 * Per instance of a TableInputFormat derivative only a single tablename is possible.
+	 * Per instance of a TableInputFormat derivative only a single table name is possible.
+	 *
 	 * @return The name of the table
 	 */
 	protected abstract String getTableName();
 
 	/**
-	 * The output from HBase is always an instance of {@link Result}.
-	 * This method is to copy the data in the Result instance into the required {@link T}
+	 * HBase returns an instance of {@link Result}.
+	 *
+	 * This method maps the returned {@link Result} instance into the output type {@code T}.
+	 *
 	 * @param r The Result instance from HBase that needs to be converted
-	 * @return The approriate instance of {@link T} that contains the needed information.
+	 * @return The appropriate instance of {@code T} that contains the data of Result.
 	 */
-	protected abstract T mapResultToType(Result r);
+	protected abstract T mapResultToOutType(Result r);
 
 	/**
 	 * Creates a {@link Scan} object and opens the {@link HTable} connection.
 	 * These are opened here because they are needed in the createInputSplits
 	 * which is called before the openInputFormat method.
-	 * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
+	 *
+	 * The connection is opened in this method and closed in {@link #closeInputFormat()}.
 	 *
 	 * @param parameters The configuration that is to be used
 	 * @see Configuration
@@ -89,19 +96,23 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
 	@Override
 	public void open(TableInputSplit split) throws IOException {
 		if (table == null) {
-			throw new IOException("The HBase table has not been opened!");
+			throw new IOException("The HBase table has not been opened! " +
+				"This needs to be done in configure().");
 		}
 		if (scan == null) {
-			throw new IOException("getScanner returned null");
+			throw new IOException("Scan has not been initialized! " +
+				"This needs to be done in configure().");
 		}
 		if (split == null) {
 			throw new IOException("Input split is null!");
 		}
 
 		logSplitInfo("opening", split);
-		scan.setStartRow(split.getStartRow());
-		lastRow = split.getEndRow();
-		scan.setStopRow(lastRow);
+
+		// set scan range
+		currentRow = split.getStartRow();
+		scan.setStartRow(currentRow);
+		scan.setStopRow(split.getEndRow());
 
 		resultScanner = table.getScanner(scan);
 		endReached = false;
@@ -116,20 +127,20 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
 			Result res = resultScanner.next();
 			if (res != null) {
 				scannedRows++;
-				lastRow = res.getRow();
-				return mapResultToType(res);
+				currentRow = res.getRow();
+				return mapResultToOutType(res);
 			}
 		} catch (Exception e) {
 			resultScanner.close();
 			//workaround for timeout on scan
 			LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
-			scan.setStartRow(lastRow);
+			scan.setStartRow(currentRow);
 			resultScanner = table.getScanner(scan);
 			Result res = resultScanner.next();
 			if (res != null) {
 				scannedRows++;
-				lastRow = res.getRow();
-				return mapResultToType(res);
+				currentRow = res.getRow();
+				return mapResultToOutType(res);
 			}
 		}
 
@@ -146,6 +157,7 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
 		String[] hostnames = split.getHostnames();
 		LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey);
 	}
+
 	@Override
 	public boolean reachedEnd() throws IOException {
 		return endReached;
@@ -154,7 +166,7 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
 	@Override
 	public void close() throws IOException {
 		LOG.info("Closing split (scanned {} rows)", scannedRows);
-		lastRow = null;
+		currentRow = null;
 		try {
 			if (resultScanner != null) {
 				resultScanner.close();
@@ -178,14 +190,16 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
 	@Override
 	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
 		if (table == null) {
-			throw new IOException("The HBase table has not been opened!");
+			throw new IOException("The HBase table has not been opened! " +
+				"This needs to be done in configure().");
 		}
 		if (scan == null) {
-			throw new IOException("getScanner returned null");
+			throw new IOException("Scan has not been initialized! " +
+				"This needs to be done in configure().");
 		}
 
-		//Gets the starting and ending row keys for every region in the currently open table
-		final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+		// Get the starting and ending row keys for every region in the currently open table
+		final Pair<byte[][], byte[][]> keys = table.getRegionLocator().getStartEndKeys();
 		if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
 			throw new IOException("Expecting at least one region.");
 		}
@@ -198,15 +212,15 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
 		for (int i = 0; i < keys.getFirst().length; i++) {
 			final byte[] startKey = keys.getFirst()[i];
 			final byte[] endKey = keys.getSecond()[i];
-			final String regionLocation = table.getRegionLocation(startKey, false).getHostnamePort();
-			//Test if the given region is to be included in the InputSplit while splitting the regions of a table
-			if (!includeRegionInSplit(startKey, endKey)) {
+			final String regionLocation = table.getRegionLocator().getRegionLocation(startKey, false).getHostnamePort();
+			// Test if the given region is to be included in the InputSplit while splitting the regions of a table
+			if (!includeRegionInScan(startKey, endKey)) {
 				continue;
 			}
-			//Finds the region on which the given row is being served
+			// Find the region on which the given row is being served
 			final String[] hosts = new String[]{regionLocation};
 
-			// determine if regions contains keys used by the scan
+			// Determine if the region contains keys used by the scan
 			boolean isLastRegion = endKey.length == 0;
 			if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
 				(scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
@@ -223,27 +237,17 @@ public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, Tab
 		for (TableInputSplit split : splits) {
 			logSplitInfo("created", split);
 		}
-		return splits.toArray(new TableInputSplit[0]);
+		return splits.toArray(new TableInputSplit[splits.size()]);
 	}
 
 	/**
-	 * Test if the given region is to be included in the InputSplit while splitting the regions of a table.
-	 * <p>
-	 * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
-	 * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br>
-	 * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R
-	 * processing, continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due
-	 * to the ordering of the keys. <br>
-	 * <br>
-	 * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region. <br>
-	 * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded(
-	 * i.e. all regions are included).
+	 * Test if the given region is to be included in the scan while splitting the regions of a table.
 	 *
 	 * @param startKey Start key of the region
 	 * @param endKey   End key of the region
 	 * @return true, if this region needs to be included as part of the input (default).
 	 */
-	protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
+	protected boolean includeRegionInScan(final byte[] startKey, final byte[] endKey) {
 		return true;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
new file mode 100644
index 0000000..fff2a9e
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}.
+ */
+public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(HBaseRowInputFormat.class);
+	private String tableName;
+	private transient org.apache.hadoop.conf.Configuration conf;
+	private HBaseTableSchema schema;
+	private transient Charset stringCharset;
+
+	// family keys
+	private byte[][] families;
+	// qualifier keys
+	private byte[][][] qualifiers;
+	// qualifier types
+	private int[][] types;
+
+	// row which is returned
+	private Row resultRow;
+	// nested family rows
+	private Row[] familyRows;
+
+	public HBaseRowInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
+		this.tableName = tableName;
+		this.conf = conf;
+		this.schema = schema;
+
+		// set families, qualifiers, and types
+		String[] familyNames = schema.getFamilyNames();
+		this.families = schema.getFamilyKeys();
+		this.qualifiers = new byte[this.families.length][][];
+		this.types = new int[this.families.length][];
+		for (int f = 0; f < families.length; f++) {
+			this.qualifiers[f] = schema.getQualifierKeys(familyNames[f]);
+			TypeInformation[] typeInfos = schema.getQualifierTypes(familyNames[f]);
+			this.types[f] = new int[typeInfos.length];
+			for (int i = 0; i < typeInfos.length; i++) {
+				int typeIdx = getTypeIndex(typeInfos[i].getTypeClass());
+				if (typeIdx >= 0) {
+					types[f][i] = typeIdx;
+				} else {
+					throw new IllegalArgumentException("Unsupported type: " + typeInfos[i]);
+				}
+			}
+		}
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+		LOG.info("Initializing HBase configuration.");
+		connectToTable();
+		if(table != null) {
+			scan = getScanner();
+		}
+
+		// prepare output rows
+		this.resultRow = new Row(families.length);
+		this.familyRows = new Row[families.length];
+		for (int f = 0; f < families.length; f++) {
+			this.familyRows[f] = new Row(qualifiers[f].length);
+			this.resultRow.setField(f, this.familyRows[f]);
+		}
+
+		this.stringCharset = Charset.forName(schema.getStringCharset());
+	}
+
+	@Override
+	protected Scan getScanner() {
+		Scan scan = new Scan();
+		for (int f = 0; f < families.length; f++) {
+			byte[] family = families[f];
+			for (int q = 0; q < qualifiers[f].length; q++) {
+				byte[] quantifier = qualifiers[f][q];
+				scan.addColumn(family, quantifier);
+			}
+		}
+		return scan;
+	}
+
+	@Override
+	public String getTableName() {
+		return tableName;
+	}
+
+	@Override
+	protected Row mapResultToOutType(Result res) {
+		for (int f = 0; f < this.families.length; f++) {
+			// get family key
+			byte[] familyKey = families[f];
+			Row familyRow = familyRows[f];
+			for (int q = 0; q < this.qualifiers[f].length; q++) {
+				// get qualifier key
+				byte[] qualifier = qualifiers[f][q];
+				// get qualifier type idx
+				int typeIdx = types[f][q];
+				// read value
+				byte[] value = res.getValue(familyKey, qualifier);
+				if(value != null) {
+					familyRow.setField(q, deserialize(value, typeIdx));
+				} else {
+					familyRow.setField(q, null);
+				}
+			}
+			resultRow.setField(f, familyRow);
+		}
+		return resultRow;
+	}
+
+	private void connectToTable() {
+
+		if (this.conf == null) {
+			this.conf = HBaseConfiguration.create();
+		}
+
+		try {
+			Connection conn = ConnectionFactory.createConnection(conf);
+			super.table = (HTable) conn.getTable(TableName.valueOf(tableName));
+		} catch(TableNotFoundException tnfe) {
+			LOG.error("The table " + tableName + " not found ", tnfe);
+			throw new RuntimeException("HBase table '" + tableName + "' not found.", tnfe);
+		} catch(IOException ioe) {
+			LOG.error("Exception while creating connection to HBase.", ioe);
+			throw new RuntimeException("Cannot create connection to HBase.", ioe);
+		}
+	}
+
+	@Override
+	public TypeInformation<Row> getProducedType() {
+		// build one nested RowTypeInfo per column family
+		String[] famNames = schema.getFamilyNames();
+		TypeInformation<?>[] typeInfos = new TypeInformation[famNames.length];
+		int i = 0;
+		for (String family : famNames) {
+			typeInfos[i] = new RowTypeInfo(schema.getQualifierTypes(family), schema.getQualifierNames(family));
+			i++;
+		}
+		return new RowTypeInfo(typeInfos, famNames);
+	}
+
+	private Object deserialize(byte[] value, int typeIdx) {
+		switch (typeIdx) {
+			case 0: // byte[]
+				return value;
+			case 1:
+				return new String(value, stringCharset);
+			case 2: // byte
+				return value[0];
+			case 3:
+				return Bytes.toShort(value);
+			case 4:
+				return Bytes.toInt(value);
+			case 5:
+				return Bytes.toLong(value);
+			case 6:
+				return Bytes.toFloat(value);
+			case 7:
+				return Bytes.toDouble(value);
+			case 8:
+				return Bytes.toBoolean(value);
+			case 9: // sql.Timestamp encoded as long
+				return new Timestamp(Bytes.toLong(value));
+			case 10: // sql.Date encoded as long
+				return new Date(Bytes.toLong(value));
+			case 11: // sql.Time encoded as long
+				return new Time(Bytes.toLong(value));
+			case 12:
+				return Bytes.toBigDecimal(value);
+			case 13:
+				return new BigInteger(value);
+
+			default:
+				throw new IllegalArgumentException("Unknown type index " + typeIdx);
+		}
+	}
+
+	private static int getTypeIndex(Class<?> clazz) {
+		if (byte[].class.equals(clazz)) {
+			return 0;
+		} else if (String.class.equals(clazz)) {
+			return 1;
+		} else if (Byte.class.equals(clazz)) {
+			return 2;
+		} else if (Short.class.equals(clazz)) {
+			return 3;
+		} else if (Integer.class.equals(clazz)) {
+			return 4;
+		} else if (Long.class.equals(clazz)) {
+			return 5;
+		} else if (Float.class.equals(clazz)) {
+			return 6;
+		} else if (Double.class.equals(clazz)) {
+			return 7;
+		} else if (Boolean.class.equals(clazz)) {
+			return 8;
+		} else if (Timestamp.class.equals(clazz)) {
+			return 9;
+		} else if (Date.class.equals(clazz)) {
+			return 10;
+		} else if (Time.class.equals(clazz)) {
+			return 11;
+		} else if (BigDecimal.class.equals(clazz)) {
+			return 12;
+		} else if (BigInteger.class.equals(clazz)) {
+			return 13;
+		} else {
+			return -1;
+		}
+	}
+
+	static boolean isSupportedType(Class<?> clazz) {
+		return getTypeIndex(clazz) != -1;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
index d3c9fb8..b6b3916 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
@@ -17,20 +17,14 @@
  */
 package org.apache.flink.addons.hbase;
 
-import com.google.common.collect.ImmutableCollection;
-import com.google.common.collect.ImmutableList;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.Serializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.sql.Time;
+import java.nio.charset.Charset;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import java.sql.Date;
-import java.util.TreeMap;
 
 /**
  * Helps to specify an HBase Table's schema
@@ -38,17 +32,13 @@ import java.util.TreeMap;
 public class HBaseTableSchema implements Serializable {
 
 	// A Map with key as column family.
-	// Guarantees natural ordering
-	private final Map<String, Map<String, TypeInformation<?>>> familyMap =
-		new TreeMap<>();
+	private final Map<String, Map<String, TypeInformation<?>>> familyMap = new LinkedHashMap<>();
 
-	// Allowed types. This may change.
-	private static ImmutableCollection<Class<?>> CLASS_TYPES = ImmutableList.<Class<?>>of(
-		Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class, Time.class, byte[].class
-	);
+	// charset to parse HBase keys and strings. UTF-8 by default.
+	private String charset = "UTF-8";
 
 	/**
-	 * Allows specifying the family and qualifier name along with the data type of the qualifier for an HBase table
+	 * Adds a column defined by family, qualifier, and type to the table schema.
 	 *
 	 * @param family    the family name
 	 * @param qualifier the qualifier name
@@ -58,76 +48,138 @@ public class HBaseTableSchema implements Serializable {
 		Preconditions.checkNotNull(family, "family name");
 		Preconditions.checkNotNull(qualifier, "qualifier name");
 		Preconditions.checkNotNull(clazz, "class type");
-		Map<String, TypeInformation<?>> map = this.familyMap.get(family);
-		if (map == null) {
-			map = new TreeMap<>();
-		}
-		if (!CLASS_TYPES.contains(clazz)) {
+		Map<String, TypeInformation<?>> qualifierMap = this.familyMap.get(family);
+
+		if (!HBaseRowInputFormat.isSupportedType(clazz)) {
 			// throw exception
-			throw new IllegalArgumentException("Unsupported class type found " + clazz+". Better to use byte[].class and deserialize using user defined scalar functions");
+			throw new IllegalArgumentException("Unsupported class type found " + clazz+". " +
+				"Better to use byte[].class and deserialize using user defined scalar functions");
+		}
+
+		if (qualifierMap == null) {
+			qualifierMap = new LinkedHashMap<>();
 		}
-		map.put(qualifier, TypeExtractor.getForClass(clazz));
-		familyMap.put(family, map);
+		qualifierMap.put(qualifier, TypeExtractor.getForClass(clazz));
+		familyMap.put(family, qualifierMap);
 	}
 
+	/**
+	 * Sets the charset for value strings and HBase identifiers.
+	 *
+	 * @param charset the charset for value strings and HBase identifiers.
+	 */
+	void setCharset(String charset) {
+		this.charset = charset;
+	}
+
+	/**
+	 * Returns the names of all registered column families.
+	 *
+	 * @return The names of all registered column families.
+	 */
 	String[] getFamilyNames() {
 		return this.familyMap.keySet().toArray(new String[this.familyMap.size()]);
 	}
 
+	/**
+	 * Returns the HBase identifiers of all registered column families.
+	 *
+	 * @return The HBase identifiers of all registered column families.
+	 */
+	byte[][] getFamilyKeys() {
+		Charset c = Charset.forName(charset);
+
+		byte[][] familyKeys = new byte[this.familyMap.size()][];
+		int i = 0;
+		for(String name : this.familyMap.keySet()) {
+			familyKeys[i++] = name.getBytes(c);
+		}
+		return familyKeys;
+	}
+
+	/**
+	 * Returns the names of all registered column qualifiers of a specific column family.
+	 *
+	 * @param family The name of the column family for which the column qualifier names are returned.
+	 * @return The names of all registered column qualifiers of a specific column family.
+	 */
 	String[] getQualifierNames(String family) {
-		Map<String, TypeInformation<?>> colDetails = familyMap.get(family);
-		String[] qualifierNames = new String[colDetails.size()];
+		Map<String, TypeInformation<?>> qualifierMap = familyMap.get(family);
+
+		if (qualifierMap == null) {
+			throw new IllegalArgumentException("Family " + family + " does not exist in schema.");
+		}
+
+		String[] qualifierNames = new String[qualifierMap.size()];
 		int i = 0;
-		for (String qualifier: colDetails.keySet()) {
+		for (String qualifier: qualifierMap.keySet()) {
 			qualifierNames[i] = qualifier;
 			i++;
 		}
 		return qualifierNames;
 	}
 
+	/**
+	 * Returns the HBase identifiers of all registered column qualifiers for a specific column family.
+	 *
+	 * @param family The name of the column family for which the column qualifier identifiers are returned.
+	 * @return The HBase identifiers of all registered column qualifiers for a specific column family.
+	 */
+	byte[][] getQualifierKeys(String family) {
+		Map<String, TypeInformation<?>> qualifierMap = familyMap.get(family);
+
+		if (qualifierMap == null) {
+			throw new IllegalArgumentException("Family " + family + " does not exist in schema.");
+		}
+		Charset c = Charset.forName(charset);
+
+		byte[][] qualifierKeys = new byte[qualifierMap.size()][];
+		int i = 0;
+		for(String name : qualifierMap.keySet()) {
+			qualifierKeys[i++] = name.getBytes(c);
+		}
+		return qualifierKeys;
+	}
+
+	/**
+	 * Returns the types of all registered column qualifiers of a specific column family.
+	 *
+	 * @param family The name of the column family for which the column qualifier types are returned.
+	 * @return The types of all registered column qualifiers of a specific column family.
+	 */
 	TypeInformation<?>[] getQualifierTypes(String family) {
-		Map<String, TypeInformation<?>> colDetails = familyMap.get(family);
-		TypeInformation<?>[] typeInformations = new TypeInformation[colDetails.size()];
+		Map<String, TypeInformation<?>> qualifierMap = familyMap.get(family);
+
+		if (qualifierMap == null) {
+			throw new IllegalArgumentException("Family " + family + " does not exist in schema.");
+		}
+
+		TypeInformation<?>[] typeInformation = new TypeInformation[qualifierMap.size()];
 		int i = 0;
-		for (TypeInformation<?> typeInfo : colDetails.values()) {
-			typeInformations[i] = typeInfo;
+		for (TypeInformation<?> typeInfo : qualifierMap.values()) {
+			typeInformation[i] = typeInfo;
 			i++;
 		}
-		return typeInformations;
+		return typeInformation;
 	}
 
+	/**
+	 * Returns the names and types of all registered column qualifiers of a specific column family.
+	 *
+	 * @param family The name of the column family for which the column qualifier names and types are returned.
+	 * @return The names and types of all registered column qualifiers of a specific column family.
+	 */
 	Map<String, TypeInformation<?>> getFamilyInfo(String family) {
 		return familyMap.get(family);
 	}
 
-	Object deserialize(byte[] value, TypeInformation<?> typeInfo) {
-		if (typeInfo.isBasicType()) {
-			if (typeInfo.getTypeClass() == Integer.class) {
-				return Bytes.toInt(value);
-			} else if (typeInfo.getTypeClass() == Short.class) {
-				return Bytes.toShort(value);
-			} else if (typeInfo.getTypeClass() == Float.class) {
-				return Bytes.toFloat(value);
-			} else if (typeInfo.getTypeClass() == Long.class) {
-				return Bytes.toLong(value);
-			} else if (typeInfo.getTypeClass() == String.class) {
-				return Bytes.toString(value);
-			} else if (typeInfo.getTypeClass() == Byte.class) {
-				return value[0];
-			} else if (typeInfo.getTypeClass() == Boolean.class) {
-				return Bytes.toBoolean(value);
-			} else if (typeInfo.getTypeClass() == Double.class) {
-				return Bytes.toDouble(value);
-			} else if (typeInfo.getTypeClass() == BigInteger.class) {
-				return new BigInteger(value);
-			} else if (typeInfo.getTypeClass() == BigDecimal.class) {
-				return Bytes.toBigDecimal(value);
-			} else if (typeInfo.getTypeClass() == Date.class) {
-				return new Date(Bytes.toLong(value));
-			} else if (typeInfo.getTypeClass() == Time.class) {
-				return new Time(Bytes.toLong(value));
-			}
-		}
-		return value;
+	/**
+	 * Returns the charset for value strings and HBase identifiers.
+	 *
+	 * @return The charset for value strings and HBase identifiers.
+	 */
+	String getStringCharset() {
+		return this.charset;
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index c484bd7..a1be23f 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -30,11 +30,27 @@ import org.apache.hadoop.conf.Configuration;
 import java.util.Map;
 
 /**
- * Creates a table source that helps to scan data from an hbase table
- * The table name is passed during {@link HBaseTableSource} construction along with the required hbase configurations.
- * Use {@link #addColumn(String, String, Class)} to specify the family name, qualifier and the type of the qualifier that needs to be scanned
- * This supports nested schema, for eg: if we have a column family 'Person' and two qualifiers 'Name' (type String) and 'Unique_Id' (Type Integer),
- * then we represent this schema as Row<Person : Row<Name: String, Unique_Id : Integer>>
+ * Creates a TableSource to scan an HBase table.
+ *
+ * The table name and required HBase configuration are passed during {@link HBaseTableSource} construction.
+ * Use {@link #addColumn(String, String, Class)} to specify the family, qualifier, and type of columns to scan.
+ *
+ * The TableSource returns {@link Row} with nested Rows for each column family.
+ *
+ * The HBaseTableSource is used as shown in the example below.
+ *
+ * <pre>
+ * {@code
+ * HBaseTableSource hSrc = new HBaseTableSource(conf, "hTable");
+ * hSrc.addColumn("fam1", "col1", byte[].class);
+ * hSrc.addColumn("fam1", "col2", Integer.class);
+ * hSrc.addColumn("fam2", "col1", String.class);
+ *
+ * tableEnv.registerTableSource("hTable", hSrc);
+ * Table res = tableEnv.sql("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1");
+ * }
+ * </pre>
+ *
  */
 public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
 
@@ -43,7 +59,7 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
 	private HBaseTableSchema schema;
 
 	/**
-	 * The hbase configuration and the table name that acts as the hbase table source
+	 * The HBase configuration and the name of the table to read.
 	 *
 	 * @param conf      hbase configuration
 	 * @param tableName the tableName
@@ -55,7 +71,7 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
 	}
 
 	/**
-	 * Allows specifying the family and qualifier name along with the data type of the qualifier for an HBase table
+	 * Adds a column defined by family, qualifier, and type to the table schema.
 	 *
 	 * @param family    the family name
 	 * @param qualifier the qualifier name
@@ -65,6 +81,15 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
 		this.schema.addColumn(family, qualifier, clazz);
 	}
 
+	/**
+	 * Specifies the charset used for HBase identifiers (family and qualifier names) and for String values.
+	 *
+	 * @param charset Name of the charset to use.
+	 */
+	public void setCharset(String charset) {
+		this.schema.setCharset(charset);
+	}
+
 	@Override
 	public TypeInformation<Row> getReturnType() {
 		String[] famNames = schema.getFamilyNames();
@@ -74,13 +99,12 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
 			typeInfos[i] = new RowTypeInfo(schema.getQualifierTypes(family), schema.getQualifierNames(family));
 			i++;
 		}
-		RowTypeInfo rowInfo = new RowTypeInfo(typeInfos, famNames);
-		return rowInfo;
+		return new RowTypeInfo(typeInfos, famNames);
 	}
 
 	@Override
 	public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
-		return execEnv.createInput(new HBaseTableSourceInputFormat(conf, tableName, schema), getReturnType());
+		return execEnv.createInput(new HBaseRowInputFormat(conf, tableName, schema), getReturnType());
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
deleted file mode 100644
index 6c4d7da..0000000
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Row;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}
- */
-public class HBaseTableSourceInputFormat extends AbstractTableInputFormat<Row> implements ResultTypeQueryable<Row> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
-	private String tableName;
-	private transient Connection conn;
-	private transient org.apache.hadoop.conf.Configuration conf;
-	private HBaseTableSchema schema;
-
-	public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
-		this.tableName = tableName;
-		this.conf = conf;
-		this.schema = schema;
-	}
-
-	@Override
-	public void configure(Configuration parameters) {
-		LOG.info("Initializing HBaseConfiguration");
-		connectToTable();
-		if(table != null) {
-			scan = getScanner();
-		}
-	}
-
-	@Override
-	protected Scan getScanner() {
-		Scan scan = new Scan();
-		for(String family : schema.getFamilyNames()) {
-			for(String qualifierName : schema.getQualifierNames(family)) {
-				scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifierName));
-			}
-		}
-		return scan;
-	}
-
-	@Override
-	public String getTableName() {
-		return tableName;
-	}
-
-	@Override
-	protected Row mapResultToType(Result res) {
-		List<Object> values = new ArrayList<Object>();
-		int i = 0;
-		String[] familyNames = schema.getFamilyNames();
-		Object[] rows = new Object[familyNames.length];
-		for(String family : familyNames) {
-			Map<String, TypeInformation<?>> infos = schema.getFamilyInfo(family);
-			for(String qualifier : infos.keySet()) {
-				byte[] value = res.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
-				if(value != null) {
-					values.add(schema.deserialize(value, infos.get(qualifier)));
-				} else {
-					values.add(null);
-				}
-			}
-			rows[i] = Row.of(values.toArray(new Object[values.size()]));
-			values.clear();
-			i++;
-		}
-		return Row.of(rows);
-	}
-
-	private void connectToTable() {
-		//use files found in the classpath
-		if(this.conf == null) {
-			this.conf = HBaseConfiguration.create();
-		}
-		try {
-			conn = ConnectionFactory.createConnection(conf);
-		} catch(IOException ioe) {
-			LOG.error("Exception while creating connection to hbase cluster", ioe);
-			return;
-		}
-		try {
-			table = (HTable)conn.getTable(TableName.valueOf(tableName));
-		} catch(TableNotFoundException tnfe) {
-			LOG.error("The table " + tableName + " not found ", tnfe);
-		} catch(IOException ioe) {
-			LOG.error("Exception while connecting to the table "+tableName+ " ", ioe);
-		}
-	}
-
-	@Override
-	public TypeInformation<Row> getProducedType() {
-		// split the fieldNames
-		String[] famNames = schema.getFamilyNames();
-		TypeInformation<?>[] typeInfos = new TypeInformation[famNames.length];
-		int i = 0;
-		for (String family : famNames) {
-			typeInfos[i] = new RowTypeInfo(schema.getQualifierTypes(family), schema.getQualifierNames(family));
-			i++;
-		}
-		RowTypeInfo rowInfo = new RowTypeInfo(typeInfos, famNames);
-		return rowInfo;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
index 467652a..6ea2d04 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -52,7 +52,7 @@ public abstract class TableInputFormat<T extends Tuple> extends AbstractTableInp
 	 * @param r The Result instance from HBase that needs to be converted
 	 * @return The approriate instance of {@link Tuple} that contains the needed information.
 	 */
-	protected abstract T mapResultToType(Result r);
+	protected abstract T mapResultToTuple(Result r);
 
 	/**
 	 * Creates a {@link Scan} object and opens the {@link HTable} connection.
@@ -86,4 +86,8 @@ public abstract class TableInputFormat<T extends Tuple> extends AbstractTableInp
 		}
 		return null;
 	}
+
+	protected T mapResultToOutType(Result r) {
+		return mapResultToTuple(r);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
new file mode 100644
index 0000000..c1aa9a0
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -0,0 +1,338 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This class contains integration tests for multiple HBase connectors:
+ * - TableInputFormat
+ * - HBaseTableSource
+ *
+ * These tests are located in a single test file to avoid unnecessary initializations of the
+ * HBaseTestingCluster, which takes about half a minute.
+ *
+ */
+public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
+
+	private static final String TEST_TABLE = "testTable";
+
+	private static final String FAMILY1 = "family1";
+	private static final String F1COL1 = "col1";
+
+	private static final String FAMILY2 = "family2";
+	private static final String F2COL1 = "col1";
+	private static final String F2COL2 = "col2";
+
+	private static final String FAMILY3 = "family3";
+	private static final String F3COL1 = "col1";
+	private static final String F3COL2 = "col2";
+	private static final String F3COL3 = "col3";
+
+	@BeforeClass
+	public static void activateHBaseCluster() throws IOException {
+		registerHBaseMiniClusterInClasspath();
+		prepareTable();
+	}
+
+	private static void prepareTable() throws IOException {
+
+		// create a table
+		TableName tableName = TableName.valueOf(TEST_TABLE);
+		// column families
+		byte[][] families = new byte[][]{
+			Bytes.toBytes(FAMILY1),
+			Bytes.toBytes(FAMILY2),
+			Bytes.toBytes(FAMILY3)
+		};
+		// split keys
+		byte[][] splitKeys = new byte[][]{ Bytes.toBytes(4) };
+		createTable(tableName, families, splitKeys);
+
+		// get the HTable instance
+		HTable table = openTable(tableName);
+		List<Put> puts = new ArrayList<>();
+		// add some data
+		puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
+		puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
+		puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
+		puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
+		puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
+		puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
+		puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
+		puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
+
+		// append rows to table
+		table.put(puts);
+		table.close();
+	}
+
+	private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
+		Put put = new Put(Bytes.toBytes(rowKey));
+		// family 1
+		put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1));
+		// family 2
+		if (f2c1 != null) {
+			put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1));
+		}
+		put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), Bytes.toBytes(f2c2));
+		// family 3
+		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), Bytes.toBytes(f3c1));
+		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2));
+		put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3));
+
+		return put;
+	}
+
+	// ######## HBaseTableSource tests ############
+
+	@Test
+	public void testTableSourceFullScan() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
+		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+		hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
+		hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
+		hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
+		hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
+		hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
+		hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
+		tableEnv.registerTableSource("hTable", hbaseTable);
+
+		Table result = tableEnv.sql(
+			"SELECT " +
+				"  h.family1.col1, " +
+				"  h.family2.col1, " +
+				"  h.family2.col2, " +
+				"  h.family3.col1, " +
+				"  h.family3.col2, " +
+				"  h.family3.col3 " +
+				"FROM hTable AS h"
+		);
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+
+		String expected =
+			"10,Hello-1,100,1.01,false,Welt-1\n" +
+			"20,Hello-2,200,2.02,true,Welt-2\n" +
+			"30,Hello-3,300,3.03,false,Welt-3\n" +
+			"40,null,400,4.04,true,Welt-4\n" +
+			"50,Hello-5,500,5.05,false,Welt-5\n" +
+			"60,Hello-6,600,6.06,true,Welt-6\n" +
+			"70,Hello-7,700,7.07,false,Welt-7\n" +
+			"80,null,800,8.08,true,Welt-8\n";
+
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testTableSourceProjection() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
+		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+		hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
+		hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
+		hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
+		hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
+		hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
+		hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
+		tableEnv.registerTableSource("hTable", hbaseTable);
+
+		Table result = tableEnv.sql(
+			"SELECT " +
+				"  h.family1.col1, " +
+				"  h.family3.col1, " +
+				"  h.family3.col2, " +
+				"  h.family3.col3 " +
+				"FROM hTable AS h"
+		);
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+
+		String expected =
+			"10,1.01,false,Welt-1\n" +
+			"20,2.02,true,Welt-2\n" +
+			"30,3.03,false,Welt-3\n" +
+			"40,4.04,true,Welt-4\n" +
+			"50,5.05,false,Welt-5\n" +
+			"60,6.06,true,Welt-6\n" +
+			"70,7.07,false,Welt-7\n" +
+			"80,8.08,true,Welt-8\n";
+
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testTableSourceFieldOrder() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
+		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+		// shuffle order of column registration
+		hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
+		hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
+		hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
+		hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
+		hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
+		hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
+		tableEnv.registerTableSource("hTable", hbaseTable);
+
+		Table result = tableEnv.sql(
+			"SELECT * FROM hTable AS h"
+		);
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+
+		String expected =
+			"Hello-1,100,1.01,false,Welt-1,10\n" +
+			"Hello-2,200,2.02,true,Welt-2,20\n" +
+			"Hello-3,300,3.03,false,Welt-3,30\n" +
+			"null,400,4.04,true,Welt-4,40\n" +
+			"Hello-5,500,5.05,false,Welt-5,50\n" +
+			"Hello-6,600,6.06,true,Welt-6,60\n" +
+			"Hello-7,700,7.07,false,Welt-7,70\n" +
+			"null,800,8.08,true,Welt-8,80\n";
+
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testTableSourceReadAsByteArray() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
+		// read both family2 columns as raw byte[] and decode them with the scalar functions registered below
+		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+		hbaseTable.addColumn(FAMILY2, F2COL1, byte[].class);
+		hbaseTable.addColumn(FAMILY2, F2COL2, byte[].class);
+
+		tableEnv.registerTableSource("hTable", hbaseTable);
+		tableEnv.registerFunction("toUTF8", new ToUTF8());
+		tableEnv.registerFunction("toLong", new ToLong());
+
+		Table result = tableEnv.sql(
+			"SELECT " +
+				"  toUTF8(h.family2.col1), " +
+				"  toLong(h.family2.col2) " +
+				"FROM hTable AS h"
+		);
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+
+		String expected =
+			"Hello-1,100\n" +
+			"Hello-2,200\n" +
+			"Hello-3,300\n" +
+			"null,400\n" +
+			"Hello-5,500\n" +
+			"Hello-6,600\n" +
+			"Hello-7,700\n" +
+			"null,800\n";
+
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+
+	public static class ToUTF8 extends ScalarFunction {
+
+		public String eval(byte[] bytes) {
+			return Bytes.toString(bytes);
+		}
+	}
+
+	public static class ToLong extends ScalarFunction {
+
+		public long eval(byte[] bytes) {
+			return Bytes.toLong(bytes);
+		}
+	}
+
+	// ######## TableInputFormat tests ############
+
+	class InputFormatForTestTable extends TableInputFormat<Tuple1<Integer>> {
+
+		@Override
+		protected Scan getScanner() {
+			return new Scan();
+		}
+
+		@Override
+		protected String getTableName() {
+			return TEST_TABLE;
+		}
+
+		@Override
+		protected Tuple1<Integer> mapResultToTuple(Result r) {
+			return new Tuple1<>(Bytes.toInt(r.getValue(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1))));
+		}
+	}
+
+	@Test
+	public void testTableInputFormat() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+
+		DataSet<Tuple1<Integer>> result = env
+			.createInput(new InputFormatForTestTable())
+			.reduce(new ReduceFunction<Tuple1<Integer>>(){
+
+				@Override
+				public Tuple1<Integer> reduce(Tuple1<Integer> v1, Tuple1<Integer> v2) throws Exception {
+					return Tuple1.of(v1.f0 + v2.f0);
+				}
+			});
+
+		List<Tuple1<Integer>> resultSet = result.collect();
+
+		assertEquals(1, resultSet.size());
+		assertEquals(360, (int)resultSet.get(0).f0);
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
deleted file mode 100644
index 57ccafd..0000000
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TableInputFormatITCase extends HBaseTestingClusterAutostarter {
-	private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
-	private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
-	private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
-
-	// These are the row ids AND also the values we will put in the test table
-	private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
-
-	@BeforeClass
-	public static void activateHBaseCluster(){
-		registerHBaseMiniClusterInClasspath();
-	}
-
-	@Before
-	public void createTestTable() throws IOException {
-		TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
-		byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
-		byte[][] famNames = new byte[1][];
-		famNames[0] = TEST_TABLE_FAMILY_NAME;
-		createTable(tableName, famNames, splitKeys);
-		HTable table = openTable(tableName);
-
-		for (String rowId : ROW_IDS) {
-			byte[] rowIdBytes = rowId.getBytes();
-			Put p = new Put(rowIdBytes);
-			// Use the rowId as the value to facilitate the testing better
-			p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
-			table.put(p);
-		}
-
-		table.close();
-	}
-
-	class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
-		@Override
-		protected Scan getScanner() {
-			return new Scan();
-		}
-
-		@Override
-		protected String getTableName() {
-			return TEST_TABLE_NAME;
-		}
-
-		@Override
-		protected Tuple1<String> mapResultToType(Result r) {
-			return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
-		}
-	}
-
-	@Test
-	public void testTableInputFormat() {
-		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
-		environment.setParallelism(1);
-
-		DataSet<String> resultDataSet =
-			environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
-				@Override
-				public String map(Tuple1<String> value) throws Exception {
-					return value.f0;
-				}
-			});
-
-		List<String> resultSet = new ArrayList<>();
-		resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet));
-
-		try {
-			environment.execute("HBase InputFormat Test");
-		} catch (Exception e) {
-			Assert.fail("HBase InputFormat test failed. " + e.getMessage());
-		}
-
-		for (String rowId : ROW_IDS) {
-			assertTrue("Missing rowId from table: " + rowId, resultSet.contains(rowId));
-		}
-
-		assertEquals("The number of records is wrong.", ROW_IDS.length, resultSet.size());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
index 5377791..dccf876 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
@@ -63,7 +63,7 @@ public class HBaseReadExample {
 				private Tuple2<String, String> reuse = new Tuple2<String, String>();
 				
 				@Override
-				protected Tuple2<String, String> mapResultToType(Result r) {
+				protected Tuple2<String, String> mapResultToTuple(Result r) {
 					String key = Bytes.toString(r.getRow());
 					String val = Bytes.toString(r.getValue(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME));
 					reuse.setField(key, 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/87d09342/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
deleted file mode 100644
index 8882d5c..0000000
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.addons.hbase.example;
-
-import org.apache.flink.addons.hbase.HBaseTableSchema;
-import org.apache.flink.addons.hbase.HBaseTableSource;
-import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.sources.BatchTableSource;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.types.Row;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter {
-
-	private static final byte[] ROW_1 = Bytes.toBytes("row1");
-	private static final byte[] ROW_2 = Bytes.toBytes("row2");
-	private static final byte[] ROW_3 = Bytes.toBytes("row3");
-	private static final byte[] F_1 = Bytes.toBytes("f1");
-	private static final byte[] F_2 = Bytes.toBytes("f2");
-	private static final byte[] Q_1 = Bytes.toBytes("q1");
-	private static final byte[] Q_2 = Bytes.toBytes("q2");
-	private static final byte[] Q_3 = Bytes.toBytes("q3");
-
-	@BeforeClass
-	public static void activateHBaseCluster(){
-		registerHBaseMiniClusterInClasspath();
-	}
-
-	@Test
-	public void testHBaseTableSourceWithSingleColumnFamily() throws Exception {
-		// create a table with single region
-		TableName tableName = TableName.valueOf("test");
-		// no split keys
-		byte[][] famNames = new byte[1][];
-		famNames[0] = F_1;
-		createTable(tableName, famNames, null);
-		// get the htable instance
-		HTable table = openTable(tableName);
-		List<Put> puts = new ArrayList<Put>();
-		// add some data
-		Put put = new Put(ROW_1);
-		// add 3 qualifiers per row
-		//1st qual is integer
-		put.addColumn(F_1, Q_1, Bytes.toBytes(100));
-		//2nd qual is String
-		put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
-		// 3rd qual is long
-		put.addColumn(F_1, Q_3, Bytes.toBytes(19991L));
-		puts.add(put);
-
-		put = new Put(ROW_2);
-		// add 3 qualifiers per row
-		//1st qual is integer
-		put.addColumn(F_1, Q_1, Bytes.toBytes(101));
-		//2nd qual is String
-		put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
-		// 3rd qual is long
-		put.addColumn(F_1, Q_3, Bytes.toBytes(19992L));
-		puts.add(put);
-
-		put = new Put(ROW_3);
-		// add 3 qualifiers per row
-		//1st qual is integer
-		put.addColumn(F_1, Q_1, Bytes.toBytes(102));
-		//2nd qual is String
-		put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue2"));
-		// 3rd qual is long
-		put.addColumn(F_1, Q_3, Bytes.toBytes(19993L));
-		puts.add(put);
-		// add the mutations to the table
-		table.put(puts);
-		table.close();
-		// preparetion is done
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
-		// fetch row2 from the table till the end
-		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), tableName.getNameAsString());
-		hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_1), Integer.class);
-		hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_2), String.class);
-		hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_3), Long.class);
-		tableEnv.registerTableSource("test", hbaseTable);
-		Table result = tableEnv
-			.sql("SELECT test.f1.q1, test.f1.q2, test.f1.q3 FROM test");
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-
-		String expected = "100,strvalue,19991\n" +
-			"101,strvalue1,19992\n" +
-			"102,strvalue2,19993\n";
-		TestBaseUtils.compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testHBaseTableSourceWithTwoColumnFamily() throws Exception {
-		// create a table with single region
-		TableName tableName = TableName.valueOf("test1");
-		// no split keys
-		byte[][] famNames = new byte[2][];
-		famNames[0] = F_1;
-		famNames[1] = F_2;
-		createTable(tableName, famNames, null);
-		// get the htable instance
-		HTable table = openTable(tableName);
-		List<Put> puts = new ArrayList<Put>();
-		// add some data
-		Put put = new Put(ROW_1);
-		// add 3 qualifiers per row
-		//1st qual is integer
-		put.addColumn(F_1, Q_1, Bytes.toBytes(100));
-		//2nd qual is String
-		put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
-		// 3rd qual is long
-		put.addColumn(F_1, Q_3, Bytes.toBytes(19991L));
-		puts.add(put);
-
-		put = new Put(ROW_2);
-		// add 3 qualifiers per row
-		//1st qual is integer
-		put.addColumn(F_2, Q_1, Bytes.toBytes(201));
-		//2nd qual is String
-		put.addColumn(F_2, Q_2, Bytes.toBytes("newvalue1"));
-		// 3rd qual is long
-		put.addColumn(F_2, Q_3, Bytes.toBytes(29992L));
-		puts.add(put);
-
-		put = new Put(ROW_3);
-		// add 3 qualifiers per row
-		//1st qual is integer
-		put.addColumn(F_1, Q_1, Bytes.toBytes(102));
-		//2nd qual is String
-		put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue2"));
-		// 3rd qual is long
-		put.addColumn(F_1, Q_3, Bytes.toBytes(19993L));
-		puts.add(put);
-		// add the mutations to the table
-		table.put(puts);
-		table.close();
-		// preparation is done
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
-		// fetch row2 from the table till the end
-		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), tableName.getNameAsString());
-		hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_1), Integer.class);
-		hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_2), String.class);
-		hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_3), Long.class);
-		hbaseTable.addColumn(Bytes.toString(F_2), Bytes.toString(Q_1), Integer.class);
-		hbaseTable.addColumn(Bytes.toString(F_2), Bytes.toString(Q_2), String.class);
-		hbaseTable.addColumn(Bytes.toString(F_2), Bytes.toString(Q_3), Long.class);
-		tableEnv.registerTableSource("test1", hbaseTable);
-		Table result = tableEnv
-			.sql("SELECT test1.f1.q1, test1.f1.q2, test1.f1.q3, test1.f2.q1, test1.f2.q2, test1.f2.q3 FROM test1");
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-
-		String expected = "100,strvalue,19991,null,null,null\n" +
-			"null,null,null,201,newvalue1,29992\n" +
-			"102,strvalue2,19993,null,null,null\n";
-		TestBaseUtils.compareResultAsText(results, expected);
-	}
-}