Posted to commits@flink.apache.org by se...@apache.org on 2014/11/27 18:55:57 UTC

[4/5] incubator-flink git commit: Upgraded HBase addon to HBase 0.98.x and new Tuple APIs

Upgraded HBase addon to HBase 0.98.x and new Tuple APIs


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a1100af4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a1100af4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a1100af4

Branch: refs/heads/master
Commit: a1100af4247f77632f07ee9cea9fc7452104fac6
Parents: d554faa
Author: fpompermaier <fp...@github.com>
Authored: Thu Nov 20 00:08:02 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 27 18:18:26 2014 +0100

----------------------------------------------------------------------
 flink-addons/flink-hbase/pom.xml                | 162 ++++----
 .../addons/hbase/GenericTableOutputFormat.java  | 116 ------
 .../flink/addons/hbase/HBaseDataSink.java       |  47 ---
 .../flink/addons/hbase/TableInputFormat.java    | 378 +++++--------------
 .../flink/addons/hbase/common/HBaseKey.java     |  87 -----
 .../flink/addons/hbase/common/HBaseResult.java  |  69 ----
 .../flink/addons/hbase/common/HBaseUtil.java    |  68 ----
 .../addons/hbase/example/HBaseReadExample.java  | 129 -------
 .../addons/hbase/example/HBaseReadExample.java  |  93 +++++
 .../src/test/resources/hbase-site.xml           |  43 +++
 .../src/test/resources/log4j.properties         |   6 +
 flink-addons/pom.xml                            |   2 +-
 .../runtime/execution/RuntimeEnvironment.java   |   9 +-
 13 files changed, 336 insertions(+), 873 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/pom.xml b/flink-addons/flink-hbase/pom.xml
index 9a400a7..e0c40d7 100644
--- a/flink-addons/flink-hbase/pom.xml
+++ b/flink-addons/flink-hbase/pom.xml
@@ -28,108 +28,124 @@ under the License.
 		<version>0.8-incubating-SNAPSHOT</version>
 		<relativePath>..</relativePath>
 	</parent>
-	
-	<repositories>
-		<repository>
-			<id>cloudera-releases</id>
-			<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
-			<releases>
-				<enabled>true</enabled>
-			</releases>
-			<snapshots>
-				<enabled>false</enabled>
-			</snapshots>
-		</repository>
-	</repositories>
-
-	<properties>
- 		<hbase.version>0.96.0-hadoop2</hbase.version>
-	</properties>
 
 	<artifactId>flink-hbase</artifactId>
 	<name>flink-hbase</name>
 	<packaging>jar</packaging>
 
+	<properties>
+		<hbase.hadoop1.version>0.98.6.1-hadoop1</hbase.hadoop1.version>
+		<hbase.hadoop2.version>0.98.6.1-hadoop2</hbase.hadoop2.version>
+	</properties>
+
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-
+		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-java</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-
+		
 		<dependency>
-			<groupId>org.apache.hbase</groupId>
-			<artifactId>hbase</artifactId>
-			<version>0.94.2-cdh4.2.1</version>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
 			<exclusions>
-				<!-- jruby is used for the hbase shell. -->
 				<exclusion>
-					<groupId>org.jruby</groupId>
-					<artifactId>jruby-complete</artifactId>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-core</artifactId>
 				</exclusion>
 			</exclusions>
+			<scope>test</scope>
 		</dependency>
-
+		
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-client</artifactId>
-			<version>${hadoop.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>asm</groupId>
-					<artifactId>asm</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>tomcat</groupId>
-					<artifactId>jasper-compiler</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>tomcat</groupId>
-					<artifactId>jasper-runtime</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jsp-api-2.1</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jsp-2.1</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty-util</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.eclipse.jdt</groupId>
-					<artifactId>core</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-	</dependencies>
-		<!-- <dependency>
-			<groupId>org.apache.hbase</groupId>
-			<artifactId>hbase-server</artifactId>
-			<version>${hbase.version}</version>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-hadoop-compatibility</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
 		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.hbase</groupId>
 			<artifactId>hbase-client</artifactId>
 			<version>${hbase.version}</version>
 		</dependency>
-		 -->
+	</dependencies>
+
+	<profiles>
+		<profile>
+			<id>hadoop-1</id>
+			<activation>
+				<property>
+					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
+					<!--hadoop1--><name>hadoop.profile</name><value>1</value>
+				</property>
+			</activation>
+			<properties>
+				<hbase.version>${hbase.hadoop1.version}</hbase.version>
+			</properties>
+		</profile>
+		
+		<profile>
+			<id>hadoop-2</id>
+			<activation>
+				<property>
+					<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+					<!--hadoop2--><name>!hadoop.profile</name>
+				</property>
+			</activation>
+			<properties>
+				<hbase.version>${hbase.hadoop2.version}</hbase.version>
+			</properties>
+			<dependencies>
+				<!-- Force hadoop-common dependency -->
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+				</dependency>
+			</dependencies>
+		</profile>
+		
+		<profile>
+			<id>cdh5.1.3</id>
+			<properties>
+				<hadoop.profile>2</hadoop.profile>
+				<hbase.version>0.98.1-cdh5.1.3</hbase.version>
+				<hadoop.version>2.3.0-cdh5.1.3</hadoop.version>
+				<!-- Cloudera use different versions for hadoop core and commons-->
+				<!-- This profile could be removed if Cloudera fix this mismatch! -->
+				<hadoop.core.version>2.3.0-mr1-cdh5.1.3</hadoop.core.version>
+			</properties>
+			<dependencyManagement>
+				<dependencies>
+					<dependency>
+						<groupId>org.apache.hadoop</groupId>
+						<artifactId>hadoop-core</artifactId>
+						<version>${hadoop.core.version}</version>
+					</dependency>
+				</dependencies>
+			</dependencyManagement>
+			<dependencies>
+				<!-- Force hadoop-common dependency -->
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+					<version>${hadoop.version}</version>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-mapreduce-client-core</artifactId>
+					<version>${hadoop.version}</version>
+				</dependency>
+			</dependencies>
+		</profile>
 
-	<!-- hadoop-client is available for yarn and non-yarn, so there is no need 
-		to use profiles See ticket https://issues.apache.org/jira/browse/HADOOP-8009 
-		for description of hadoop-clients -->
+	</profiles>
 
 </project>

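Side note on the build change above (not part of the commit itself): the HBase version is now chosen through the same hadoop.profile switch the rest of the build uses, so selecting a combination is plain Maven profile activation. Sketched only as an illustration, with the profile names and versions taken from the pom above:

    mvn clean install -Dhadoop.profile=1    # hadoop-1 profile, HBase 0.98.6.1-hadoop1
    mvn clean install                       # default hadoop-2 profile, HBase 0.98.6.1-hadoop2
    mvn clean install -Pcdh5.1.3            # Cloudera CDH 5.1.3 Hadoop/HBase versions
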
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
deleted file mode 100644
index 49fcae3..0000000
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
-
-public abstract class GenericTableOutputFormat implements OutputFormat<Record> {
-
-	private static final long serialVersionUID = 1L;
-
-	public static final String JT_ID_KEY = "pact.hbase.jtkey";
-
-	public static final String JOB_ID_KEY = "pact.job.id";
-
-	private RecordWriter<ImmutableBytesWritable, KeyValue> writer;
-
-	private Configuration config;
-
-	private org.apache.hadoop.conf.Configuration hadoopConfig;
-
-	private TaskAttemptContext context;
-
-	private String jtID;
-
-	private int jobId;
-
-
-	@Override
-	public void configure(Configuration parameters) {
-		this.config = parameters;
-
-		// get the ID parameters
-		this.jtID = parameters.getString(JT_ID_KEY, null);
-		if (this.jtID == null) {
-			throw new RuntimeException("Missing JT_ID entry in hbase config.");
-		}
-		this.jobId = parameters.getInteger(JOB_ID_KEY, -1);
-		if (this.jobId < 0) {
-			throw new RuntimeException("Missing or invalid job id in input config.");
-		}
-	}
-
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		this.hadoopConfig = getHadoopConfig(this.config);
-		
-		/**
-		 * PLASE NOTE:
-		 * If you are a Eclipse+Maven Integration user and you have two (or more) warnings here, please
-		 * close the pact-hbase project OR set the maven profile to hadoop_yarn
-		 * 
-		 * pact-hbase requires hadoop_yarn, but Eclipse is not able to parse maven profiles properly. Therefore,
-		 * it imports the pact-hbase project even if it is not included in the standard profile (hadoop_v1)
-		 */
-		final TaskAttemptID attemptId = new TaskAttemptID(this.jtID, this.jobId, TaskType.MAP, taskNumber - 1, 0);
-
-		this.context = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(this.hadoopConfig, attemptId);
-		final HFileOutputFormat outFormat = new HFileOutputFormat();
-		try {
-			this.writer = outFormat.getRecordWriter(this.context);
-		} catch (InterruptedException iex) {
-			throw new IOException("Opening the writer was interrupted.", iex);
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		final RecordWriter<ImmutableBytesWritable, KeyValue> writer = this.writer;
-		this.writer = null;
-		if (writer != null) {
-			try {
-				writer.close(this.context);
-			} catch (InterruptedException iex) {
-				throw new IOException("Closing was interrupted.", iex);
-			}
-		}
-	}
-
-	public void collectKeyValue(KeyValue kv) throws IOException {
-		try {
-			this.writer.write(null, kv);
-		} catch (InterruptedException iex) {
-			throw new IOException("Write request was interrupted.", iex);
-		}
-	}
-
-	public abstract org.apache.hadoop.conf.Configuration getHadoopConfig(Configuration config);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
deleted file mode 100644
index fc0f226..0000000
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase;
-
-import java.util.Random;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-
-/**
- * A sink for writing to HBase
- */
-public class HBaseDataSink extends GenericDataSink {
-	
-	private static final int IDENTIFYIER_LEN = 16;
-	
-	public HBaseDataSink(GenericTableOutputFormat f, Operator input, String name) {
-		super(f, input, name);
-		
-		// generate a random unique identifier string
-		final Random rnd = new Random();
-		final StringBuilder bld = new StringBuilder();
-		for (int i = 0; i < IDENTIFYIER_LEN; i++) {
-			bld.append((char) (rnd.nextInt(26) + 'a'));
-		}
-		
-		setParameter(GenericTableOutputFormat.JT_ID_KEY, bld.toString());
-		setParameter(GenericTableOutputFormat.JOB_ID_KEY, rnd.nextInt());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
old mode 100644
new mode 100755
index d2b5d04..9e0dad4
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -16,189 +16,73 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.addons.hbase;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.addons.hbase.common.HBaseKey;
-import org.apache.flink.addons.hbase.common.HBaseResult;
-import org.apache.flink.addons.hbase.common.HBaseUtil;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * {@link InputFormat} subclass that wraps the access for HTables.
  */
-public class TableInputFormat implements InputFormat<Record, TableInputSplit> {
+public abstract class TableInputFormat<T extends Tuple> implements InputFormat<T, TableInputSplit>{
 
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class);
 
-	/** A handle on an HBase table */
-	private HTable table;
-
-	/** The scanner that performs the actual access on the table. HBase object */
-	private Scan scan;
-
-	/** Hbase' iterator wrapper */
-	private TableRecordReader tableRecordReader;
-
 	/** helper variable to decide whether the input is exhausted or not */
 	private boolean endReached = false;
+	
+	// TODO table and scan could be serialized when kryo serializer will be the default
+	protected transient HTable table;
+	protected transient Scan scan;
+	
+	/** HBase iterator wrapper */
+	private ResultScanner rs;
 
-	/** Job parameter that specifies the input table. */
-	public static final String INPUT_TABLE = "hbase.inputtable";
-
-	/** Location of the hbase-site.xml. If set, the HBaseAdmin will build inside */
-	public static final String CONFIG_LOCATION = "hbase.config.location";
-
-	/**
-	 * Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
-	 * See TableMapReduceUtil.convertScanToString(Scan) for more details.
-	 */
-	public static final String SCAN = "hbase.scan";
-
-	/** Column Family to Scan */
-	public static final String SCAN_COLUMN_FAMILY = "hbase.scan.column.family";
-
-	/** Space delimited list of columns to scan. */
-	public static final String SCAN_COLUMNS = "hbase.scan.columns";
-
-	/** The timestamp used to filter columns with a specific timestamp. */
-	public static final String SCAN_TIMESTAMP = "hbase.scan.timestamp";
-
-	/** The starting timestamp used to filter columns with a specific range of versions. */
-	public static final String SCAN_TIMERANGE_START = "hbase.scan.timerange.start";
-
-	/** The ending timestamp used to filter columns with a specific range of versions. */
-	public static final String SCAN_TIMERANGE_END = "hbase.scan.timerange.end";
-
-	/** The maximum number of version to return. */
-	public static final String SCAN_MAXVERSIONS = "hbase.scan.maxversions";
-
-	/** Set to false to disable server-side caching of blocks for this scan. */
-	public static final String SCAN_CACHEBLOCKS = "hbase.scan.cacheblocks";
-
-	/** The number of rows for caching that will be passed to scanners. */
-	public static final String SCAN_CACHEDROWS = "hbase.scan.cachedrows";
-
-	/** mutable objects that are used to avoid recreation of wrapper objects */
-	protected HBaseKey hbaseKey;
-
-	protected HBaseResult hbaseResult;
-
-	private org.apache.hadoop.conf.Configuration hConf;
-
-	@Override
-	public void configure(Configuration parameters) {
-		HTable table = createTable(parameters);
-		setTable(table);
-		Scan scan = createScanner(parameters);
-		setScan(scan);
-	}
-
+	// abstract methods allow for multiple table and scanners in the same job
+	protected abstract Scan getScanner();
+	protected abstract String getTableName();
+	protected abstract T mapResultToTuple(Result r);
+	
 	/**
-	 * Read the configuration and creates a {@link Scan} object.
+	 * creates a {@link Scan} object and a {@link HTable} connection
 	 * 
 	 * @param parameters
-	 * @return The scanner
+	 * @see Configuration
 	 */
-	protected Scan createScanner(Configuration parameters) {
-		Scan scan = null;
-		if (parameters.getString(SCAN, null) != null) {
-			try {
-				scan = HBaseUtil.convertStringToScan(parameters.getString(SCAN, null));
-			} catch (IOException e) {
-				LOG.error("An error occurred.", e);
-			}
-		} else {
-			try {
-				scan = new Scan();
-
-				// if (parameters.getString(SCAN_COLUMNS, null) != null) {
-				// scan.addColumns(parameters.getString(SCAN_COLUMNS, null));
-				// }
-
-				if (parameters.getString(SCAN_COLUMN_FAMILY, null) != null) {
-					scan.addFamily(Bytes.toBytes(parameters.getString(SCAN_COLUMN_FAMILY, null)));
-				}
-
-				if (parameters.getString(SCAN_TIMESTAMP, null) != null) {
-					scan.setTimeStamp(Long.parseLong(parameters.getString(SCAN_TIMESTAMP, null)));
-				}
-
-				if (parameters.getString(SCAN_TIMERANGE_START, null) != null
-					&& parameters.getString(SCAN_TIMERANGE_END, null) != null) {
-					scan.setTimeRange(
-						Long.parseLong(parameters.getString(SCAN_TIMERANGE_START, null)),
-						Long.parseLong(parameters.getString(SCAN_TIMERANGE_END, null)));
-				}
-
-				if (parameters.getString(SCAN_MAXVERSIONS, null) != null) {
-					scan.setMaxVersions(Integer.parseInt(parameters.getString(SCAN_MAXVERSIONS, null)));
-				}
-
-				if (parameters.getString(SCAN_CACHEDROWS, null) != null) {
-					scan.setCaching(Integer.parseInt(parameters.getString(SCAN_CACHEDROWS, null)));
-				}
-
-				// false by default, full table scans generate too much BC churn
-				scan.setCacheBlocks((parameters.getBoolean(SCAN_CACHEBLOCKS, false)));
-			} catch (Exception e) {
-				LOG.error(StringUtils.stringifyException(e));
-			}
-		}
-		return scan;
+	@Override
+	public void configure(Configuration parameters) {
+		this.table = createTable();
+		this.scan = getScanner();
 	}
-
-	/**
-	 * Create an {@link HTable} instance and set it into this format.
-	 * 
-	 * @param parameters
-	 *        a {@link Configuration} that holds at least the table name.
-	 */
-	protected HTable createTable(Configuration parameters) {
-		String configLocation = parameters.getString(TableInputFormat.CONFIG_LOCATION, null);
-		LOG.info("Got config location: " + configLocation);
-		if (configLocation != null)
-		{
-			org.apache.hadoop.conf.Configuration dummyConf = new org.apache.hadoop.conf.Configuration();
-			if(OperatingSystem.isWindows()) {
-				dummyConf.addResource(new Path("file:/" + configLocation));
-			} else {
-				dummyConf.addResource(new Path("file://" + configLocation));
-			}
-			hConf = HBaseConfiguration.create(dummyConf);
-			;
-			// hConf.set("hbase.master", "im1a5.internetmemory.org");
-			LOG.info("hbase master: " + hConf.get("hbase.master"));
-			LOG.info("zookeeper quorum: " + hConf.get("hbase.zookeeper.quorum"));
-
-		}
-		String tableName = parameters.getString(INPUT_TABLE, "");
+	
+	/** Create an {@link HTable} instance and set it into this format */
+	private HTable createTable() {
+		LOG.info("Initializing HBaseConfiguration");
+		//use files found in the classpath
+		org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
+		
 		try {
-			return new HTable(this.hConf, tableName);
+			return new HTable(hConf, getTableName());
 		} catch (Exception e) {
 			LOG.error(StringUtils.stringifyException(e));
 		}
@@ -206,161 +90,106 @@ public class TableInputFormat implements InputFormat<Record, TableInputSplit> {
 	}
 
 	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
 	public boolean reachedEnd() throws IOException {
 		return this.endReached;
 	}
 
-	protected boolean nextResult() throws IOException {
-		if (this.tableRecordReader == null)
-		{
-			throw new IOException("No table record reader provided!");
-		}
-
-		try {
-			if (this.tableRecordReader.nextKeyValue())
-			{
-				ImmutableBytesWritable currentKey = this.tableRecordReader.getCurrentKey();
-				Result currentValue = this.tableRecordReader.getCurrentValue();
-
-				hbaseKey.setWritable(currentKey);
-				hbaseResult.setResult(currentValue);
-			} else
-			{
-				this.endReached = true;
-				return false;
-			}
-		} catch (InterruptedException e) {
-			LOG.error("Table reader has been interrupted", e);
-			throw new IOException(e);
-		}
-
-		return true;
-	}
-
 	@Override
-	public Record nextRecord(Record record) throws IOException {
-		if (nextResult()) {
-			mapResultToRecord(record, hbaseKey, hbaseResult);
-			return record;
-		} else {
-			return null;
+	public T nextRecord(T reuse) throws IOException {
+		if (this.rs == null){
+			throw new IOException("No table result scanner provided!");
 		}
-	}
-
-	/**
-	 * Maps the current HBase Result into a Record.
-	 * This implementation simply stores the HBaseKey at position 0, and the HBase Result object at position 1.
-	 * 
-	 * @param record
-	 * @param key
-	 * @param result
-	 */
-	public void mapResultToRecord(Record record, HBaseKey key, HBaseResult result) {
-		record.setField(0, key);
-		record.setField(1, result);
-	}
 
-	@Override
-	public void close() throws IOException {
-		this.tableRecordReader.close();
+		Result res = this.rs.next();
+		if (res != null){
+			return mapResultToTuple(res);
+		} 
+		this.endReached = true;
+		return null;
 	}
 
 	@Override
 	public void open(TableInputSplit split) throws IOException {
-		if (split == null)
-		{
+		if (split == null){
 			throw new IOException("Input split is null!");
 		}
-
-		if (this.table == null)
-		{
+		if (table == null){
 			throw new IOException("No HTable provided!");
 		}
-
-		if (this.scan == null)
-		{
+		if (scan == null){
 			throw new IOException("No Scan instance provided");
 		}
 
-		this.tableRecordReader = new TableRecordReader();
-
-		this.tableRecordReader.setHTable(this.table);
-
-		Scan sc = new Scan(this.scan);
-		sc.setStartRow(split.getStartRow());
-		LOG.info("split start row: " + new String(split.getStartRow()));
-		sc.setStopRow(split.getEndRow());
-		LOG.info("split end row: " + new String(split.getEndRow()));
-
-		this.tableRecordReader.setScan(sc);
-		this.tableRecordReader.restart(split.getStartRow());
-
-		this.hbaseKey = new HBaseKey();
-		this.hbaseResult = new HBaseResult();
-
+		logSplitInfo("opening", split);
+		scan.setStartRow(split.getStartRow());
+		scan.setStopRow(split.getEndRow());
+		
+		this.rs = table.getScanner(scan);
 		endReached = false;
 	}
-
+	
+	@Override
+	public void close() throws IOException {
+		this.rs.close();
+		this.table.close();
+	}
 
 	@Override
 	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
-
-		if (this.table == null) {
-			throw new IOException("No table was provided.");
-		}
-
-		final Pair<byte[][], byte[][]> keys = this.table.getStartEndKeys();
-
+		//Gets the starting and ending row keys for every region in the currently open table
+		final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
 		if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
-
 			throw new IOException("Expecting at least one region.");
 		}
-		int count = 0;
-		final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(keys.getFirst().length);
+		final byte[] startRow = scan.getStartRow();
+		final byte[] stopRow = scan.getStopRow();
+		final boolean scanWithNoLowerBound = startRow.length == 0;
+		final boolean scanWithNoUpperBound = stopRow.length == 0;
+		
+		final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(minNumSplits);
 		for (int i = 0; i < keys.getFirst().length; i++) {
-
-			if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+			final byte[] startKey = keys.getFirst()[i];
+			final byte[] endKey = keys.getSecond()[i];
+			final String regionLocation = table.getRegionLocation(startKey, false).getHostnamePort();
+			//Test if the given region is to be included in the InputSplit while splitting the regions of a table
+			if (!includeRegionInSplit(startKey, endKey)) {
 				continue;
 			}
-
-			final String regionLocation = this.table.getRegionLocation(keys.getFirst()[i], false).getHostnamePort();
-			final byte[] startRow = this.scan.getStartRow();
-			final byte[] stopRow = this.scan.getStopRow();
-
-			// determine if the given start an stop key fall into the region
-			if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
-				Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
-				(stopRow.length == 0 ||
-				Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
-
-				final byte[] splitStart = startRow.length == 0 ||
-					Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
-					keys.getFirst()[i] : startRow;
-				final byte[] splitStop = (stopRow.length == 0 ||
-					Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
-					keys.getSecond()[i].length > 0 ?
-					keys.getSecond()[i] : stopRow;
-				final TableInputSplit split = new TableInputSplit(splits.size(), new String[] { regionLocation },
-					this.table.getTableName(), splitStart, splitStop);
+			//Finds the region on which the given row is being served
+			final String[] hosts = new String[] { regionLocation };
+
+			// determine if regions contains keys used by the scan
+			boolean isLastRegion = endKey.length == 0;
+			if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
+				(scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
+
+				final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
+				final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
+						&& !isLastRegion ? endKey : stopRow;
+				int id = splits.size();
+				final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop);
 				splits.add(split);
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
-				}
 			}
 		}
-
+		LOG.info("Created " + splits.size() + " splits");
+		for (TableInputSplit split : splits) {
+			logSplitInfo("created", split);
+		}
 		return splits.toArray(new TableInputSplit[0]);
 	}
+	
+	private void logSplitInfo(String action, TableInputSplit split) {
+		int splitId = split.getSplitNumber();
+		String splitStart = Bytes.toString(split.getStartRow());
+		String splitEnd = Bytes.toString(split.getEndRow());
+		String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
+		String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
+		String[] hostnames = split.getHostnames();
+		LOG.info("{} split [{}|{}|{}|{}]",action, splitId, hostnames, splitStartKey, splitStopKey);
+	}
 
 	/**
-	 * Test if the given region is to be included in the InputSplit while splitting
-	 * the regions of a table.
+	 * Test if the given region is to be included in the InputSplit while splitting the regions of a table.
 	 * <p>
 	 * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
 	 * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br>
@@ -386,20 +215,9 @@ public class TableInputFormat implements InputFormat<Record, TableInputSplit> {
 	public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits) {
 		return new LocatableInputSplitAssigner(inputSplits);
 	}
-	
-	public void setTable(HTable table) {
-		this.table = table;
-	}
 
-	public HTable getTable() {
-		return table;
-	}
-
-	public void setScan(Scan scan) {
-		this.scan = scan;
-	}
-
-	public Scan getScan() {
-		return scan;
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+		return null;
 	}
 }

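For readers following the API change above: TableInputFormat is now abstract and Tuple-based, so a user subclass supplies the table name, the Scan, and a Result-to-Tuple mapping. A minimal sketch (not part of the commit), assuming a hypothetical table "people" with column family "cf" and qualifier "name":

    import org.apache.flink.addons.hbase.TableInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PeopleTableInputFormat extends TableInputFormat<Tuple2<String, String>> {

        private static final long serialVersionUID = 1L;

        // reused across calls to avoid per-record allocations
        private final Tuple2<String, String> reuse = new Tuple2<String, String>();

        @Override
        protected String getTableName() {
            return "people";                                              // hypothetical table name
        }

        @Override
        protected Scan getScanner() {
            Scan scan = new Scan();
            scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"));   // hypothetical family/qualifier
            return scan;
        }

        @Override
        protected Tuple2<String, String> mapResultToTuple(Result r) {
            reuse.f0 = Bytes.toString(r.getRow());
            reuse.f1 = Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name")));
            return reuse;
        }
    }

Such a format is then handed to the Java API via ExecutionEnvironment#createInput(...), as the updated HBaseReadExample further down does.
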
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
deleted file mode 100644
index 4c08493..0000000
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.common;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-/**
- * Simple wrapper to encapsulate an HBase h{@link ImmutableBytesWritable} as a Key
- */
-public class HBaseKey implements Key<HBaseKey> {
-
-	private static final long serialVersionUID = 1L;
-
-	private ImmutableBytesWritable writable;
-	
-
-	public HBaseKey() {
-		this.writable = new ImmutableBytesWritable();
-	}
-	
-
-	public HBaseKey(ImmutableBytesWritable writable) {
-		this.writable = writable;
-	}
-	
-	
-	public ImmutableBytesWritable getWritable() {
-		return writable;
-	}
-
-	public void setWritable(ImmutableBytesWritable writable) {
-		this.writable = writable;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		this.writable.write(out);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.writable.readFields(in);
-	}
-
-	@Override
-	public int hashCode() {
-		return this.writable.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj.getClass() == HBaseKey.class) {
-			return this.writable.equals(((HBaseKey) obj).writable);
-		} else {
-			return false;
-		}
-	}
-	
-	@Override
-	public int compareTo(HBaseKey other) {
-		return this.writable.compareTo(other.writable);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
deleted file mode 100644
index dfae104..0000000
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.common;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-import org.apache.hadoop.hbase.client.Result;
-
-public class HBaseResult implements Value {
-	
-	private static final long serialVersionUID = 1L;
-
-	private Result result;
-	
-	
-	public HBaseResult() {
-		this.result = new Result();
-	}
-	
-	public HBaseResult(Result result) {
-		this.result = result;
-	}
-	
-	
-	public Result getResult() {
-		return this.result;
-	}
-	
-	public void setResult(Result result) {
-		this.result = result;
-	}
-	
-	public String getStringData() {
-		if(this.result != null) {
-			return this.result.toString();
-		}
-		return null;
-	}
-	
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.result.readFields(in);
-	}
-	
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		this.result.write(out);	
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
deleted file mode 100644
index 607dd78..0000000
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.common;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Base64;
-
-/**
- * Utility for {@link TableInputFormat}
- */
-public class HBaseUtil {
-
-	/**
-	 * Writes the given scan into a Base64 encoded string.
-	 * 
-	 * @param scan
-	 *        The scan to write out.
-	 * @return The scan saved in a Base64 encoded string.
-	 * @throws IOException
-	 *         When writing the scan fails.
-	 */
-	static String convertScanToString(Scan scan) throws IOException {
-		ByteArrayOutputStream out = new ByteArrayOutputStream();
-		DataOutputStream dos = new DataOutputStream(out);
-		scan.write(dos);
-		return Base64.encodeBytes(out.toByteArray());
-	}
-
-	/**
-	 * Converts the given Base64 string back into a Scan instance.
-	 * 
-	 * @param base64
-	 *        The scan details.
-	 * @return The newly created Scan instance.
-	 * @throws IOException
-	 *         When reading the scan instance fails.
-	 */
-	public static Scan convertStringToScan(String base64) throws IOException {
-		ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64));
-		DataInputStream dis = new DataInputStream(bis);
-		Scan scan = new Scan();
-		scan.readFields(dis);
-		return scan;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
deleted file mode 100644
index 881d06a..0000000
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.example;
-
-import org.apache.flink.addons.hbase.TableInputFormat;
-import org.apache.flink.addons.hbase.common.HBaseKey;
-import org.apache.flink.addons.hbase.common.HBaseResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * the occurrences of each word in the file.
- */
-public class HBaseReadExample implements Program, ProgramDescription {
-	
-	public static class MyTableInputFormat extends  TableInputFormat {
-		
-		private static final long serialVersionUID = 1L;
-
-		private final byte[] META_FAMILY = "meta".getBytes();
-		
-		private final byte[] USER_COLUMN = "user".getBytes();
-		
-		private final byte[] TIMESTAMP_COLUMN = "timestamp".getBytes();
-		
-		private final byte[] TEXT_FAMILY = "text".getBytes();
-		
-		private final byte[] TWEET_COLUMN = "tweet".getBytes();
-		
-		public MyTableInputFormat() {
-			super();
-			
-		}
-		
-		@Override
-		protected HTable createTable(Configuration parameters) {
-			return super.createTable(parameters);
-		}
-		
-		@Override
-		protected Scan createScanner(Configuration parameters) {
-			Scan scan = new Scan ();
-			scan.addColumn (META_FAMILY, USER_COLUMN);
-			scan.addColumn (META_FAMILY, TIMESTAMP_COLUMN);
-			scan.addColumn (TEXT_FAMILY, TWEET_COLUMN);
-			return scan;
-		}
-		
-		StringValue row_string = new StringValue();
-		StringValue user_string = new StringValue();
-		StringValue timestamp_string = new StringValue();
-		StringValue tweet_string = new StringValue();
-		
-		@Override
-		public void mapResultToRecord(Record record, HBaseKey key,
-				HBaseResult result) {
-			Result res = result.getResult();
-			res.getRow();
-			record.setField(0, toString(row_string, res.getRow()));
-			record.setField(1, toString (user_string, res.getValue(META_FAMILY, USER_COLUMN)));
-			record.setField(2, toString (timestamp_string, res.getValue(META_FAMILY, TIMESTAMP_COLUMN)));
-			record.setField(3, toString (tweet_string, res.getValue(TEXT_FAMILY, TWEET_COLUMN)));
-		}
-		
-		private final StringValue toString (StringValue string, byte[] bytes) {
-			string.setValueAscii(bytes, 0, bytes.length);
-			return string;
-		}
-		
-	}
-	
-
-	@Override
-	public Plan getPlan(String... args) {
-		// parse job parameters
-		int numSubTasks   = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		String output    = (args.length > 1 ? args[1] : "");
-
-		GenericDataSource<TableInputFormat> source = new GenericDataSource<TableInputFormat>(new MyTableInputFormat(), "HBase Input");
-		source.setParameter(TableInputFormat.INPUT_TABLE, "twitter");
-		source.setParameter(TableInputFormat.CONFIG_LOCATION, "/etc/hbase/conf/hbase-site.xml");
-		FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, source, "HBase String dump");
-		CsvOutputFormat.configureRecordFormat(out)
-			.recordDelimiter('\n')
-			.fieldDelimiter(' ')
-			.field(StringValue.class, 0)
-			.field(StringValue.class, 1)
-			.field(StringValue.class, 2)
-			.field(StringValue.class, 3);
-		
-		Plan plan = new Plan(out, "HBase access Example");
-		plan.setDefaultParallelism(numSubTasks);
-		return plan;
-	}
-
-
-	@Override
-	public String getDescription() {
-		return "Parameters: [numSubStasks] [input] [output]";
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
new file mode 100755
index 0000000..b6f345a
--- /dev/null
+++ b/flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.TableInputFormat;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Simple stub for HBase DataSet
+ * 
+ * To run the test first create the test table with hbase shell.
+ * 
+ * Use the following commands:
+ * <ul>
+ *     <li>create 'test-table', 'someCf'</li>
+ *     <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li>
+ *     <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li>
+ * </ul>
+ * 
+ * The test should return just the first entry.
+ * 
+ */
+public class HBaseReadExample {
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		@SuppressWarnings("serial")
+		DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() {
+			private final byte[] CF_SOME = "someCf".getBytes();
+			private final byte[] Q_SOME = "someQual".getBytes();
+				@Override
+				public String getTableName() {
+					return "test-table";
+				}
+
+				@Override
+				protected Scan getScanner() {
+					Scan scan = new Scan();
+					scan.addColumn(CF_SOME, Q_SOME);
+					return scan;
+				}
+
+				private Tuple2<String, String> reuse = new Tuple2<String, String>();
+				
+				@Override
+				protected Tuple2<String, String> mapResultToTuple(Result r) {
+					String key = Bytes.toString(r.getRow());
+					String val = Bytes.toString(r.getValue(CF_SOME, Q_SOME));
+					reuse.setField(key, 0);
+					reuse.setField(val, 1);
+					return reuse;
+				}
+		})
+		.filter(new FilterFunction<Tuple2<String,String>>() {
+
+			@Override
+			public boolean filter(Tuple2<String, String> t) throws Exception {
+				String val = t.getField(1);
+				if(val.startsWith("someStr"))
+					return true;
+				return false;
+			}
+		});
+		
+		hbaseDs.print();
+		
+		// kick off execution.
+		env.execute();
+				
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/test/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/test/resources/hbase-site.xml b/flink-addons/flink-hbase/src/test/resources/hbase-site.xml
new file mode 100644
index 0000000..2984063
--- /dev/null
+++ b/flink-addons/flink-hbase/src/test/resources/hbase-site.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+
+  <property>
+    <name>hbase.tmp.dir</name>
+    <!-- 
+    <value>/media/Dati/hbase-0.98-data</value>
+    --> 
+    <value>/opt/hbase-0.98.6.1-hadoop2/data</value>
+
+  </property>
+  <property>
+    <name>hbase.zookeeper.quorum</name>
+    <value>localhost</value>
+  </property>
+    <!-- 
+  <property>
+    <name>hadoop.security.group.mapping</name>
+    <value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
+  </property>
+  -->
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/test/resources/log4j.properties b/flink-addons/flink-hbase/src/test/resources/log4j.properties
new file mode 100755
index 0000000..c83ec70
--- /dev/null
+++ b/flink-addons/flink-hbase/src/test/resources/log4j.properties
@@ -0,0 +1,6 @@
+log4j.rootLogger=${hadoop.root.logger}
+hadoop.root.logger=INFO,console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/pom.xml b/flink-addons/pom.xml
index 6b4083e..1086c56 100644
--- a/flink-addons/pom.xml
+++ b/flink-addons/pom.xml
@@ -52,7 +52,7 @@ under the License.
 				</property>
 			</activation>
 			<modules>
-				<!-- No extra modules: pact-hbase is currently not compatible with Hadoop v1 -->
+				<!-- No extra modules: flink-hbase is currently not compatible with Hadoop v1 -->
 			</modules>
 		</profile>
 		<profile>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index ce495d1..15fddb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -183,9 +183,12 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 			Thread currentThread = Thread.currentThread();
 			ClassLoader context = currentThread.getContextClassLoader();
 			currentThread.setContextClassLoader(userCodeClassLoader);
-			this.invokable.registerInputOutput();
-
-			currentThread.setContextClassLoader(context);
+			try {
+				this.invokable.registerInputOutput();
+			}
+			finally {
+				currentThread.setContextClassLoader(context);
+			}
 		}
 
 		List<GateDeploymentDescriptor> inGates = tdd.getInputGates();