You are viewing a plain text version of this content; the canonical HTML version is available from the Apache mailing-list archive.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/12 14:01:33 UTC
[5/7] flink git commit: [FLINK-1928] [hbase] Added HBase write example
[FLINK-1928] [hbase] Added HBase write example
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3be621cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3be621cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3be621cf
Branch: refs/heads/master
Commit: 3be621cfc270e7aee24b0a5438acbbe191606aea
Parents: cafb876
Author: fpompermaier <f....@gmail.com>
Authored: Mon Apr 27 17:15:18 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 14:00:49 2015 +0200
----------------------------------------------------------------------
flink-staging/flink-hbase/pom.xml | 38 +++-
.../hbase/example/HBaseFlinkTestConstants.java | 10 +
.../addons/hbase/example/HBaseReadExample.java | 11 +-
.../addons/hbase/example/HBaseWriteExample.java | 202 +++++++++++++++++++
4 files changed, 251 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3be621cf/flink-staging/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/pom.xml b/flink-staging/flink-hbase/pom.xml
index f8c16f3..805ff4b 100644
--- a/flink-staging/flink-hbase/pom.xml
+++ b/flink-staging/flink-hbase/pom.xml
@@ -34,8 +34,8 @@ under the License.
<packaging>jar</packaging>
<properties>
- <hbase.hadoop1.version>0.98.6.1-hadoop1</hbase.hadoop1.version>
- <hbase.hadoop2.version>0.98.6.1-hadoop2</hbase.hadoop2.version>
+ <hbase.hadoop1.version>0.98.11-hadoop1</hbase.hadoop1.version>
+ <hbase.hadoop2.version>0.98.11-hadoop2</hbase.hadoop2.version>
</properties>
<dependencies>
@@ -82,10 +82,12 @@ under the License.
</exclusion>
</exclusions>
</dependency>
-
+
+ <!-- HBase server needed for TableOutputFormat -->
+ <!-- TODO implement bulk output format for HBase -->
<dependency>
<groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
+ <artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<exclusions>
<!-- Remove unneeded dependency, which is conflicting with our jetty-util version. -->
@@ -93,6 +95,26 @@ under the License.
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-sslengine</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api-2.5</artifactId>
+ </exclusion>
<!-- The hadoop dependencies are handled through flink-shaded-hadoop -->
<exclusion>
<groupId>org.apache.hadoop</groupId>
@@ -110,6 +132,14 @@ under the License.
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/flink/blob/3be621cf/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
new file mode 100644
index 0000000..5881020
--- /dev/null
+++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
@@ -0,0 +1,10 @@
+package org.apache.flink.addons.hbase.example;
+
/**
 * Shared constants (test table name, column family, qualifier, and temp
 * directory) used by the HBase read/write examples.
 */
public class HBaseFlinkTestConstants {

	// Use an explicit charset: String.getBytes() with no argument depends on
	// the platform default encoding, which would make the HBase column
	// family/qualifier bytes differ between machines.
	public static final byte[] CF_SOME = "someCf".getBytes(java.nio.charset.StandardCharsets.UTF_8);
	public static final byte[] Q_SOME = "someQual".getBytes(java.nio.charset.StandardCharsets.UTF_8);
	public static final String TEST_TABLE_NAME = "test-table";
	public static final String TMP_DIR = "/tmp/test";

	// Constants holder: not meant to be instantiated.
	private HBaseFlinkTestConstants() {
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3be621cf/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
index b6f345a..dccf876 100644
--- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
/**
- * Simple stub for HBase DataSet
+ * Simple stub for HBase DataSet read
*
* To run the test first create the test table with hbase shell.
*
@@ -47,17 +47,16 @@ public class HBaseReadExample {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("serial")
DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() {
- private final byte[] CF_SOME = "someCf".getBytes();
- private final byte[] Q_SOME = "someQual".getBytes();
+
@Override
public String getTableName() {
- return "test-table";
+ return HBaseFlinkTestConstants.TEST_TABLE_NAME;
}
@Override
protected Scan getScanner() {
Scan scan = new Scan();
- scan.addColumn(CF_SOME, Q_SOME);
+ scan.addColumn(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME);
return scan;
}
@@ -66,7 +65,7 @@ public class HBaseReadExample {
@Override
protected Tuple2<String, String> mapResultToTuple(Result r) {
String key = Bytes.toString(r.getRow());
- String val = Bytes.toString(r.getValue(CF_SOME, Q_SOME));
+ String val = Bytes.toString(r.getValue(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME));
reuse.setField(key, 0);
reuse.setField(val, 1);
return reuse;
http://git-wip-us.apache.org/repos/asf/flink/blob/3be621cf/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
new file mode 100644
index 0000000..483bdff
--- /dev/null
+++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Simple stub for HBase DataSet write
+ *
+ * To run the test first create the test table with hbase shell.
+ *
+ * Use the following commands:
+ * <ul>
+ * <li>create 'test-table', 'someCf'</li>
+ * </ul>
+ *
+ */
+@SuppressWarnings("serial")
+public class HBaseWriteExample {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String[] args) throws Exception {
+
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ DataSet<String> text = getTextDataSet(env);
+
+ DataSet<Tuple2<String, Integer>> counts =
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ text.flatMap(new Tokenizer())
+ // group by the tuple field "0" and sum up tuple field "1"
+ .groupBy(0)
+ .sum(1);
+
+ // emit result
+ Job job = Job.getInstance();
+ job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
+ // TODO is "mapred.output.dir" really useful?
+ job.getConfiguration().set("mapred.output.dir",HBaseFlinkTestConstants.TMP_DIR);
+ counts.map(new RichMapFunction <Tuple2<String,Integer>, Tuple2<Text,Mutation>>() {
+ private transient Tuple2<Text, Mutation> reuse;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ reuse = new Tuple2<Text, Mutation>();
+ }
+
+ @Override
+ public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
+ reuse.f0 = new Text(t.f0);
+ Put put = new Put(t.f0.getBytes());
+ put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
+ reuse.f1 = put;
+ return reuse;
+ }
+ }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
+
+ // execute program
+ env.execute("WordCount (HBase sink) Example");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Implements the string tokenizer that splits sentences into words as a user-defined
+ * FlatMapFunction. The function takes a line (String) and splits it into
+ * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+ */
+ public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+ @Override
+ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+ // normalize and split the line
+ String[] tokens = value.toLowerCase().split("\\W+");
+
+ // emit the pairs
+ for (String token : tokens) {
+ if (token.length() > 0) {
+ out.collect(new Tuple2<String, Integer>(token, 1));
+ }
+ }
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputTableName = HBaseFlinkTestConstants.TEST_TABLE_NAME;
+
+ private static boolean parseParameters(String[] args) {
+
+ if(args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if(args.length == 2) {
+ textPath = args[0];
+ outputTableName = args[1];
+ } else {
+ System.err.println("Usage: HBaseWriteExample <text path> <output table>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing HBaseWriteExample example with built-in default data.");
+ System.out.println(" Provide parameters to read input data from a file.");
+ System.out.println(" Usage: HBaseWriteExample <text path> <output table>");
+ }
+ return true;
+ }
+
+ private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ // read the text file from given input path
+ return env.readTextFile(textPath);
+ } else {
+ // get default test text data
+ return getDefaultTextLineDataSet(env);
+ }
+ }
+ private static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
+ return env.fromElements(WORDS);
+ }
+ private static final String[] WORDS = new String[] {
+ "To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer",
+ "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,",
+ "And by opposing end them?--To die,--to sleep,--",
+ "No more; and by a sleep to say we end",
+ "The heartache, and the thousand natural shocks",
+ "That flesh is heir to,--'tis a consummation",
+ "Devoutly to be wish'd. To die,--to sleep;--",
+ "To sleep! perchance to dream:--ay, there's the rub;",
+ "For in that sleep of death what dreams may come,",
+ "When we have shuffled off this mortal coil,",
+ "Must give us pause: there's the respect",
+ "That makes calamity of so long life;",
+ "For who would bear the whips and scorns of time,",
+ "The oppressor's wrong, the proud man's contumely,",
+ "The pangs of despis'd love, the law's delay,",
+ "The insolence of office, and the spurns",
+ "That patient merit of the unworthy takes,",
+ "When he himself might his quietus make",
+ "With a bare bodkin? who would these fardels bear,",
+ "To grunt and sweat under a weary life,",
+ "But that the dread of something after death,--",
+ "The undiscover'd country, from whose bourn",
+ "No traveller returns,--puzzles the will,",
+ "And makes us rather bear those ills we have",
+ "Than fly to others that we know not of?",
+ "Thus conscience does make cowards of us all;",
+ "And thus the native hue of resolution",
+ "Is sicklied o'er with the pale cast of thought;",
+ "And enterprises of great pith and moment,",
+ "With this regard, their currents turn awry,",
+ "And lose the name of action.--Soft you now!",
+ "The fair Ophelia!--Nymph, in thy orisons",
+ "Be all my sins remember'd."
+ };
+}
\ No newline at end of file