You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/10/04 12:45:32 UTC
[2/2] flink git commit: [FLINK-4311] [hbase-connector] Fixed
TableInputFormat to process multiple input splits.
[FLINK-4311] [hbase-connector] Fixed TableInputFormat to process multiple input splits.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98b399d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98b399d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98b399d4
Branch: refs/heads/release-1.1
Commit: 98b399d4b4ddc9ab5d01e40dcb9ab0889f0d1067
Parents: 6f33361
Author: Niels Basjes <nb...@bol.com>
Authored: Tue Oct 4 12:13:12 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Oct 4 14:43:53 2016 +0200
----------------------------------------------------------------------
.../flink/addons/hbase/TableInputFormat.java | 176 ++++++++++++-------
1 file changed, 108 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/98b399d4/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
index 6ba6217..b847fe8 100644
--- a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -17,13 +17,9 @@
*/
package org.apache.flink.addons.hbase;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
@@ -38,11 +34,14 @@ import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* {@link InputFormat} subclass that wraps the access for HTables.
- *
*/
-public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit>{
+public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit> {
private static final long serialVersionUID = 1L;
@@ -51,34 +50,56 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
/** helper variable to decide whether the input is exhausted or not */
private boolean endReached = false;
- // TODO table and scan could be serialized when kryo serializer will be the default
- protected transient HTable table;
- protected transient Scan scan;
+ protected transient HTable table = null;
+ protected transient Scan scan = null;
/** HBase iterator wrapper */
- private ResultScanner rs;
+ private ResultScanner resultScanner = null;
private byte[] lastRow;
private int scannedRows;
- // abstract methods allow for multiple table and scanners in the same job
+ /**
+ * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
+ * @return The appropriate instance of Scan for this use case.
+ */
protected abstract Scan getScanner();
+
+ /**
+ * What table is to be read.
+ * Per instance of a TableInputFormat derivative only a single tablename is possible.
+ * @return The name of the table
+ */
protected abstract String getTableName();
+
+ /**
+ * The output from HBase is always an instance of {@link Result}.
+ * This method is to copy the data in the Result instance into the required {@link Tuple}.
+ * @param r The Result instance from HBase that needs to be converted
+ * @return The appropriate instance of {@link Tuple} that contains the needed information.
+ */
protected abstract T mapResultToTuple(Result r);
/**
- * creates a {@link Scan} object and a {@link HTable} connection
+ * Creates a {@link Scan} object and opens the {@link HTable} connection.
+ * These are opened here because they are needed in the createInputSplits
+ * which is called before the openInputFormat method.
+ * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
*
- * @param parameters
+ * @param parameters The configuration that is to be used
* @see Configuration
*/
@Override
public void configure(Configuration parameters) {
- this.table = createTable();
- this.scan = getScanner();
+ table = createTable();
+ if (table != null) {
+ scan = getScanner();
+ }
}
- /** Create an {@link HTable} instance and set it into this format */
+ /**
+ * Create an {@link HTable} instance and set it into this format
+ */
private HTable createTable() {
LOG.info("Initializing HBaseConfiguration");
//use files found in the classpath
@@ -93,32 +114,51 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
}
@Override
+ public void open(TableInputSplit split) throws IOException {
+ if (table == null) {
+ throw new IOException("The HBase table has not been opened!");
+ }
+ if (scan == null) {
+ throw new IOException("getScanner returned null");
+ }
+ if (split == null) {
+ throw new IOException("Input split is null!");
+ }
+
+ logSplitInfo("opening", split);
+ scan.setStartRow(split.getStartRow());
+ lastRow = split.getEndRow();
+ scan.setStopRow(lastRow);
+
+ resultScanner = table.getScanner(scan);
+ endReached = false;
+ scannedRows = 0;
+ }
+
+ @Override
public boolean reachedEnd() throws IOException {
- return this.endReached;
+ return endReached;
}
@Override
public T nextRecord(T reuse) throws IOException {
- if (this.rs == null){
+ if (resultScanner == null) {
throw new IOException("No table result scanner provided!");
}
- try{
- Result res = this.rs.next();
- if (res != null){
+ try {
+ Result res = resultScanner.next();
+ if (res != null) {
scannedRows++;
lastRow = res.getRow();
return mapResultToTuple(res);
}
- }catch (Exception e) {
- this.rs.close();
+ } catch (Exception e) {
+ resultScanner.close();
//workaround for timeout on scan
- StringBuffer logMsg = new StringBuffer("Error after scan of ")
- .append(scannedRows)
- .append(" rows. Retry with a new scanner...");
- LOG.warn(logMsg.toString(), e);
- this.scan.setStartRow(lastRow);
- this.rs = table.getScanner(scan);
- Result res = this.rs.next();
+ LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
+ scan.setStartRow(lastRow);
+ resultScanner = table.getScanner(scan);
+ Result res = resultScanner.next();
if (res != null) {
scannedRows++;
lastRow = res.getRow();
@@ -126,46 +166,48 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
}
}
- this.endReached = true;
+ endReached = true;
return null;
}
@Override
- public void open(TableInputSplit split) throws IOException {
- if (split == null){
- throw new IOException("Input split is null!");
- }
- if (table == null){
- throw new IOException("No HTable provided!");
- }
- if (scan == null){
- throw new IOException("No Scan instance provided");
+ public void close() throws IOException {
+ LOG.info("Closing split (scanned {} rows)", scannedRows);
+ lastRow = null;
+ try {
+ if (resultScanner != null) {
+ resultScanner.close();
+ }
+ } finally {
+ resultScanner = null;
}
-
- logSplitInfo("opening", split);
- scan.setStartRow(split.getStartRow());
- lastRow = split.getEndRow();
- scan.setStopRow(lastRow);
-
- this.rs = table.getScanner(scan);
- this.endReached = false;
- this.scannedRows = 0;
}
@Override
- public void close() throws IOException {
- if(rs!=null){
- this.rs.close();
- }
- if(table!=null){
- this.table.close();
+ public void closeInputFormat() {
+ try {
+ if (table != null) {
+ table.close();
+ }
+ } catch (IOException ioe) {
+ // Cannot rethrow IOException due to limitations of RichInputFormat.closeInputFormat()
+ // method. This should be fixed in the next minor version of Flink (1.2.x).
+ // Wrap exception in RuntimeException as a temporary fix for Flink 1.1.x.
+ throw new RuntimeException(ioe);
+ } finally {
+ table = null;
}
- LOG.info("Closing split (scanned {} rows)", scannedRows);
- this.lastRow = null;
}
@Override
public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
+ if (table == null) {
+ throw new IOException("The HBase table has not been opened!");
+ }
+ if (scan == null) {
+ throw new IOException("getScanner returned null");
+ }
+
//Gets the starting and ending row keys for every region in the currently open table
final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
@@ -186,16 +228,16 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
continue;
}
//Finds the region on which the given row is being served
- final String[] hosts = new String[] { regionLocation };
+ final String[] hosts = new String[]{regionLocation};
// determine if regions contains keys used by the scan
boolean isLastRegion = endKey.length == 0;
if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
- (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
+ (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
- && !isLastRegion ? endKey : stopRow;
+ && !isLastRegion ? endKey : stopRow;
int id = splits.size();
final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop);
splits.add(split);
@@ -215,7 +257,7 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
String[] hostnames = split.getHostnames();
- LOG.info("{} split [{}|{}|{}|{}]",action, splitId, hostnames, splitStartKey, splitStopKey);
+ LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey);
}
/**
@@ -231,13 +273,11 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
* Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded(
* i.e. all regions are included).
*
- * @param startKey
- * Start key of the region
- * @param endKey
- * End key of the region
+ * @param startKey Start key of the region
+ * @param endKey End key of the region
* @return true, if this region needs to be included as part of the input (default).
*/
- private static boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
+ protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
return true;
}
@@ -251,4 +291,4 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
return null;
}
-}
\ No newline at end of file
+}