Posted to commits@flink.apache.org by se...@apache.org on 2014/11/27 18:55:57 UTC
[4/5] incubator-flink git commit: Upgraded HBase addon to HBase 0.98.x and new Tuple APIs
Upgraded HBase addon to HBase 0.98.x and new Tuple APIs
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a1100af4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a1100af4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a1100af4
Branch: refs/heads/master
Commit: a1100af4247f77632f07ee9cea9fc7452104fac6
Parents: d554faa
Author: fpompermaier <fp...@github.com>
Authored: Thu Nov 20 00:08:02 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 27 18:18:26 2014 +0100
----------------------------------------------------------------------
flink-addons/flink-hbase/pom.xml | 162 ++++----
.../addons/hbase/GenericTableOutputFormat.java | 116 ------
.../flink/addons/hbase/HBaseDataSink.java | 47 ---
.../flink/addons/hbase/TableInputFormat.java | 378 +++++--------------
.../flink/addons/hbase/common/HBaseKey.java | 87 -----
.../flink/addons/hbase/common/HBaseResult.java | 69 ----
.../flink/addons/hbase/common/HBaseUtil.java | 68 ----
.../addons/hbase/example/HBaseReadExample.java | 129 -------
.../addons/hbase/example/HBaseReadExample.java | 93 +++++
.../src/test/resources/hbase-site.xml | 43 +++
.../src/test/resources/log4j.properties | 6 +
flink-addons/pom.xml | 2 +-
.../runtime/execution/RuntimeEnvironment.java | 9 +-
13 files changed, 336 insertions(+), 873 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/pom.xml b/flink-addons/flink-hbase/pom.xml
index 9a400a7..e0c40d7 100644
--- a/flink-addons/flink-hbase/pom.xml
+++ b/flink-addons/flink-hbase/pom.xml
@@ -28,108 +28,124 @@ under the License.
<version>0.8-incubating-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
-
- <repositories>
- <repository>
- <id>cloudera-releases</id>
- <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- </repositories>
-
- <properties>
- <hbase.version>0.96.0-hadoop2</hbase.version>
- </properties>
<artifactId>flink-hbase</artifactId>
<name>flink-hbase</name>
<packaging>jar</packaging>
+ <properties>
+ <hbase.hadoop1.version>0.98.6.1-hadoop1</hbase.hadoop1.version>
+ <hbase.hadoop2.version>0.98.6.1-hadoop2</hbase.hadoop2.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
</dependency>
-
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${project.version}</version>
</dependency>
-
+
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
- <version>0.94.2-cdh4.2.1</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${project.version}</version>
<exclusions>
- <!-- jruby is used for the hbase shell. -->
<exclusion>
- <groupId>org.jruby</groupId>
- <artifactId>jruby-complete</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
</exclusion>
</exclusions>
+ <scope>test</scope>
</dependency>
-
+
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>asm</groupId>
- <artifactId>asm</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-api-2.1</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.eclipse.jdt</groupId>
- <artifactId>core</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
- <!-- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>${hbase.version}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-hadoop-compatibility</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
- -->
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>hadoop-1</id>
+ <activation>
+ <property>
+ <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
+ <!--hadoop1--><name>hadoop.profile</name><value>1</value>
+ </property>
+ </activation>
+ <properties>
+ <hbase.version>${hbase.hadoop1.version}</hbase.version>
+ </properties>
+ </profile>
+
+ <profile>
+ <id>hadoop-2</id>
+ <activation>
+ <property>
+ <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+ <!--hadoop2--><name>!hadoop.profile</name>
+ </property>
+ </activation>
+ <properties>
+ <hbase.version>${hbase.hadoop2.version}</hbase.version>
+ </properties>
+ <dependencies>
+ <!-- Force hadoop-common dependency -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ <profile>
+ <id>cdh5.1.3</id>
+ <properties>
+ <hadoop.profile>2</hadoop.profile>
+ <hbase.version>0.98.1-cdh5.1.3</hbase.version>
+ <hadoop.version>2.3.0-cdh5.1.3</hadoop.version>
+ <!-- Cloudera uses different versions for hadoop core and commons -->
+ <!-- This profile could be removed if Cloudera fixes this mismatch! -->
+ <hadoop.core.version>2.3.0-mr1-cdh5.1.3</hadoop.core.version>
+ </properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>${hadoop.core.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ <dependencies>
+ <!-- Force hadoop-common dependency -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
- <!-- hadoop-client is available for yarn and non-yarn, so there is no need
- to use profiles See ticket https://issues.apache.org/jira/browse/HADOOP-8009
- for description of hadoop-clients -->
+ </profiles>
</project>
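
A note on the profile wiring above, which follows standard Maven activation semantics: the hadoop-2 profile is active by default because its trigger is the absence of the hadoop.profile property, so a plain "mvn install" resolves hbase.version to the hadoop2 HBase artifact. Building with "mvn install -Dhadoop.profile=1" activates the hadoop-1 profile instead, and "mvn install -Pcdh5.1.3" pins the Cloudera builds of HBase and Hadoop listed in that profile.
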
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
deleted file mode 100644
index 49fcae3..0000000
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
-
-public abstract class GenericTableOutputFormat implements OutputFormat<Record> {
-
- private static final long serialVersionUID = 1L;
-
- public static final String JT_ID_KEY = "pact.hbase.jtkey";
-
- public static final String JOB_ID_KEY = "pact.job.id";
-
- private RecordWriter<ImmutableBytesWritable, KeyValue> writer;
-
- private Configuration config;
-
- private org.apache.hadoop.conf.Configuration hadoopConfig;
-
- private TaskAttemptContext context;
-
- private String jtID;
-
- private int jobId;
-
-
- @Override
- public void configure(Configuration parameters) {
- this.config = parameters;
-
- // get the ID parameters
- this.jtID = parameters.getString(JT_ID_KEY, null);
- if (this.jtID == null) {
- throw new RuntimeException("Missing JT_ID entry in hbase config.");
- }
- this.jobId = parameters.getInteger(JOB_ID_KEY, -1);
- if (this.jobId < 0) {
- throw new RuntimeException("Missing or invalid job id in input config.");
- }
- }
-
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- this.hadoopConfig = getHadoopConfig(this.config);
-
- /**
- * PLASE NOTE:
- * If you are a Eclipse+Maven Integration user and you have two (or more) warnings here, please
- * close the pact-hbase project OR set the maven profile to hadoop_yarn
- *
- * pact-hbase requires hadoop_yarn, but Eclipse is not able to parse maven profiles properly. Therefore,
- * it imports the pact-hbase project even if it is not included in the standard profile (hadoop_v1)
- */
- final TaskAttemptID attemptId = new TaskAttemptID(this.jtID, this.jobId, TaskType.MAP, taskNumber - 1, 0);
-
- this.context = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(this.hadoopConfig, attemptId);
- final HFileOutputFormat outFormat = new HFileOutputFormat();
- try {
- this.writer = outFormat.getRecordWriter(this.context);
- } catch (InterruptedException iex) {
- throw new IOException("Opening the writer was interrupted.", iex);
- }
- }
-
- @Override
- public void close() throws IOException {
- final RecordWriter<ImmutableBytesWritable, KeyValue> writer = this.writer;
- this.writer = null;
- if (writer != null) {
- try {
- writer.close(this.context);
- } catch (InterruptedException iex) {
- throw new IOException("Closing was interrupted.", iex);
- }
- }
- }
-
- public void collectKeyValue(KeyValue kv) throws IOException {
- try {
- this.writer.write(null, kv);
- } catch (InterruptedException iex) {
- throw new IOException("Write request was interrupted.", iex);
- }
- }
-
- public abstract org.apache.hadoop.conf.Configuration getHadoopConfig(Configuration config);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
deleted file mode 100644
index fc0f226..0000000
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase;
-
-import java.util.Random;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-
-/**
- * A sink for writing to HBase
- */
-public class HBaseDataSink extends GenericDataSink {
-
- private static final int IDENTIFYIER_LEN = 16;
-
- public HBaseDataSink(GenericTableOutputFormat f, Operator input, String name) {
- super(f, input, name);
-
- // generate a random unique identifier string
- final Random rnd = new Random();
- final StringBuilder bld = new StringBuilder();
- for (int i = 0; i < IDENTIFYIER_LEN; i++) {
- bld.append((char) (rnd.nextInt(26) + 'a'));
- }
-
- setParameter(GenericTableOutputFormat.JT_ID_KEY, bld.toString());
- setParameter(GenericTableOutputFormat.JOB_ID_KEY, rnd.nextInt());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
old mode 100644
new mode 100755
index d2b5d04..9e0dad4
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -16,189 +16,73 @@
* limitations under the License.
*/
-
package org.apache.flink.addons.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.addons.hbase.common.HBaseKey;
-import org.apache.flink.addons.hbase.common.HBaseResult;
-import org.apache.flink.addons.hbase.common.HBaseUtil;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* {@link InputFormat} subclass that wraps the access for HTables.
*/
-public class TableInputFormat implements InputFormat<Record, TableInputSplit> {
+public abstract class TableInputFormat<T extends Tuple> implements InputFormat<T, TableInputSplit>{
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class);
- /** A handle on an HBase table */
- private HTable table;
-
- /** The scanner that performs the actual access on the table. HBase object */
- private Scan scan;
-
- /** Hbase' iterator wrapper */
- private TableRecordReader tableRecordReader;
-
/** helper variable to decide whether the input is exhausted or not */
private boolean endReached = false;
+
+ // TODO table and scan could be serialized when the Kryo serializer becomes the default
+ protected transient HTable table;
+ protected transient Scan scan;
+
+ /** HBase iterator wrapper */
+ private ResultScanner rs;
- /** Job parameter that specifies the input table. */
- public static final String INPUT_TABLE = "hbase.inputtable";
-
- /** Location of the hbase-site.xml. If set, the HBaseAdmin will build inside */
- public static final String CONFIG_LOCATION = "hbase.config.location";
-
- /**
- * Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
- * See TableMapReduceUtil.convertScanToString(Scan) for more details.
- */
- public static final String SCAN = "hbase.scan";
-
- /** Column Family to Scan */
- public static final String SCAN_COLUMN_FAMILY = "hbase.scan.column.family";
-
- /** Space delimited list of columns to scan. */
- public static final String SCAN_COLUMNS = "hbase.scan.columns";
-
- /** The timestamp used to filter columns with a specific timestamp. */
- public static final String SCAN_TIMESTAMP = "hbase.scan.timestamp";
-
- /** The starting timestamp used to filter columns with a specific range of versions. */
- public static final String SCAN_TIMERANGE_START = "hbase.scan.timerange.start";
-
- /** The ending timestamp used to filter columns with a specific range of versions. */
- public static final String SCAN_TIMERANGE_END = "hbase.scan.timerange.end";
-
- /** The maximum number of version to return. */
- public static final String SCAN_MAXVERSIONS = "hbase.scan.maxversions";
-
- /** Set to false to disable server-side caching of blocks for this scan. */
- public static final String SCAN_CACHEBLOCKS = "hbase.scan.cacheblocks";
-
- /** The number of rows for caching that will be passed to scanners. */
- public static final String SCAN_CACHEDROWS = "hbase.scan.cachedrows";
-
- /** mutable objects that are used to avoid recreation of wrapper objects */
- protected HBaseKey hbaseKey;
-
- protected HBaseResult hbaseResult;
-
- private org.apache.hadoop.conf.Configuration hConf;
-
- @Override
- public void configure(Configuration parameters) {
- HTable table = createTable(parameters);
- setTable(table);
- Scan scan = createScanner(parameters);
- setScan(scan);
- }
-
+ // abstract methods allow for multiple tables and scanners in the same job
+ protected abstract Scan getScanner();
+ protected abstract String getTableName();
+ protected abstract T mapResultToTuple(Result r);
+
/**
- * Read the configuration and creates a {@link Scan} object.
+ * Creates a {@link Scan} object and an {@link HTable} connection
*
* @param parameters
- * @return The scanner
+ * @see Configuration
*/
- protected Scan createScanner(Configuration parameters) {
- Scan scan = null;
- if (parameters.getString(SCAN, null) != null) {
- try {
- scan = HBaseUtil.convertStringToScan(parameters.getString(SCAN, null));
- } catch (IOException e) {
- LOG.error("An error occurred.", e);
- }
- } else {
- try {
- scan = new Scan();
-
- // if (parameters.getString(SCAN_COLUMNS, null) != null) {
- // scan.addColumns(parameters.getString(SCAN_COLUMNS, null));
- // }
-
- if (parameters.getString(SCAN_COLUMN_FAMILY, null) != null) {
- scan.addFamily(Bytes.toBytes(parameters.getString(SCAN_COLUMN_FAMILY, null)));
- }
-
- if (parameters.getString(SCAN_TIMESTAMP, null) != null) {
- scan.setTimeStamp(Long.parseLong(parameters.getString(SCAN_TIMESTAMP, null)));
- }
-
- if (parameters.getString(SCAN_TIMERANGE_START, null) != null
- && parameters.getString(SCAN_TIMERANGE_END, null) != null) {
- scan.setTimeRange(
- Long.parseLong(parameters.getString(SCAN_TIMERANGE_START, null)),
- Long.parseLong(parameters.getString(SCAN_TIMERANGE_END, null)));
- }
-
- if (parameters.getString(SCAN_MAXVERSIONS, null) != null) {
- scan.setMaxVersions(Integer.parseInt(parameters.getString(SCAN_MAXVERSIONS, null)));
- }
-
- if (parameters.getString(SCAN_CACHEDROWS, null) != null) {
- scan.setCaching(Integer.parseInt(parameters.getString(SCAN_CACHEDROWS, null)));
- }
-
- // false by default, full table scans generate too much BC churn
- scan.setCacheBlocks((parameters.getBoolean(SCAN_CACHEBLOCKS, false)));
- } catch (Exception e) {
- LOG.error(StringUtils.stringifyException(e));
- }
- }
- return scan;
+ @Override
+ public void configure(Configuration parameters) {
+ this.table = createTable();
+ this.scan = getScanner();
}
-
- /**
- * Create an {@link HTable} instance and set it into this format.
- *
- * @param parameters
- * a {@link Configuration} that holds at least the table name.
- */
- protected HTable createTable(Configuration parameters) {
- String configLocation = parameters.getString(TableInputFormat.CONFIG_LOCATION, null);
- LOG.info("Got config location: " + configLocation);
- if (configLocation != null)
- {
- org.apache.hadoop.conf.Configuration dummyConf = new org.apache.hadoop.conf.Configuration();
- if(OperatingSystem.isWindows()) {
- dummyConf.addResource(new Path("file:/" + configLocation));
- } else {
- dummyConf.addResource(new Path("file://" + configLocation));
- }
- hConf = HBaseConfiguration.create(dummyConf);
- ;
- // hConf.set("hbase.master", "im1a5.internetmemory.org");
- LOG.info("hbase master: " + hConf.get("hbase.master"));
- LOG.info("zookeeper quorum: " + hConf.get("hbase.zookeeper.quorum"));
-
- }
- String tableName = parameters.getString(INPUT_TABLE, "");
+
+ /** Create an {@link HTable} instance and set it into this format */
+ private HTable createTable() {
+ LOG.info("Initializing HBaseConfiguration");
+ // use hbase-site.xml files found on the classpath
+ org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
+
try {
- return new HTable(this.hConf, tableName);
+ return new HTable(hConf, getTableName());
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
}
@@ -206,161 +90,106 @@ public class TableInputFormat implements InputFormat<Record, TableInputSplit> {
}
@Override
- public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
public boolean reachedEnd() throws IOException {
return this.endReached;
}
- protected boolean nextResult() throws IOException {
- if (this.tableRecordReader == null)
- {
- throw new IOException("No table record reader provided!");
- }
-
- try {
- if (this.tableRecordReader.nextKeyValue())
- {
- ImmutableBytesWritable currentKey = this.tableRecordReader.getCurrentKey();
- Result currentValue = this.tableRecordReader.getCurrentValue();
-
- hbaseKey.setWritable(currentKey);
- hbaseResult.setResult(currentValue);
- } else
- {
- this.endReached = true;
- return false;
- }
- } catch (InterruptedException e) {
- LOG.error("Table reader has been interrupted", e);
- throw new IOException(e);
- }
-
- return true;
- }
-
@Override
- public Record nextRecord(Record record) throws IOException {
- if (nextResult()) {
- mapResultToRecord(record, hbaseKey, hbaseResult);
- return record;
- } else {
- return null;
+ public T nextRecord(T reuse) throws IOException {
+ if (this.rs == null){
+ throw new IOException("No table result scanner provided!");
}
- }
-
- /**
- * Maps the current HBase Result into a Record.
- * This implementation simply stores the HBaseKey at position 0, and the HBase Result object at position 1.
- *
- * @param record
- * @param key
- * @param result
- */
- public void mapResultToRecord(Record record, HBaseKey key, HBaseResult result) {
- record.setField(0, key);
- record.setField(1, result);
- }
- @Override
- public void close() throws IOException {
- this.tableRecordReader.close();
+ Result res = this.rs.next();
+ if (res != null){
+ return mapResultToTuple(res);
+ }
+ this.endReached = true;
+ return null;
}
@Override
public void open(TableInputSplit split) throws IOException {
- if (split == null)
- {
+ if (split == null){
throw new IOException("Input split is null!");
}
-
- if (this.table == null)
- {
+ if (table == null){
throw new IOException("No HTable provided!");
}
-
- if (this.scan == null)
- {
+ if (scan == null){
throw new IOException("No Scan instance provided");
}
- this.tableRecordReader = new TableRecordReader();
-
- this.tableRecordReader.setHTable(this.table);
-
- Scan sc = new Scan(this.scan);
- sc.setStartRow(split.getStartRow());
- LOG.info("split start row: " + new String(split.getStartRow()));
- sc.setStopRow(split.getEndRow());
- LOG.info("split end row: " + new String(split.getEndRow()));
-
- this.tableRecordReader.setScan(sc);
- this.tableRecordReader.restart(split.getStartRow());
-
- this.hbaseKey = new HBaseKey();
- this.hbaseResult = new HBaseResult();
-
+ logSplitInfo("opening", split);
+ scan.setStartRow(split.getStartRow());
+ scan.setStopRow(split.getEndRow());
+
+ this.rs = table.getScanner(scan);
endReached = false;
}
-
+
+ @Override
+ public void close() throws IOException {
+ this.rs.close();
+ this.table.close();
+ }
@Override
public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
-
- if (this.table == null) {
- throw new IOException("No table was provided.");
- }
-
- final Pair<byte[][], byte[][]> keys = this.table.getStartEndKeys();
-
+ //Gets the starting and ending row keys for every region in the currently open table
+ final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
-
throw new IOException("Expecting at least one region.");
}
- int count = 0;
- final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(keys.getFirst().length);
+ final byte[] startRow = scan.getStartRow();
+ final byte[] stopRow = scan.getStopRow();
+ final boolean scanWithNoLowerBound = startRow.length == 0;
+ final boolean scanWithNoUpperBound = stopRow.length == 0;
+
+ final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(minNumSplits);
for (int i = 0; i < keys.getFirst().length; i++) {
-
- if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+ final byte[] startKey = keys.getFirst()[i];
+ final byte[] endKey = keys.getSecond()[i];
+ final String regionLocation = table.getRegionLocation(startKey, false).getHostnamePort();
+ //Test if the given region is to be included in the InputSplit while splitting the regions of a table
+ if (!includeRegionInSplit(startKey, endKey)) {
continue;
}
-
- final String regionLocation = this.table.getRegionLocation(keys.getFirst()[i], false).getHostnamePort();
- final byte[] startRow = this.scan.getStartRow();
- final byte[] stopRow = this.scan.getStopRow();
-
- // determine if the given start an stop key fall into the region
- if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
- Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
- (stopRow.length == 0 ||
- Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
-
- final byte[] splitStart = startRow.length == 0 ||
- Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
- keys.getFirst()[i] : startRow;
- final byte[] splitStop = (stopRow.length == 0 ||
- Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
- keys.getSecond()[i].length > 0 ?
- keys.getSecond()[i] : stopRow;
- final TableInputSplit split = new TableInputSplit(splits.size(), new String[] { regionLocation },
- this.table.getTableName(), splitStart, splitStop);
+ //Finds the region on which the given row is being served
+ final String[] hosts = new String[] { regionLocation };
+
+ // determine if the region contains keys used by the scan
+ boolean isLastRegion = endKey.length == 0;
+ if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
+ (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
+
+ final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
+ final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
+ && !isLastRegion ? endKey : stopRow;
+ int id = splits.size();
+ final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop);
splits.add(split);
- if (LOG.isDebugEnabled()) {
- LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
- }
}
}
-
+ LOG.info("Created " + splits.size() + " splits");
+ for (TableInputSplit split : splits) {
+ logSplitInfo("created", split);
+ }
return splits.toArray(new TableInputSplit[0]);
}
+
+ private void logSplitInfo(String action, TableInputSplit split) {
+ int splitId = split.getSplitNumber();
+ String splitStart = Bytes.toString(split.getStartRow());
+ String splitEnd = Bytes.toString(split.getEndRow());
+ String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
+ String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
+ String[] hostnames = split.getHostnames();
+ LOG.info("{} split [{}|{}|{}|{}]", action, splitId, hostnames, splitStartKey, splitStopKey);
+ }
/**
- * Test if the given region is to be included in the InputSplit while splitting
- * the regions of a table.
+ * Test if the given region is to be included in the InputSplit while splitting the regions of a table.
* <p>
* This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
* (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br>
@@ -386,20 +215,9 @@ public class TableInputFormat implements InputFormat<Record, TableInputSplit> {
public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits) {
return new LocatableInputSplitAssigner(inputSplits);
}
-
- public void setTable(HTable table) {
- this.table = table;
- }
- public HTable getTable() {
- return table;
- }
-
- public void setScan(Scan scan) {
- this.scan = scan;
- }
-
- public Scan getScan() {
- return scan;
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+ return null;
}
}
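
To make the new contract concrete, here is a minimal sketch of a subclass. The class name, table name, and column coordinates are made up for illustration; the sketch assumes only the three abstract methods introduced above (getTableName(), getScanner(), mapResultToTuple()).

import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

/** Hypothetical input format that reads (rowKey, value) pairs from a single column. */
public class SingleColumnInputFormat extends TableInputFormat<Tuple2<String, String>> {

	private static final long serialVersionUID = 1L;

	// assumed column family and qualifier, purely for illustration
	private static final byte[] CF = Bytes.toBytes("someCf");
	private static final byte[] QUAL = Bytes.toBytes("someQual");

	// mutable tuple, reused across nextRecord() calls to avoid object churn
	private final Tuple2<String, String> reuse = new Tuple2<String, String>();

	@Override
	protected String getTableName() {
		return "test-table";
	}

	@Override
	protected Scan getScanner() {
		Scan scan = new Scan();
		scan.addColumn(CF, QUAL);
		scan.setCaching(500); // rows fetched per RPC; tune for the table at hand
		return scan;
	}

	@Override
	protected Tuple2<String, String> mapResultToTuple(Result r) {
		reuse.f0 = Bytes.toString(r.getRow());
		reuse.f1 = Bytes.toString(r.getValue(CF, QUAL));
		return reuse;
	}
}

Reusing one Tuple2 instance is safe because the runtime consumes each record before nextRecord() is called again; a fresh tuple per call would also be correct, just more garbage.
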
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
deleted file mode 100644
index 4c08493..0000000
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.common;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-/**
- * Simple wrapper to encapsulate an HBase h{@link ImmutableBytesWritable} as a Key
- */
-public class HBaseKey implements Key<HBaseKey> {
-
- private static final long serialVersionUID = 1L;
-
- private ImmutableBytesWritable writable;
-
-
- public HBaseKey() {
- this.writable = new ImmutableBytesWritable();
- }
-
-
- public HBaseKey(ImmutableBytesWritable writable) {
- this.writable = writable;
- }
-
-
- public ImmutableBytesWritable getWritable() {
- return writable;
- }
-
- public void setWritable(ImmutableBytesWritable writable) {
- this.writable = writable;
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void write(DataOutputView out) throws IOException {
- this.writable.write(out);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- this.writable.readFields(in);
- }
-
- @Override
- public int hashCode() {
- return this.writable.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == HBaseKey.class) {
- return this.writable.equals(((HBaseKey) obj).writable);
- } else {
- return false;
- }
- }
-
- @Override
- public int compareTo(HBaseKey other) {
- return this.writable.compareTo(other.writable);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
deleted file mode 100644
index dfae104..0000000
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.common;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-import org.apache.hadoop.hbase.client.Result;
-
-public class HBaseResult implements Value {
-
- private static final long serialVersionUID = 1L;
-
- private Result result;
-
-
- public HBaseResult() {
- this.result = new Result();
- }
-
- public HBaseResult(Result result) {
- this.result = result;
- }
-
-
- public Result getResult() {
- return this.result;
- }
-
- public void setResult(Result result) {
- this.result = result;
- }
-
- public String getStringData() {
- if(this.result != null) {
- return this.result.toString();
- }
- return null;
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- this.result.readFields(in);
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- this.result.write(out);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
deleted file mode 100644
index 607dd78..0000000
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.common;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Base64;
-
-/**
- * Utility for {@link TableInputFormat}
- */
-public class HBaseUtil {
-
- /**
- * Writes the given scan into a Base64 encoded string.
- *
- * @param scan
- * The scan to write out.
- * @return The scan saved in a Base64 encoded string.
- * @throws IOException
- * When writing the scan fails.
- */
- static String convertScanToString(Scan scan) throws IOException {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(out);
- scan.write(dos);
- return Base64.encodeBytes(out.toByteArray());
- }
-
- /**
- * Converts the given Base64 string back into a Scan instance.
- *
- * @param base64
- * The scan details.
- * @return The newly created Scan instance.
- * @throws IOException
- * When reading the scan instance fails.
- */
- public static Scan convertStringToScan(String base64) throws IOException {
- ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64));
- DataInputStream dis = new DataInputStream(bis);
- Scan scan = new Scan();
- scan.readFields(dis);
- return scan;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
deleted file mode 100644
index 881d06a..0000000
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.example;
-
-import org.apache.flink.addons.hbase.TableInputFormat;
-import org.apache.flink.addons.hbase.common.HBaseKey;
-import org.apache.flink.addons.hbase.common.HBaseResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * the occurrences of each word in the file.
- */
-public class HBaseReadExample implements Program, ProgramDescription {
-
- public static class MyTableInputFormat extends TableInputFormat {
-
- private static final long serialVersionUID = 1L;
-
- private final byte[] META_FAMILY = "meta".getBytes();
-
- private final byte[] USER_COLUMN = "user".getBytes();
-
- private final byte[] TIMESTAMP_COLUMN = "timestamp".getBytes();
-
- private final byte[] TEXT_FAMILY = "text".getBytes();
-
- private final byte[] TWEET_COLUMN = "tweet".getBytes();
-
- public MyTableInputFormat() {
- super();
-
- }
-
- @Override
- protected HTable createTable(Configuration parameters) {
- return super.createTable(parameters);
- }
-
- @Override
- protected Scan createScanner(Configuration parameters) {
- Scan scan = new Scan ();
- scan.addColumn (META_FAMILY, USER_COLUMN);
- scan.addColumn (META_FAMILY, TIMESTAMP_COLUMN);
- scan.addColumn (TEXT_FAMILY, TWEET_COLUMN);
- return scan;
- }
-
- StringValue row_string = new StringValue();
- StringValue user_string = new StringValue();
- StringValue timestamp_string = new StringValue();
- StringValue tweet_string = new StringValue();
-
- @Override
- public void mapResultToRecord(Record record, HBaseKey key,
- HBaseResult result) {
- Result res = result.getResult();
- res.getRow();
- record.setField(0, toString(row_string, res.getRow()));
- record.setField(1, toString (user_string, res.getValue(META_FAMILY, USER_COLUMN)));
- record.setField(2, toString (timestamp_string, res.getValue(META_FAMILY, TIMESTAMP_COLUMN)));
- record.setField(3, toString (tweet_string, res.getValue(TEXT_FAMILY, TWEET_COLUMN)));
- }
-
- private final StringValue toString (StringValue string, byte[] bytes) {
- string.setValueAscii(bytes, 0, bytes.length);
- return string;
- }
-
- }
-
-
- @Override
- public Plan getPlan(String... args) {
- // parse job parameters
- int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
- String output = (args.length > 1 ? args[1] : "");
-
- GenericDataSource<TableInputFormat> source = new GenericDataSource<TableInputFormat>(new MyTableInputFormat(), "HBase Input");
- source.setParameter(TableInputFormat.INPUT_TABLE, "twitter");
- source.setParameter(TableInputFormat.CONFIG_LOCATION, "/etc/hbase/conf/hbase-site.xml");
- FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, source, "HBase String dump");
- CsvOutputFormat.configureRecordFormat(out)
- .recordDelimiter('\n')
- .fieldDelimiter(' ')
- .field(StringValue.class, 0)
- .field(StringValue.class, 1)
- .field(StringValue.class, 2)
- .field(StringValue.class, 3);
-
- Plan plan = new Plan(out, "HBase access Example");
- plan.setDefaultParallelism(numSubTasks);
- return plan;
- }
-
-
- @Override
- public String getDescription() {
- return "Parameters: [numSubStasks] [input] [output]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
new file mode 100755
index 0000000..b6f345a
--- /dev/null
+++ b/flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.TableInputFormat;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Simple stub for HBase DataSet
+ *
+ * To run the test, first create the test table with the hbase shell.
+ *
+ * Use the following commands:
+ * <ul>
+ * <li>create 'test-table', 'someCf'</li>
+ * <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li>
+ * <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li>
+ * </ul>
+ *
+ * The test should return just the first entry.
+ *
+ */
+public class HBaseReadExample {
+ public static void main(String[] args) throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ @SuppressWarnings("serial")
+ DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() {
+ private final byte[] CF_SOME = "someCf".getBytes();
+ private final byte[] Q_SOME = "someQual".getBytes();
+ @Override
+ public String getTableName() {
+ return "test-table";
+ }
+
+ @Override
+ protected Scan getScanner() {
+ Scan scan = new Scan();
+ scan.addColumn(CF_SOME, Q_SOME);
+ return scan;
+ }
+
+ private Tuple2<String, String> reuse = new Tuple2<String, String>();
+
+ @Override
+ protected Tuple2<String, String> mapResultToTuple(Result r) {
+ String key = Bytes.toString(r.getRow());
+ String val = Bytes.toString(r.getValue(CF_SOME, Q_SOME));
+ reuse.setField(key, 0);
+ reuse.setField(val, 1);
+ return reuse;
+ }
+ })
+ .filter(new FilterFunction<Tuple2<String,String>>() {
+
+ @Override
+ public boolean filter(Tuple2<String, String> t) throws Exception {
+ String val = t.getField(1);
+ return val.startsWith("someStr");
+ }
+ });
+
+ hbaseDs.print();
+
+ // kick off execution.
+ env.execute();
+
+ }
+
+}
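
As a variation on the example above, the filtered DataSet could be written out instead of printed. The output path and delimiters below are arbitrary placeholders:

// write the filtered tuples as delimiter-separated text
hbaseDs.writeAsCsv("file:///tmp/hbase-read-output", "\n", "|");
env.execute("HBase read example");
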
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/test/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/test/resources/hbase-site.xml b/flink-addons/flink-hbase/src/test/resources/hbase-site.xml
new file mode 100644
index 0000000..2984063
--- /dev/null
+++ b/flink-addons/flink-hbase/src/test/resources/hbase-site.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+
+ <property>
+ <name>hbase.tmp.dir</name>
+ <!--
+ <value>/media/Dati/hbase-0.98-data</value>
+ -->
+ <value>/opt/hbase-0.98.6.1-hadoop2/data</value>
+
+ </property>
+ <property>
+ <name>hbase.zookeeper.quorum</name>
+ <value>localhost</value>
+ </property>
+ <!--
+ <property>
+ <name>hadoop.security.group.mapping</name>
+ <value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
+ </property>
+ -->
+</configuration>
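
This file needs no explicit wiring: the rewritten TableInputFormat calls HBaseConfiguration.create(), which picks up any hbase-site.xml found on the classpath, so placing it under src/test/resources is sufficient for the example. The hbase.tmp.dir value above reflects a local HBase 0.98.6.1 install and would normally be adapted to the machine running the test.
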
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/flink-hbase/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/test/resources/log4j.properties b/flink-addons/flink-hbase/src/test/resources/log4j.properties
new file mode 100755
index 0000000..c83ec70
--- /dev/null
+++ b/flink-addons/flink-hbase/src/test/resources/log4j.properties
@@ -0,0 +1,6 @@
+log4j.rootLogger=${hadoop.root.logger}
+hadoop.root.logger=INFO,console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
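
The indirection in the first line follows the Hadoop convention: log4j resolves ${hadoop.root.logger} from the property defined on the second line, so the level and appender can be overridden at run time with -Dhadoop.root.logger=DEBUG,console without editing the file.
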
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-addons/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/pom.xml b/flink-addons/pom.xml
index 6b4083e..1086c56 100644
--- a/flink-addons/pom.xml
+++ b/flink-addons/pom.xml
@@ -52,7 +52,7 @@ under the License.
</property>
</activation>
<modules>
- <!-- No extra modules: pact-hbase is currently not compatible with Hadoop v1 -->
+ <!-- No extra modules: flink-hbase is currently not compatible with Hadoop v1 -->
</modules>
</profile>
<profile>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a1100af4/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index ce495d1..15fddb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -183,9 +183,12 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
Thread currentThread = Thread.currentThread();
ClassLoader context = currentThread.getContextClassLoader();
currentThread.setContextClassLoader(userCodeClassLoader);
- this.invokable.registerInputOutput();
-
- currentThread.setContextClassLoader(context);
+ try {
+ this.invokable.registerInputOutput();
+ }
+ finally {
+ currentThread.setContextClassLoader(context);
+ }
}
List<GateDeploymentDescriptor> inGates = tdd.getInputGates();
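
The change above wraps registerInputOutput() in try/finally so that the context class loader is restored even when the user-code call throws. The same pattern in isolation, as a sketch in which runUserCode() and userCodeClassLoader stand in for the real invokable call and loader:

Thread currentThread = Thread.currentThread();
ClassLoader previousLoader = currentThread.getContextClassLoader();
currentThread.setContextClassLoader(userCodeClassLoader);
try {
	runUserCode(); // placeholder for invokable.registerInputOutput(), which may throw
}
finally {
	// restore unconditionally; otherwise an exception would leak the user-code loader
	currentThread.setContextClassLoader(previousLoader);
}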