You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/10/04 12:45:32 UTC

[2/2] flink git commit: [FLINK-4311] [hbase-connector] Fixed TableInputFormat to process multiple input splits.

[FLINK-4311] [hbase-connector] Fixed TableInputFormat to process multiple input splits.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98b399d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98b399d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98b399d4

Branch: refs/heads/release-1.1
Commit: 98b399d4b4ddc9ab5d01e40dcb9ab0889f0d1067
Parents: 6f33361
Author: Niels Basjes <nb...@bol.com>
Authored: Tue Oct 4 12:13:12 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Oct 4 14:43:53 2016 +0200

----------------------------------------------------------------------
 .../flink/addons/hbase/TableInputFormat.java    | 176 ++++++++++++-------
 1 file changed, 108 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/98b399d4/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
index 6ba6217..b847fe8 100644
--- a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -17,13 +17,9 @@
  */
 package org.apache.flink.addons.hbase;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
@@ -38,11 +34,14 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * {@link InputFormat} subclass that wraps the access for HTables.
- *
  */
-public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit>{
+public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -51,34 +50,56 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
 	/** helper variable to decide whether the input is exhausted or not */
 	private boolean endReached = false;
 
-	// TODO table and scan could be serialized when kryo serializer will be the default
-	protected transient HTable table;
-	protected transient Scan scan;
+	protected transient HTable table = null;
+	protected transient Scan scan = null;
 
 	/** HBase iterator wrapper */
-	private ResultScanner rs;
+	private ResultScanner resultScanner = null;
 
 	private byte[] lastRow;
 	private int scannedRows;
 
-	// abstract methods allow for multiple table and scanners in the same job
+	/**
+	 * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
+	 * @return The appropriate instance of Scan for this use case.
+	 */
 	protected abstract Scan getScanner();
+
+	/**
+	 * What table is to be read.
+	 * Per instance of a TableInputFormat derivative only a single tablename is possible.
+	 * @return The name of the table
+	 */
 	protected abstract String getTableName();
+
+	/**
+	 * The output from HBase is always an instance of {@link Result}.
+	 * This method copies the data from the Result instance into the required {@link Tuple}.
+	 * @param r The Result instance from HBase that needs to be converted
+	 * @return The appropriate instance of {@link Tuple} that contains the needed information.
+	 */
 	protected abstract T mapResultToTuple(Result r);
 
 	/**
-	 * creates a {@link Scan} object and a {@link HTable} connection
+	 * Creates a {@link Scan} object and opens the {@link HTable} connection.
+	 * These are opened here because they are needed in the createInputSplits method,
+	 * which is called before the openInputFormat method.
+	 * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
 	 *
-	 * @param parameters
+	 * @param parameters The configuration that is to be used
 	 * @see Configuration
 	 */
 	@Override
 	public void configure(Configuration parameters) {
-		this.table = createTable();
-		this.scan = getScanner();
+		table = createTable();
+		if (table != null) {
+			scan = getScanner();
+		}
 	}
 
-	/** Create an {@link HTable} instance and set it into this format */
+	/**
+	 * Creates an {@link HTable} instance and sets it into this format.
+	 */
 	private HTable createTable() {
 		LOG.info("Initializing HBaseConfiguration");
 		//use files found in the classpath
@@ -93,32 +114,51 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
 	}
 
 	@Override
+	public void open(TableInputSplit split) throws IOException {
+		if (table == null) {
+			throw new IOException("The HBase table has not been opened!");
+		}
+		if (scan == null) {
+			throw new IOException("getScanner returned null");
+		}
+		if (split == null) {
+			throw new IOException("Input split is null!");
+		}
+
+		logSplitInfo("opening", split);
+		scan.setStartRow(split.getStartRow());
+		lastRow = split.getEndRow();
+		scan.setStopRow(lastRow);
+
+		resultScanner = table.getScanner(scan);
+		endReached = false;
+		scannedRows = 0;
+	}
+
+	@Override
 	public boolean reachedEnd() throws IOException {
-		return this.endReached;
+		return endReached;
 	}
 
 	@Override
 	public T nextRecord(T reuse) throws IOException {
-		if (this.rs == null){
+		if (resultScanner == null) {
 			throw new IOException("No table result scanner provided!");
 		}
-		try{
-			Result res = this.rs.next();
-			if (res != null){
+		try {
+			Result res = resultScanner.next();
+			if (res != null) {
 				scannedRows++;
 				lastRow = res.getRow();
 				return mapResultToTuple(res);
 			}
-		}catch (Exception e) {
-			this.rs.close();
+		} catch (Exception e) {
+			resultScanner.close();
 			//workaround for timeout on scan
-			StringBuffer logMsg = new StringBuffer("Error after scan of ")
-					.append(scannedRows)
-					.append(" rows. Retry with a new scanner...");
-			LOG.warn(logMsg.toString(), e);
-			this.scan.setStartRow(lastRow);
-			this.rs = table.getScanner(scan);
-			Result res = this.rs.next();
+			LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
+			scan.setStartRow(lastRow);
+			resultScanner = table.getScanner(scan);
+			Result res = resultScanner.next();
 			if (res != null) {
 				scannedRows++;
 				lastRow = res.getRow();
@@ -126,46 +166,48 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
 			}
 		}
 
-		this.endReached = true;
+		endReached = true;
 		return null;
 	}
 
 	@Override
-	public void open(TableInputSplit split) throws IOException {
-		if (split == null){
-			throw new IOException("Input split is null!");
-		}
-		if (table == null){
-			throw new IOException("No HTable provided!");
-		}
-		if (scan == null){
-			throw new IOException("No Scan instance provided");
+	public void close() throws IOException {
+		LOG.info("Closing split (scanned {} rows)", scannedRows);
+		lastRow = null;
+		try {
+			if (resultScanner != null) {
+				resultScanner.close();
+			}
+		} finally {
+			resultScanner = null;
 		}
-
-		logSplitInfo("opening", split);
-		scan.setStartRow(split.getStartRow());
-		lastRow = split.getEndRow();
-		scan.setStopRow(lastRow);
-
-		this.rs = table.getScanner(scan);
-		this.endReached = false;
-		this.scannedRows = 0;
 	}
 
 	@Override
-	public void close() throws IOException {
-		if(rs!=null){
-			this.rs.close();
-		}
-		if(table!=null){
-			this.table.close();
+	public void closeInputFormat() {
+		try {
+			if (table != null) {
+				table.close();
+			}
+		} catch (IOException ioe) {
+			// Cannot rethrow IOException due to limitations of RichInputFormat.closeInputFormat()
+			//   method. This should be fixed in the next minor version of Flink (1.2.x).
+			// Wrap exception in RuntimeException as a temporary fix for Flink 1.1.x.
+			throw new RuntimeException(ioe);
+		} finally {
+			table = null;
 		}
-		LOG.info("Closing split (scanned {} rows)", scannedRows);
-		this.lastRow = null;
 	}
 
 	@Override
 	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
+		if (table == null) {
+			throw new IOException("The HBase table has not been opened!");
+		}
+		if (scan == null) {
+			throw new IOException("getScanner returned null");
+		}
+
 		//Gets the starting and ending row keys for every region in the currently open table
 		final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
 		if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
@@ -186,16 +228,16 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
 				continue;
 			}
 			//Finds the region on which the given row is being served
-			final String[] hosts = new String[] { regionLocation };
+			final String[] hosts = new String[]{regionLocation};
 
 			// determine if regions contains keys used by the scan
 			boolean isLastRegion = endKey.length == 0;
 			if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
-					(scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
+				(scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
 
 				final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
 				final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
-						&& !isLastRegion ? endKey : stopRow;
+					&& !isLastRegion ? endKey : stopRow;
 				int id = splits.size();
 				final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop);
 				splits.add(split);
@@ -215,7 +257,7 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
 		String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
 		String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
 		String[] hostnames = split.getHostnames();
-		LOG.info("{} split [{}|{}|{}|{}]",action, splitId, hostnames, splitStartKey, splitStopKey);
+		LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey);
 	}
 
 	/**
@@ -231,13 +273,11 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
 	 * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded(
 	 * i.e. all regions are included).
 	 *
-	 * @param startKey
-	 *        Start key of the region
-	 * @param endKey
-	 *        End key of the region
+	 * @param startKey Start key of the region
+	 * @param endKey   End key of the region
 	 * @return true, if this region needs to be included as part of the input (default).
 	 */
-	private static boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
+	protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
 		return true;
 	}
 
@@ -251,4 +291,4 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
 		return null;
 	}
 
-}
\ No newline at end of file
+}