Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:10 UTC
[20/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
new file mode 100644
index 0000000..3dddd88
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TableInputFormatITCase extends HBaseTestingClusterAutostarter {
+ private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
+ private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
+ private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
+
+ // These are the row ids AND also the values we will put in the test table
+ private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
+
+ @BeforeClass
+ public static void activateHBaseCluster(){
+ registerHBaseMiniClusterInClasspath();
+ }
+
+ @Before
+ public void createTestTable() throws IOException {
+ TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
+ byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
+ createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
+ HTable table = openTable(tableName);
+
+ for (String rowId : ROW_IDS) {
+ byte[] rowIdBytes = rowId.getBytes();
+ Put p = new Put(rowIdBytes);
+ // Use the rowId as the value as well, to make verification easier
+ p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
+ table.put(p);
+ }
+
+ table.close();
+ }
+
+ class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
+ @Override
+ protected Scan getScanner() {
+ return new Scan();
+ }
+
+ @Override
+ protected String getTableName() {
+ return TEST_TABLE_NAME;
+ }
+
+ @Override
+ protected Tuple1<String> mapResultToTuple(Result r) {
+ return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
+ }
+ }
+
+ @Test
+ public void testTableInputFormat() {
+ ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+ environment.setParallelism(1);
+
+ DataSet<String> resultDataSet =
+ environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
+ @Override
+ public String map(Tuple1<String> value) throws Exception {
+ return value.f0;
+ }
+ });
+
+ List<String> resultSet = new ArrayList<>();
+ resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet));
+
+ try {
+ environment.execute("HBase InputFormat Test");
+ } catch (Exception e) {
+ Assert.fail("HBase InputFormat test failed. " + e.getMessage());
+ }
+
+ for (String rowId : ROW_IDS) {
+ assertTrue("Missing rowId from table: " + rowId, resultSet.contains(rowId));
+ }
+
+ assertEquals("The number of records is wrong.", ROW_IDS.length, resultSet.size());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
new file mode 100644
index 0000000..8579dee
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+public class HBaseFlinkTestConstants {
+
+ public static final byte[] CF_SOME = "someCf".getBytes();
+ public static final byte[] Q_SOME = "someQual".getBytes();
+ public static final String TEST_TABLE_NAME = "test-table";
+ public static final String TMP_DIR = "/tmp/test";
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
new file mode 100644
index 0000000..dccf876
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.TableInputFormat;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Simple stub for HBase DataSet read
+ *
+ * To run the test, first create the test table with the hbase shell.
+ *
+ * Use the following commands:
+ * <ul>
+ * <li>create 'test-table', 'someCf'</li>
+ * <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li>
+ * <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li>
+ * </ul>
+ *
+ * The test should return just the first entry.
+ *
+ */
+public class HBaseReadExample {
+ public static void main(String[] args) throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ @SuppressWarnings("serial")
+ DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() {
+
+ @Override
+ public String getTableName() {
+ return HBaseFlinkTestConstants.TEST_TABLE_NAME;
+ }
+
+ @Override
+ protected Scan getScanner() {
+ Scan scan = new Scan();
+ scan.addColumn(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME);
+ return scan;
+ }
+
+ private Tuple2<String, String> reuse = new Tuple2<String, String>();
+
+ @Override
+ protected Tuple2<String, String> mapResultToTuple(Result r) {
+ String key = Bytes.toString(r.getRow());
+ String val = Bytes.toString(r.getValue(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME));
+ reuse.setField(key, 0);
+ reuse.setField(val, 1);
+ return reuse;
+ }
+ })
+ .filter(new FilterFunction<Tuple2<String,String>>() {
+
+ @Override
+ public boolean filter(Tuple2<String, String> t) throws Exception {
+ String val = t.getField(1);
+ return val.startsWith("someStr");
+ }
+ });
+
+ hbaseDs.print();
+
+ // kick off execution.
+ env.execute();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
new file mode 100644
index 0000000..483bdff
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Simple stub for HBase DataSet write
+ *
+ * To run the test, first create the test table with the hbase shell.
+ *
+ * Use the following commands:
+ * <ul>
+ * <li>create 'test-table', 'someCf'</li>
+ * </ul>
+ *
+ */
+@SuppressWarnings("serial")
+public class HBaseWriteExample {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String[] args) throws Exception {
+
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ DataSet<String> text = getTextDataSet(env);
+
+ DataSet<Tuple2<String, Integer>> counts =
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ text.flatMap(new Tokenizer())
+ // group by the tuple field "0" and sum up tuple field "1"
+ .groupBy(0)
+ .sum(1);
+
+ // emit result
+ Job job = Job.getInstance();
+ job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
+ // TODO is "mapred.output.dir" really useful?
+ job.getConfiguration().set("mapred.output.dir",HBaseFlinkTestConstants.TMP_DIR);
+ counts.map(new RichMapFunction <Tuple2<String,Integer>, Tuple2<Text,Mutation>>() {
+ private transient Tuple2<Text, Mutation> reuse;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ reuse = new Tuple2<Text, Mutation>();
+ }
+
+ @Override
+ public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
+ reuse.f0 = new Text(t.f0);
+ Put put = new Put(t.f0.getBytes());
+ put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
+ reuse.f1 = put;
+ return reuse;
+ }
+ }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
+
+ // execute program
+ env.execute("WordCount (HBase sink) Example");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Implements the string tokenizer that splits sentences into words as a user-defined
+ * FlatMapFunction. The function takes a line (String) and splits it into
+ * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+ */
+ public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+ @Override
+ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+ // normalize and split the line
+ String[] tokens = value.toLowerCase().split("\\W+");
+
+ // emit the pairs
+ for (String token : tokens) {
+ if (token.length() > 0) {
+ out.collect(new Tuple2<String, Integer>(token, 1));
+ }
+ }
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputTableName = HBaseFlinkTestConstants.TEST_TABLE_NAME;
+
+ private static boolean parseParameters(String[] args) {
+
+ if(args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if(args.length == 2) {
+ textPath = args[0];
+ outputTableName = args[1];
+ } else {
+ System.err.println("Usage: HBaseWriteExample <text path> <output table>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing HBaseWriteExample example with built-in default data.");
+ System.out.println(" Provide parameters to read input data from a file.");
+ System.out.println(" Usage: HBaseWriteExample <text path> <output table>");
+ }
+ return true;
+ }
+
+ private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ // read the text file from given input path
+ return env.readTextFile(textPath);
+ } else {
+ // get default test text data
+ return getDefaultTextLineDataSet(env);
+ }
+ }
+ private static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
+ return env.fromElements(WORDS);
+ }
+ private static final String[] WORDS = new String[] {
+ "To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer",
+ "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,",
+ "And by opposing end them?--To die,--to sleep,--",
+ "No more; and by a sleep to say we end",
+ "The heartache, and the thousand natural shocks",
+ "That flesh is heir to,--'tis a consummation",
+ "Devoutly to be wish'd. To die,--to sleep;--",
+ "To sleep! perchance to dream:--ay, there's the rub;",
+ "For in that sleep of death what dreams may come,",
+ "When we have shuffled off this mortal coil,",
+ "Must give us pause: there's the respect",
+ "That makes calamity of so long life;",
+ "For who would bear the whips and scorns of time,",
+ "The oppressor's wrong, the proud man's contumely,",
+ "The pangs of despis'd love, the law's delay,",
+ "The insolence of office, and the spurns",
+ "That patient merit of the unworthy takes,",
+ "When he himself might his quietus make",
+ "With a bare bodkin? who would these fardels bear,",
+ "To grunt and sweat under a weary life,",
+ "But that the dread of something after death,--",
+ "The undiscover'd country, from whose bourn",
+ "No traveller returns,--puzzles the will,",
+ "And makes us rather bear those ills we have",
+ "Than fly to others that we know not of?",
+ "Thus conscience does make cowards of us all;",
+ "And thus the native hue of resolution",
+ "Is sicklied o'er with the pale cast of thought;",
+ "And enterprises of great pith and moment,",
+ "With this regard, their currents turn awry,",
+ "And lose the name of action.--Soft you now!",
+ "The fair Ophelia!--Nymph, in thy orisons",
+ "Be all my sins remember'd."
+ };
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
new file mode 100644
index 0000000..05398db
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ *
+ * This is an example of how to write streams into HBase. In this example the
+ * stream is written to a local HBase instance, but the example can be adapted
+ * for an HBase cluster running remotely. You need a running local HBase with a
+ * table "flinkExample" and a column family "entry". If your HBase configuration
+ * does not match the hbase-site.xml in the resources folder, you have to
+ * temporarily remove that hbase-site.xml to execute the example properly.
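+ *
+ * The required table can be created with the hbase shell, for example:
+ * <ul>
+ * <li>create 'flinkExample', 'entry'</li>
+ * </ul>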
+ *
+ */
+public class HBaseWriteStreamExample {
+
+ public static void main(String[] args) throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .getExecutionEnvironment();
+
+ // data stream with random numbers
+ DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean isRunning = true;
+
+ @Override
+ public void run(SourceContext<String> out) throws Exception {
+ while (isRunning) {
+ out.collect(String.valueOf(Math.floor(Math.random() * 100)));
+ }
+
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+ });
+ dataStream.writeUsingOutputFormat(new HBaseOutputFormat());
+
+ env.execute();
+ }
+
+ /**
+ *
+ * This class implements an OutputFormat for HBase
+ *
+ */
+ private static class HBaseOutputFormat implements OutputFormat<String> {
+
+ private org.apache.hadoop.conf.Configuration conf = null;
+ private HTable table = null;
+ private String taskNumber = null;
+ private int rowNumber = 0;
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void configure(Configuration parameters) {
+ conf = HBaseConfiguration.create();
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ table = new HTable(conf, "flinkExample");
+ this.taskNumber = String.valueOf(taskNumber);
+ }
+
+ @Override
+ public void writeRecord(String record) throws IOException {
+ Put put = new Put(Bytes.toBytes(taskNumber + rowNumber));
+ put.add(Bytes.toBytes("entry"), Bytes.toBytes("entry"),
+ Bytes.toBytes(rowNumber));
+ rowNumber++;
+ table.put(put);
+ }
+
+ @Override
+ public void close() throws IOException {
+ table.flushCommits();
+ table.close();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties b/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..804ff45
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger=DEBUG, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.threshold=INFO
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %-5p %30c{1}:%4L - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/pom.xml b/flink-connectors/flink-hcatalog/pom.xml
new file mode 100644
index 0000000..dde7996
--- /dev/null
+++ b/flink-connectors/flink-hcatalog/pom.xml
@@ -0,0 +1,182 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connectors</artifactId>
+ <version>1.2-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-hcatalog</artifactId>
+ <name>flink-hcatalog</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-hadoop-compatibility_2.10</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hcatalog-core</artifactId>
+ <version>0.12.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Scala Compiler -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <executions>
+ <!-- Run scala compiler in the process-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) compile phase -->
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+
+ <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) test-compile phase -->
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <jvmArgs>
+ <jvmArg>-Xms128m</jvmArg>
+ <jvmArg>-Xmx512m</jvmArg>
+ </jvmArgs>
+ </configuration>
+ </plugin>
+
+ <!-- Eclipse Integration -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.8</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ <projectnatures>
+ <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+ <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+ </projectnatures>
+ <buildcommands>
+ <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+ </buildcommands>
+ <classpathContainers>
+ <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+ <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+ </classpathContainers>
+ <excludes>
+ <exclude>org.scala-lang:scala-library</exclude>
+ <exclude>org.scala-lang:scala-compiler</exclude>
+ </excludes>
+ <sourceIncludes>
+ <sourceInclude>**/*.scala</sourceInclude>
+ <sourceInclude>**/*.java</sourceInclude>
+ </sourceIncludes>
+ </configuration>
+ </plugin>
+
+ <!-- Adding scala source directories to build path -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <!-- Add src/main/scala to eclipse build path -->
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ <!-- Add src/test/scala to eclipse build path -->
+ <execution>
+ <id>add-test-source</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/test/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Scala Code Style, most of the configuration done via plugin management -->
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ <configuration>
+ <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+ </configuration>
+ </plugin>
+
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
new file mode 100644
index 0000000..859b706
--- /dev/null
+++ b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog;
+
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink-native tuple.
+ *
+ * Note: Flink tuples might only support a limited number of fields (depending on the API).
+ *
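+ * A minimal usage sketch for a DataSet program (the database, table, and field names are
+ * placeholders and {@code env} is an existing ExecutionEnvironment):
+ * <pre>{@code
+ * HCatInputFormatBase<HCatRecord> hcatIF =
+ *     new org.apache.flink.hcatalog.java.HCatInputFormat<HCatRecord>("default", "mytable")
+ *         .getFields("name", "age")            // projection
+ *         .withFilter("region = \"EU\"");      // partition filter
+ * DataSet<HCatRecord> records = env.createInput(hcatIF);
+ * }</pre>
+ *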
+ * @param <T>
+ */
+public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit> implements ResultTypeQueryable<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private Configuration configuration;
+
+ private org.apache.hive.hcatalog.mapreduce.HCatInputFormat hCatInputFormat;
+ private RecordReader<WritableComparable, HCatRecord> recordReader;
+ private boolean fetched = false;
+ private boolean hasNext;
+
+ protected String[] fieldNames = new String[0];
+ protected HCatSchema outputSchema;
+
+ private TypeInformation<T> resultType;
+
+ public HCatInputFormatBase() { }
+
+ /**
+ * Creates a HCatInputFormat for the given database and table.
+ * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
+ * The return type of the InputFormat can be changed to Flink-native tuples by calling
+ * {@link HCatInputFormatBase#asFlinkTuples()}.
+ *
+ * @param database The name of the database to read from.
+ * @param table The name of the table to read.
+ * @throws java.io.IOException
+ */
+ public HCatInputFormatBase(String database, String table) throws IOException {
+ this(database, table, new Configuration());
+ }
+
+ /**
+ * Creates a HCatInputFormat for the given database, table, and
+ * {@link org.apache.hadoop.conf.Configuration}.
+ * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
+ * The return type of the InputFormat can be changed to Flink-native tuples by calling
+ * {@link HCatInputFormatBase#asFlinkTuples()}.
+ *
+ * @param database The name of the database to read from.
+ * @param table The name of the table to read.
+ * @param config The Configuration for the InputFormat.
+ * @throws java.io.IOException
+ */
+ public HCatInputFormatBase(String database, String table, Configuration config) throws IOException {
+ super();
+ this.configuration = config;
+ HadoopUtils.mergeHadoopConf(this.configuration);
+
+ this.hCatInputFormat = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, database, table);
+ this.outputSchema = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration);
+
+ // configure output schema of HCatFormat
+ configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
+ // set type information
+ this.resultType = new WritableTypeInfo(DefaultHCatRecord.class);
+ }
+
+ /**
+ * Specifies the fields which are returned by the InputFormat and their order.
+ *
+ * @param fields The fields and their order which are returned by the InputFormat.
+ * @return This InputFormat with specified return fields.
+ * @throws java.io.IOException
+ */
+ public HCatInputFormatBase<T> getFields(String... fields) throws IOException {
+
+ // build output schema
+ ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length);
+ for(String field : fields) {
+ fieldSchemas.add(this.outputSchema.get(field));
+ }
+ this.outputSchema = new HCatSchema(fieldSchemas);
+
+ // update output schema configuration
+ configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
+
+ return this;
+ }
+
+ /**
+ * Specifies a SQL-like filter condition on the table's partition columns.
+ * Filter conditions on non-partition columns are invalid.
+ * A partition filter can significantly reduce the amount of data to be read.
+ *
+ * @param filter A SQL-like filter condition on the table's partition columns.
+ * @return This InputFormat with specified partition filter.
+ * @throws java.io.IOException
+ */
+ public HCatInputFormatBase<T> withFilter(String filter) throws IOException {
+
+ // set filter
+ this.hCatInputFormat.setFilter(filter);
+
+ return this;
+ }
+
+ /**
+ * Specifies that the InputFormat returns Flink tuples instead of
+ * {@link org.apache.hive.hcatalog.data.HCatRecord}.
+ *
+ * Note: Flink tuples might only support a limited number of fields (depending on the API).
+ *
+ * @return This InputFormat.
+ * @throws org.apache.hive.hcatalog.common.HCatException
+ */
+ public HCatInputFormatBase<T> asFlinkTuples() throws HCatException {
+
+ // build type information
+ int numFields = outputSchema.getFields().size();
+ if(numFields > this.getMaxFlinkTupleSize()) {
+ throw new IllegalArgumentException("Only up to "+this.getMaxFlinkTupleSize()+
+ " fields can be returned as Flink tuples.");
+ }
+
+ TypeInformation[] fieldTypes = new TypeInformation[numFields];
+ fieldNames = new String[numFields];
+ for (String fieldName : outputSchema.getFieldNames()) {
+ HCatFieldSchema field = outputSchema.get(fieldName);
+
+ int fieldPos = outputSchema.getPosition(fieldName);
+ TypeInformation fieldType = getFieldType(field);
+
+ fieldTypes[fieldPos] = fieldType;
+ fieldNames[fieldPos] = fieldName;
+
+ }
+ this.resultType = new TupleTypeInfo(fieldTypes);
+
+ return this;
+ }
+
+ protected abstract int getMaxFlinkTupleSize();
+
+ private TypeInformation getFieldType(HCatFieldSchema fieldSchema) {
+
+ switch(fieldSchema.getType()) {
+ case INT:
+ return BasicTypeInfo.INT_TYPE_INFO;
+ case TINYINT:
+ return BasicTypeInfo.BYTE_TYPE_INFO;
+ case SMALLINT:
+ return BasicTypeInfo.SHORT_TYPE_INFO;
+ case BIGINT:
+ return BasicTypeInfo.LONG_TYPE_INFO;
+ case BOOLEAN:
+ return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+ case FLOAT:
+ return BasicTypeInfo.FLOAT_TYPE_INFO;
+ case DOUBLE:
+ return BasicTypeInfo.DOUBLE_TYPE_INFO;
+ case STRING:
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ case BINARY:
+ return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+ case ARRAY:
+ return new GenericTypeInfo(List.class);
+ case MAP:
+ return new GenericTypeInfo(Map.class);
+ case STRUCT:
+ return new GenericTypeInfo(List.class);
+ default:
+ throw new IllegalArgumentException("Unknown data type \""+fieldSchema.getType()+"\" encountered.");
+ }
+ }
+
+ /**
+ * Returns the {@link org.apache.hadoop.conf.Configuration} of the HCatInputFormat.
+ *
+ * @return The Configuration of the HCatInputFormat.
+ */
+ public Configuration getConfiguration() {
+ return this.configuration;
+ }
+
+ /**
+ * Returns the {@link org.apache.hive.hcatalog.data.schema.HCatSchema} of the {@link org.apache.hive.hcatalog.data.HCatRecord}
+ * returned by this InputFormat.
+ *
+ * @return The HCatSchema of the HCatRecords returned by this InputFormat.
+ */
+ public HCatSchema getOutputSchema() {
+ return this.outputSchema;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // InputFormat
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void configure(org.apache.flink.configuration.Configuration parameters) {
+ // nothing to do
+ }
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
+ // no statistics provided at the moment
+ return null;
+ }
+
+ @Override
+ public HadoopInputSplit[] createInputSplits(int minNumSplits)
+ throws IOException {
+ configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
+
+ JobContext jobContext = null;
+ try {
+ jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ List<InputSplit> splits;
+ try {
+ splits = this.hCatInputFormat.getSplits(jobContext);
+ } catch (InterruptedException e) {
+ throw new IOException("Could not get Splits.", e);
+ }
+ HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
+
+ for(int i = 0; i < hadoopInputSplits.length; i++){
+ hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
+ }
+ return hadoopInputSplits;
+ }
+
+ @Override
+ public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
+ return new LocatableInputSplitAssigner(inputSplits);
+ }
+
+ @Override
+ public void open(HadoopInputSplit split) throws IOException {
+ TaskAttemptContext context = null;
+ try {
+ context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ try {
+ this.recordReader = this.hCatInputFormat
+ .createRecordReader(split.getHadoopInputSplit(), context);
+ this.recordReader.initialize(split.getHadoopInputSplit(), context);
+ } catch (InterruptedException e) {
+ throw new IOException("Could not create RecordReader.", e);
+ } finally {
+ this.fetched = false;
+ }
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ if(!this.fetched) {
+ fetchNext();
+ }
+ return !this.hasNext;
+ }
+
+ private void fetchNext() throws IOException {
+ try {
+ this.hasNext = this.recordReader.nextKeyValue();
+ } catch (InterruptedException e) {
+ throw new IOException("Could not fetch next KeyValue pair.", e);
+ } finally {
+ this.fetched = true;
+ }
+ }
+
+ @Override
+ public T nextRecord(T record) throws IOException {
+ if(!this.fetched) {
+ // first record
+ fetchNext();
+ }
+ if(!this.hasNext) {
+ return null;
+ }
+ try {
+
+ // get next HCatRecord
+ HCatRecord v = this.recordReader.getCurrentValue();
+ this.fetched = false;
+
+ if(this.fieldNames.length > 0) {
+ // return as Flink tuple
+ return this.buildFlinkTuple(record, v);
+
+ } else {
+ // return as HCatRecord
+ return (T)v;
+ }
+
+ } catch (InterruptedException e) {
+ throw new IOException("Could not get next record.", e);
+ }
+ }
+
+ protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException;
+
+ @Override
+ public void close() throws IOException {
+ this.recordReader.close();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom de/serialization methods
+ // --------------------------------------------------------------------------------------------
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.writeInt(this.fieldNames.length);
+ for(String fieldName : this.fieldNames) {
+ out.writeUTF(fieldName);
+ }
+ this.configuration.write(out);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ this.fieldNames = new String[in.readInt()];
+ for(int i=0; i<this.fieldNames.length; i++) {
+ this.fieldNames[i] = in.readUTF();
+ }
+
+ Configuration configuration = new Configuration();
+ configuration.readFields(in);
+
+ if(this.configuration == null) {
+ this.configuration = configuration;
+ }
+
+ this.hCatInputFormat = new org.apache.hive.hcatalog.mapreduce.HCatInputFormat();
+ this.outputSchema = (HCatSchema)HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema"));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Result type business
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return this.resultType;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
new file mode 100644
index 0000000..46f3cd5
--- /dev/null
+++ b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog.java;
+
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.hcatalog.HCatInputFormatBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.HCatRecord;
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
+ * Flink tuples support only up to 25 fields.
+ *
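+ * A minimal sketch of reading a table as Flink tuples (the database, table, and field names are
+ * placeholders and {@code env} is an existing ExecutionEnvironment):
+ * <pre>{@code
+ * HCatInputFormatBase<Tuple2<String, Integer>> hcatIF =
+ *     new HCatInputFormat<Tuple2<String, Integer>>("default", "mytable")
+ *         .getFields("name", "age")
+ *         .asFlinkTuples();
+ * DataSet<Tuple2<String, Integer>> rows = env.createInput(hcatIF);
+ * }</pre>
+ *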
+ * @param <T>
+ */
+public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
+ private static final long serialVersionUID = 1L;
+
+ public HCatInputFormat() {}
+
+ public HCatInputFormat(String database, String table) throws Exception {
+ super(database, table);
+ }
+
+ public HCatInputFormat(String database, String table, Configuration config) throws Exception {
+ super(database, table, config);
+ }
+
+
+ @Override
+ protected int getMaxFlinkTupleSize() {
+ return 25;
+ }
+
+ @Override
+ protected T buildFlinkTuple(T t, HCatRecord record) throws HCatException {
+
+ Tuple tuple = (Tuple)t;
+
+ // Extract all fields from HCatRecord
+ for(int i=0; i < this.fieldNames.length; i++) {
+
+ // get field value
+ Object o = record.get(this.fieldNames[i], this.outputSchema);
+
+ // Set field value in Flink tuple.
+ // Partition columns are returned as String and
+ // need to be converted to original type.
+ switch(this.outputSchema.get(i).getType()) {
+ case INT:
+ if(o instanceof String) {
+ tuple.setField(Integer.parseInt((String) o), i);
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case TINYINT:
+ if(o instanceof String) {
+ tuple.setField(Byte.parseByte((String) o), i);
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case SMALLINT:
+ if(o instanceof String) {
+ tuple.setField(Short.parseShort((String) o), i);
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case BIGINT:
+ if(o instanceof String) {
+ tuple.setField(Long.parseLong((String) o), i);
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case BOOLEAN:
+ if(o instanceof String) {
+ tuple.setField(Boolean.parseBoolean((String) o), i);
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case FLOAT:
+ if(o instanceof String) {
+ tuple.setField(Float.parseFloat((String) o), i);
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case DOUBLE:
+ if(o instanceof String) {
+ tuple.setField(Double.parseDouble((String) o), i);
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case STRING:
+ tuple.setField(o, i);
+ break;
+ case BINARY:
+ if(o instanceof String) {
+ throw new RuntimeException("Cannot handle partition keys of type BINARY.");
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case ARRAY:
+ if(o instanceof String) {
+ throw new RuntimeException("Cannot handle partition keys of type ARRAY.");
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case MAP:
+ if(o instanceof String) {
+ throw new RuntimeException("Cannot handle partition keys of type MAP.");
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case STRUCT:
+ if(o instanceof String) {
+ throw new RuntimeException("Cannot handle partition keys of type STRUCT.");
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ default:
+ throw new RuntimeException("Invalid Type");
+ }
+ }
+
+ return (T)tuple;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
new file mode 100644
index 0000000..0299ee1
--- /dev/null
+++ b/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog.scala
+
+import org.apache.flink.configuration
+import org.apache.flink.hcatalog.HCatInputFormatBase
+import org.apache.hadoop.conf.Configuration
+import org.apache.hive.hcatalog.data.HCatRecord
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as [[HCatRecord]] or Scala tuples.
+ * Scala tuples support only up to 22 fields.
+ *
+ */
+class HCatInputFormat[T](
+ database: String,
+ table: String,
+ config: Configuration
+ ) extends HCatInputFormatBase[T](database, table, config) {
+
+ def this(database: String, table: String) {
+ this(database, table, new Configuration)
+ }
+
+ var vals: Array[Any] = Array[Any]()
+
+ override def configure(parameters: configuration.Configuration): Unit = {
+ super.configure(parameters)
+ vals = new Array[Any](fieldNames.length)
+ }
+
+ override protected def getMaxFlinkTupleSize: Int = 22
+
+ override protected def buildFlinkTuple(t: T, record: HCatRecord): T = {
+
+ // Extract all fields from HCatRecord
+ var i: Int = 0
+ while (i < this.fieldNames.length) {
+
+ val o: AnyRef = record.get(this.fieldNames(i), this.outputSchema)
+
+ // partition columns are returned as String
+ // Check and convert to actual type.
+ this.outputSchema.get(i).getType match {
+ case HCatFieldSchema.Type.INT =>
+ if (o.isInstanceOf[String]) {
+ vals(i) = o.asInstanceOf[String].toInt
+ }
+ else {
+ vals(i) = o.asInstanceOf[Int]
+ }
+ case HCatFieldSchema.Type.TINYINT =>
+ if (o.isInstanceOf[String]) {
+ vals(i) = o.asInstanceOf[String].toInt.toByte
+ }
+ else {
+ vals(i) = o.asInstanceOf[Byte]
+ }
+ case HCatFieldSchema.Type.SMALLINT =>
+ if (o.isInstanceOf[String]) {
+ vals(i) = o.asInstanceOf[String].toInt.toShort
+ }
+ else {
+ vals(i) = o.asInstanceOf[Short]
+ }
+ case HCatFieldSchema.Type.BIGINT =>
+ if (o.isInstanceOf[String]) {
+ vals(i) = o.asInstanceOf[String].toLong
+ }
+ else {
+ vals(i) = o.asInstanceOf[Long]
+ }
+ case HCatFieldSchema.Type.BOOLEAN =>
+ if (o.isInstanceOf[String]) {
+ vals(i) = o.asInstanceOf[String].toBoolean
+ }
+ else {
+ vals(i) = o.asInstanceOf[Boolean]
+ }
+ case HCatFieldSchema.Type.FLOAT =>
+ if (o.isInstanceOf[String]) {
+ vals(i) = o.asInstanceOf[String].toFloat
+ }
+ else {
+ vals(i) = o.asInstanceOf[Float]
+ }
+ case HCatFieldSchema.Type.DOUBLE =>
+ if (o.isInstanceOf[String]) {
+ vals(i) = o.asInstanceOf[String].toDouble
+ }
+ else {
+ vals(i) = o.asInstanceOf[Double]
+ }
+ case HCatFieldSchema.Type.STRING =>
+ vals(i) = o
+ case HCatFieldSchema.Type.BINARY =>
+ if (o.isInstanceOf[String]) {
+ throw new RuntimeException("Cannot handle partition keys of type BINARY.")
+ }
+ else {
+ vals(i) = o.asInstanceOf[Array[Byte]]
+ }
+ case HCatFieldSchema.Type.ARRAY =>
+ if (o.isInstanceOf[String]) {
+ throw new RuntimeException("Cannot handle partition keys of type ARRAY.")
+ }
+ else {
+ vals(i) = o.asInstanceOf[List[Object]]
+ }
+ case HCatFieldSchema.Type.MAP =>
+ if (o.isInstanceOf[String]) {
+ throw new RuntimeException("Cannot handle partition keys of type MAP.")
+ }
+ else {
+ vals(i) = o.asInstanceOf[Map[Object, Object]]
+ }
+ case HCatFieldSchema.Type.STRUCT =>
+ if (o.isInstanceOf[String]) {
+ throw new RuntimeException("Cannot handle partition keys of type STRUCT.")
+ }
+ else {
+ vals(i) = o.asInstanceOf[List[Object]]
+ }
+ case _ =>
+ throw new RuntimeException("Invalid type " + this.outputSchema.get(i).getType +
+ " encountered.")
+ }
+
+ i += 1
+ }
+ createScalaTuple(vals)
+ }
+
+ private def createScalaTuple(vals: Array[Any]): T = {
+
+ this.fieldNames.length match {
+ case 1 =>
+ new Tuple1(vals(0)).asInstanceOf[T]
+ case 2 =>
+ new Tuple2(vals(0), vals(1)).asInstanceOf[T]
+ case 3 =>
+ new Tuple3(vals(0), vals(1), vals(2)).asInstanceOf[T]
+ case 4 =>
+ new Tuple4(vals(0), vals(1), vals(2), vals(3)).asInstanceOf[T]
+ case 5 =>
+ new Tuple5(vals(0), vals(1), vals(2), vals(3), vals(4)).asInstanceOf[T]
+ case 6 =>
+ new Tuple6(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5)).asInstanceOf[T]
+ case 7 =>
+ new Tuple7(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6)).asInstanceOf[T]
+ case 8 =>
+ new Tuple8(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7))
+ .asInstanceOf[T]
+ case 9 =>
+ new Tuple9(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8)).asInstanceOf[T]
+ case 10 =>
+ new Tuple10(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9)).asInstanceOf[T]
+ case 11 =>
+ new Tuple11(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10)).asInstanceOf[T]
+ case 12 =>
+ new Tuple12(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11)).asInstanceOf[T]
+ case 13 =>
+ new Tuple13(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12)).asInstanceOf[T]
+ case 14 =>
+ new Tuple14(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13)).asInstanceOf[T]
+ case 15 =>
+ new Tuple15(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14)).asInstanceOf[T]
+ case 16 =>
+ new Tuple16(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15))
+ .asInstanceOf[T]
+ case 17 =>
+ new Tuple17(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+ vals(16)).asInstanceOf[T]
+ case 18 =>
+ new Tuple18(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+ vals(16), vals(17)).asInstanceOf[T]
+ case 19 =>
+ new Tuple19(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+ vals(16), vals(17), vals(18)).asInstanceOf[T]
+ case 20 =>
+ new Tuple20(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+ vals(16), vals(17), vals(18), vals(19)).asInstanceOf[T]
+ case 21 =>
+ new Tuple21(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+ vals(16), vals(17), vals(18), vals(19), vals(20)).asInstanceOf[T]
+ case 22 =>
+ new Tuple22(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+ vals(16), vals(17), vals(18), vals(19), vals(20), vals(21)).asInstanceOf[T]
+ case _ =>
+ throw new RuntimeException("Only up to 22 fields supported for Scala Tuples.")
+
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml
new file mode 100644
index 0000000..be42648
--- /dev/null
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connectors</artifactId>
+ <version>1.2-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-jdbc</artifactId>
+ <name>flink-jdbc</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>10.10.1.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
new file mode 100644
index 0000000..b4246f5
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * InputFormat to read data from a database and generate Rows.
+ * The InputFormat has to be configured using the supplied InputFormatBuilder.
+ * A valid RowTypeInfo must be properly configured in the builder, e.g.: <br>
+ *
+ * <pre><code>
+ * TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
+ * BasicTypeInfo.INT_TYPE_INFO,
+ * BasicTypeInfo.STRING_TYPE_INFO,
+ * BasicTypeInfo.STRING_TYPE_INFO,
+ * BasicTypeInfo.DOUBLE_TYPE_INFO,
+ * BasicTypeInfo.INT_TYPE_INFO
+ * };
+ *
+ * RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
+ *
+ * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ * .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+ * .setDBUrl("jdbc:derby:memory:ebookshop")
+ * .setQuery("select * from books")
+ * .setRowTypeInfo(rowTypeInfo)
+ * .finish();
+ * </code></pre>
+ *
+ * In order to query the JDBC source in parallel, you need to provide a
+ * parameterized query template (i.e. a valid {@link PreparedStatement}) and
+ * a {@link ParameterValuesProvider} which provides binding values for the
+ * query parameters. E.g.: <br>
+ *
+ * <pre><code>
+ *
+ * Serializable[][] queryParameters = new String[2][1];
+ * queryParameters[0] = new String[]{"Kumar"};
+ * queryParameters[1] = new String[]{"Tan Ah Teck"};
+ *
+ * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ * .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+ * .setDBUrl("jdbc:derby:memory:ebookshop")
+ * .setQuery("select * from books WHERE author = ?")
+ * .setRowTypeInfo(rowTypeInfo)
+ * .setParametersProvider(new GenericParameterValuesProvider(queryParameters))
+ * .finish();
+ * </code></pre>
+ *
+ * @see Row
+ * @see ParameterValuesProvider
+ * @see PreparedStatement
+ * @see DriverManager
+ */
+public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
+
+ private String username;
+ private String password;
+ private String drivername;
+ private String dbURL;
+ private String queryTemplate;
+ private int resultSetType;
+ private int resultSetConcurrency;
+ private RowTypeInfo rowTypeInfo;
+
+ private transient Connection dbConn;
+ private transient PreparedStatement statement;
+ private transient ResultSet resultSet;
+
+ private boolean hasNext;
+ private Object[][] parameterValues;
+
+ public JDBCInputFormat() {
+ }
+
+ @Override
+ public RowTypeInfo getProducedType() {
+ return rowTypeInfo;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ //do nothing here
+ }
+
+ @Override
+ public void openInputFormat() {
+ //called once per inputFormat (on open)
+ try {
+ Class.forName(drivername);
+ if (username == null) {
+ dbConn = DriverManager.getConnection(dbURL);
+ } else {
+ dbConn = DriverManager.getConnection(dbURL, username, password);
+ }
+ statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
+ } catch (SQLException se) {
+ throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
+ }
+ }
+
+ @Override
+ public void closeInputFormat() {
+ //called once per inputFormat (on close)
+ try {
+ if (statement != null) {
+ statement.close();
+ }
+ } catch (SQLException se) {
+ LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage());
+ } finally {
+ statement = null;
+ }
+
+ try {
+ if (dbConn != null) {
+ dbConn.close();
+ }
+ } catch (SQLException se) {
+ LOG.info("Inputformat couldn't be closed - " + se.getMessage());
+ } finally {
+ dbConn = null;
+ }
+
+ parameterValues = null;
+ }
+
+ /**
+ * Connects to the source database and executes the query. The query is
+ * executed in a <b>parallel fashion</b> if this {@link InputFormat} was built
+ * with a parameterized query (i.e. a valid {@link PreparedStatement}) and a
+ * {@link ParameterValuesProvider}, and in a <b>non-parallel fashion</b> otherwise.
+ *
+ * @param inputSplit ignored if this InputFormat is executed as a non-parallel
+ * source; otherwise used (via its <i>splitNumber</i>) to pick the parameter
+ * values that are bound to the query
+ * @throws IOException if there is an error during the execution of the query
+ */
+ @Override
+ public void open(InputSplit inputSplit) throws IOException {
+ try {
+ if (inputSplit != null && parameterValues != null) {
+ for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
+ Object param = parameterValues[inputSplit.getSplitNumber()][i];
+ if (param instanceof String) {
+ statement.setString(i + 1, (String) param);
+ } else if (param instanceof Long) {
+ statement.setLong(i + 1, (Long) param);
+ } else if (param instanceof Integer) {
+ statement.setInt(i + 1, (Integer) param);
+ } else if (param instanceof Double) {
+ statement.setDouble(i + 1, (Double) param);
+ } else if (param instanceof Boolean) {
+ statement.setBoolean(i + 1, (Boolean) param);
+ } else if (param instanceof Float) {
+ statement.setFloat(i + 1, (Float) param);
+ } else if (param instanceof BigDecimal) {
+ statement.setBigDecimal(i + 1, (BigDecimal) param);
+ } else if (param instanceof Byte) {
+ statement.setByte(i + 1, (Byte) param);
+ } else if (param instanceof Short) {
+ statement.setShort(i + 1, (Short) param);
+ } else if (param instanceof Date) {
+ statement.setDate(i + 1, (Date) param);
+ } else if (param instanceof Time) {
+ statement.setTime(i + 1, (Time) param);
+ } else if (param instanceof Timestamp) {
+ statement.setTimestamp(i + 1, (Timestamp) param);
+ } else if (param instanceof Array) {
+ statement.setArray(i + 1, (Array) param);
+ } else {
+ // extend with other types if needed
+ throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet)." );
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
+ }
+ }
+ resultSet = statement.executeQuery();
+ hasNext = resultSet.next();
+ } catch (SQLException se) {
+ throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
+ }
+ }
+
+ /**
+ * Closes all resources used.
+ *
+ * @throws IOException Indicates that a resource could not be closed.
+ */
+ @Override
+ public void close() throws IOException {
+ if (resultSet == null) {
+ return;
+ }
+ try {
+ resultSet.close();
+ } catch (SQLException se) {
+ LOG.info("Inputformat ResultSet couldn't be closed - " + se.getMessage());
+ }
+ }
+
+ /**
+ * Checks whether all data has been read.
+ *
+ * @return boolean value indicating whether all data has been read.
+ * @throws IOException
+ */
+ @Override
+ public boolean reachedEnd() throws IOException {
+ return !hasNext;
+ }
+
+ /**
+ * Stores the next resultSet row in the given {@link Row}.
+ *
+ * @param row row to be reused.
+ * @return the reused row, filled with the fields of the next record, or null if all records have been read
+ * @throws java.io.IOException
+ */
+ @Override
+ public Row nextRecord(Row row) throws IOException {
+ try {
+ if (!hasNext) {
+ return null;
+ }
+ for (int pos = 0; pos < row.productArity(); pos++) {
+ row.setField(pos, resultSet.getObject(pos + 1));
+ }
+ //update hasNext after we've read the record
+ hasNext = resultSet.next();
+ return row;
+ } catch (SQLException se) {
+ throw new IOException("Couldn't read data - " + se.getMessage(), se);
+ } catch (NullPointerException npe) {
+ throw new IOException("Couldn't access resultSet", npe);
+ }
+ }
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+ return cachedStatistics;
+ }
+
+ @Override
+ public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
+ if (parameterValues == null) {
+ return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
+ }
+ GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length];
+ for (int i = 0; i < ret.length; i++) {
+ ret[i] = new GenericInputSplit(i, ret.length);
+ }
+ return ret;
+ }
+
+ @Override
+ public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
+ return new DefaultInputSplitAssigner(inputSplits);
+ }
+
+ /**
+ * A builder used to set parameters to the input format's configuration in a fluent way.
+ * @return builder
+ */
+ public static JDBCInputFormatBuilder buildJDBCInputFormat() {
+ return new JDBCInputFormatBuilder();
+ }
+
+ public static class JDBCInputFormatBuilder {
+ private final JDBCInputFormat format;
+
+ public JDBCInputFormatBuilder() {
+ this.format = new JDBCInputFormat();
+ //using TYPE_FORWARD_ONLY for high performance reads
+ this.format.resultSetType = ResultSet.TYPE_FORWARD_ONLY;
+ this.format.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
+ }
+
+ public JDBCInputFormatBuilder setUsername(String username) {
+ format.username = username;
+ return this;
+ }
+
+ public JDBCInputFormatBuilder setPassword(String password) {
+ format.password = password;
+ return this;
+ }
+
+ public JDBCInputFormatBuilder setDrivername(String drivername) {
+ format.drivername = drivername;
+ return this;
+ }
+
+ public JDBCInputFormatBuilder setDBUrl(String dbURL) {
+ format.dbURL = dbURL;
+ return this;
+ }
+
+ public JDBCInputFormatBuilder setQuery(String query) {
+ format.queryTemplate = query;
+ return this;
+ }
+
+ public JDBCInputFormatBuilder setResultSetType(int resultSetType) {
+ format.resultSetType = resultSetType;
+ return this;
+ }
+
+ public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency) {
+ format.resultSetConcurrency = resultSetConcurrency;
+ return this;
+ }
+
+ public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) {
+ format.parameterValues = parameterValuesProvider.getParameterValues();
+ return this;
+ }
+
+ public JDBCInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo) {
+ format.rowTypeInfo = rowTypeInfo;
+ return this;
+ }
+
+ public JDBCInputFormat finish() {
+ if (format.username == null) {
+ LOG.info("Username was not supplied separately.");
+ }
+ if (format.password == null) {
+ LOG.info("Password was not supplied separately.");
+ }
+ if (format.dbURL == null) {
+ throw new IllegalArgumentException("No database URL supplied");
+ }
+ if (format.queryTemplate == null) {
+ throw new IllegalArgumentException("No query supplied");
+ }
+ if (format.drivername == null) {
+ throw new IllegalArgumentException("No driver supplied");
+ }
+ if (format.rowTypeInfo == null) {
+ throw new IllegalArgumentException("No " + RowTypeInfo.class.getSimpleName() + " supplied");
+ }
+ if (format.parameterValues == null) {
+ LOG.debug("No input splitting configured (data will be read with parallelism 1).");
+ }
+ return format;
+ }
+
+ }
+
+}
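[A quick usage sketch for reviewers, not part of the diff above: the snippet below wires the new JDBCInputFormat into a batch job. It only relies on the builder calls documented in the class Javadoc plus ExecutionEnvironment#createInput(); the Derby driver, the in-memory URL and the books table are made-up assumptions for illustration.]

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
    import org.apache.flink.api.table.Row;
    import org.apache.flink.api.table.typeutils.RowTypeInfo;

    public class JdbcReadSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Field types must match the columns selected by the query below (assumed schema).
            TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO
            };
            RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

            JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                .setDBUrl("jdbc:derby:memory:ebookshop")
                .setQuery("select id, title from books")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

            // JDBCInputFormat implements ResultTypeQueryable, so createInput() can
            // derive the produced type from the configured RowTypeInfo.
            DataSet<Row> rows = env.createInput(inputFormat);

            rows.print();
        }
    }

[Adding a ParameterValuesProvider, as in the second Javadoc example, is the only change needed to turn this into a parallel read: each generated input split then binds one row of parameter values to the query.]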
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
new file mode 100644
index 0000000..da4b1ad
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OutputFormat to write {@link Row}s into a database.
+ * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
+ *
+ * @see Tuple
+ * @see DriverManager
+ */
+public class JDBCOutputFormat extends RichOutputFormat<Row> {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
+
+ private String username;
+ private String password;
+ private String drivername;
+ private String dbURL;
+ private String query;
+ private int batchInterval = 5000;
+
+ private Connection dbConn;
+ private PreparedStatement upload;
+
+ private int batchCount = 0;
+
+ public int[] typesArray;
+
+ public JDBCOutputFormat() {
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ }
+
+ /**
+ * Connects to the target database and initializes the prepared statement.
+ *
+ * @param taskNumber The number of the parallel instance.
+ * @param numTasks The number of parallel tasks.
+ * @throws IOException Thrown, if the output could not be opened due to an
+ * I/O problem.
+ */
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ try {
+ establishConnection();
+ upload = dbConn.prepareStatement(query);
+ } catch (SQLException sqe) {
+ throw new IllegalArgumentException("open() failed.", sqe);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
+ }
+ }
+
+ private void establishConnection() throws SQLException, ClassNotFoundException {
+ Class.forName(drivername);
+ if (username == null) {
+ dbConn = DriverManager.getConnection(dbURL);
+ } else {
+ dbConn = DriverManager.getConnection(dbURL, username, password);
+ }
+ }
+
+ /**
+ * Adds a record to the prepared statement.
+ * <p>
+ * When this method is called, the output format is guaranteed to be opened.
+ * </p>
+ *
+ * WARNING: this may fail when no column types are specified, because a best-effort approach is then taken to
+ * insert null values, and it is not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null).
+ *
+ * @param row The record to add to the output.
+ * @see PreparedStatement
+ * @throws IOException Thrown, if the records could not be added due to an I/O problem.
+ */
+ @Override
+ public void writeRecord(Row row) throws IOException {
+
+ if (typesArray != null && typesArray.length > 0 && typesArray.length != row.productArity()) {
+ LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
+ }
+ try {
+
+ if (typesArray == null ) {
+ // no types provided
+ for (int index = 0; index < row.productArity(); index++) {
+ LOG.warn("Unknown column type for column %s. Best effort approach to set its value: %s.", index + 1, row.productElement(index));
+ upload.setObject(index + 1, row.productElement(index));
+ }
+ } else {
+ // types provided
+ for (int index = 0; index < row.productArity(); index++) {
+
+ if (row.productElement(index) == null) {
+ upload.setNull(index + 1, typesArray[index]);
+ } else {
+ // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
+ switch (typesArray[index]) {
+ case java.sql.Types.NULL:
+ upload.setNull(index + 1, typesArray[index]);
+ break;
+ case java.sql.Types.BOOLEAN:
+ case java.sql.Types.BIT:
+ upload.setBoolean(index + 1, (boolean) row.productElement(index));
+ break;
+ case java.sql.Types.CHAR:
+ case java.sql.Types.NCHAR:
+ case java.sql.Types.VARCHAR:
+ case java.sql.Types.LONGVARCHAR:
+ case java.sql.Types.LONGNVARCHAR:
+ upload.setString(index + 1, (String) row.productElement(index));
+ break;
+ case java.sql.Types.TINYINT:
+ upload.setByte(index + 1, (byte) row.productElement(index));
+ break;
+ case java.sql.Types.SMALLINT:
+ upload.setShort(index + 1, (short) row.productElement(index));
+ break;
+ case java.sql.Types.INTEGER:
+ upload.setInt(index + 1, (int) row.productElement(index));
+ break;
+ case java.sql.Types.BIGINT:
+ upload.setLong(index + 1, (long) row.productElement(index));
+ break;
+ case java.sql.Types.REAL:
+ upload.setFloat(index + 1, (float) row.productElement(index));
+ break;
+ case java.sql.Types.FLOAT:
+ case java.sql.Types.DOUBLE:
+ upload.setDouble(index + 1, (double) row.productElement(index));
+ break;
+ case java.sql.Types.DECIMAL:
+ case java.sql.Types.NUMERIC:
+ upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.productElement(index));
+ break;
+ case java.sql.Types.DATE:
+ upload.setDate(index + 1, (java.sql.Date) row.productElement(index));
+ break;
+ case java.sql.Types.TIME:
+ upload.setTime(index + 1, (java.sql.Time) row.productElement(index));
+ break;
+ case java.sql.Types.TIMESTAMP:
+ upload.setTimestamp(index + 1, (java.sql.Timestamp) row.productElement(index));
+ break;
+ case java.sql.Types.BINARY:
+ case java.sql.Types.VARBINARY:
+ case java.sql.Types.LONGVARBINARY:
+ upload.setBytes(index + 1, (byte[]) row.productElement(index));
+ break;
+ default:
+ upload.setObject(index + 1, row.productElement(index));
+ LOG.warn("Unmanaged sql type (%s) for column %s. Best effort approach to set its value: %s.",
+ typesArray[index], index + 1, row.productElement(index));
+ // case java.sql.Types.SQLXML
+ // case java.sql.Types.ARRAY:
+ // case java.sql.Types.JAVA_OBJECT:
+ // case java.sql.Types.BLOB:
+ // case java.sql.Types.CLOB:
+ // case java.sql.Types.NCLOB:
+ // case java.sql.Types.DATALINK:
+ // case java.sql.Types.DISTINCT:
+ // case java.sql.Types.OTHER:
+ // case java.sql.Types.REF:
+ // case java.sql.Types.ROWID:
+ // case java.sql.Types.STRUCT:
+ }
+ }
+ }
+ }
+ upload.addBatch();
+ batchCount++;
+ if (batchCount >= batchInterval) {
+ upload.executeBatch();
+ batchCount = 0;
+ }
+ } catch (SQLException | IllegalArgumentException e) {
+ throw new IllegalArgumentException("writeRecord() failed", e);
+ }
+ }
+
+ /**
+ * Executes prepared statement and closes all resources of this instance.
+ *
+ * @throws IOException Thrown, if the input could not be closed properly.
+ */
+ @Override
+ public void close() throws IOException {
+ try {
+ if (upload != null) {
+ upload.executeBatch();
+ upload.close();
+ }
+ } catch (SQLException se) {
+ LOG.info("Inputformat couldn't be closed - " + se.getMessage());
+ } finally {
+ upload = null;
+ batchCount = 0;
+ }
+
+ try {
+ if (dbConn != null) {
+ dbConn.close();
+ }
+ } catch (SQLException se) {
+ LOG.info("Inputformat couldn't be closed - " + se.getMessage());
+ } finally {
+ dbConn = null;
+ }
+ }
+
+ public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
+ return new JDBCOutputFormatBuilder();
+ }
+
+ public static class JDBCOutputFormatBuilder {
+ private final JDBCOutputFormat format;
+
+ protected JDBCOutputFormatBuilder() {
+ this.format = new JDBCOutputFormat();
+ }
+
+ public JDBCOutputFormatBuilder setUsername(String username) {
+ format.username = username;
+ return this;
+ }
+
+ public JDBCOutputFormatBuilder setPassword(String password) {
+ format.password = password;
+ return this;
+ }
+
+ public JDBCOutputFormatBuilder setDrivername(String drivername) {
+ format.drivername = drivername;
+ return this;
+ }
+
+ public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
+ format.dbURL = dbURL;
+ return this;
+ }
+
+ public JDBCOutputFormatBuilder setQuery(String query) {
+ format.query = query;
+ return this;
+ }
+
+ public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
+ format.batchInterval = batchInterval;
+ return this;
+ }
+
+ public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
+ format.typesArray = typesArray;
+ return this;
+ }
+
+ /**
+ * Finalizes the configuration and checks validity.
+ *
+ * @return Configured JDBCOutputFormat
+ */
+ public JDBCOutputFormat finish() {
+ if (format.username == null) {
+ LOG.info("Username was not supplied separately.");
+ }
+ if (format.password == null) {
+ LOG.info("Password was not supplied separately.");
+ }
+ if (format.dbURL == null) {
+ throw new IllegalArgumentException("No dababase URL supplied.");
+ }
+ if (format.query == null) {
+ throw new IllegalArgumentException("No query suplied");
+ }
+ if (format.drivername == null) {
+ throw new IllegalArgumentException("No driver supplied");
+ }
+
+ return format;
+ }
+ }
+
+}
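[And the matching write path, again only a sketch and not part of the commit: a DataSet<Row> is emitted through the new JDBCOutputFormat. The target table, its two columns and the Derby URL are assumptions chosen to mirror the read sketch above; the builder methods come from the diff, and DataSet#output() / ExecutionEnvironment#fromCollection() are standard DataSet API calls.]

    import java.util.Arrays;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
    import org.apache.flink.api.table.Row;
    import org.apache.flink.api.table.typeutils.RowTypeInfo;

    public class JdbcWriteSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Two example records; Row(arity) plus setField mirrors what nextRecord() produces.
            Row book1 = new Row(2);
            book1.setField(0, 1);
            book1.setField(1, "Moby Dick");
            Row book2 = new Row(2);
            book2.setField(0, 2);
            book2.setField(1, "Effective Java");

            RowTypeInfo rowTypeInfo = new RowTypeInfo(new TypeInformation<?>[] {
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO
            });

            DataSet<Row> rows = env.fromCollection(Arrays.asList(book1, book2), rowTypeInfo);

            // Passing the column SQL types lets writeRecord() use the typed setters
            // (and setNull) instead of the best-effort setObject() path.
            JDBCOutputFormat outputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                .setDBUrl("jdbc:derby:memory:ebookshop")
                .setQuery("insert into books (id, title) values (?, ?)")
                .setSqlTypes(new int[] { java.sql.Types.INTEGER, java.sql.Types.VARCHAR })
                .setBatchInterval(100)
                .finish();

            rows.output(outputFormat);
            env.execute("JDBCOutputFormat sketch");
        }
    }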