You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:10 UTC

[20/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
new file mode 100644
index 0000000..3dddd88
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TableInputFormatITCase extends HBaseTestingClusterAutostarter {
+	private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
+	private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
+	private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
+
+	// These are the row ids AND also the values we will put in the test table
+	private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
+
+	@BeforeClass
+	public static void activateHBaseCluster(){
+		registerHBaseMiniClusterInClasspath();
+	}
+
+	@Before
+	public void createTestTable() throws IOException {
+		TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
+		byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
+		createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
+		HTable table = openTable(tableName);
+
+		for (String rowId : ROW_IDS) {
+			byte[] rowIdBytes = rowId.getBytes();
+			Put p = new Put(rowIdBytes);
+			// Use the rowId as the value to facilitate the testing better
+			p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
+			table.put(p);
+		}
+
+		table.close();
+	}
+
+	class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
+		@Override
+		protected Scan getScanner() {
+			return new Scan();
+		}
+
+		@Override
+		protected String getTableName() {
+			return TEST_TABLE_NAME;
+		}
+
+		@Override
+		protected Tuple1<String> mapResultToTuple(Result r) {
+			return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
+		}
+	}
+
+	@Test
+	public void testTableInputFormat() {
+		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		environment.setParallelism(1);
+
+		DataSet<String> resultDataSet =
+			environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
+				@Override
+				public String map(Tuple1<String> value) throws Exception {
+					return value.f0;
+				}
+			});
+
+		List<String> resultSet = new ArrayList<>();
+		resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet));
+
+		try {
+			environment.execute("HBase InputFormat Test");
+		} catch (Exception e) {
+			Assert.fail("HBase InputFormat test failed. " + e.getMessage());
+		}
+
+		for (String rowId : ROW_IDS) {
+			assertTrue("Missing rowId from table: " + rowId, resultSet.contains(rowId));
+		}
+
+		assertEquals("The number of records is wrong.", ROW_IDS.length, resultSet.size());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
new file mode 100644
index 0000000..8579dee
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+public class HBaseFlinkTestConstants {
+	
+	public static final byte[] CF_SOME = "someCf".getBytes();
+	public static final byte[] Q_SOME = "someQual".getBytes();
+	public static final String TEST_TABLE_NAME = "test-table";
+	public static final String TMP_DIR = "/tmp/test";
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
new file mode 100644
index 0000000..dccf876
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.TableInputFormat;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Simple stub for HBase DataSet read
+ * 
+ * To run the test first create the test table with hbase shell.
+ * 
+ * Use the following commands:
+ * <ul>
+ *     <li>create 'test-table', 'someCf'</li>
+ *     <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li>
+ *     <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li>
+ * </ul>
+ * 
+ * The test should return just the first entry.
+ * 
+ */
+public class HBaseReadExample {
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		@SuppressWarnings("serial")
+		DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() {
+			
+				@Override
+				public String getTableName() {
+					return HBaseFlinkTestConstants.TEST_TABLE_NAME;
+				}
+
+				@Override
+				protected Scan getScanner() {
+					Scan scan = new Scan();
+					scan.addColumn(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME);
+					return scan;
+				}
+
+				private Tuple2<String, String> reuse = new Tuple2<String, String>();
+				
+				@Override
+				protected Tuple2<String, String> mapResultToTuple(Result r) {
+					String key = Bytes.toString(r.getRow());
+					String val = Bytes.toString(r.getValue(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME));
+					reuse.setField(key, 0);
+					reuse.setField(val, 1);
+					return reuse;
+				}
+		})
+		.filter(new FilterFunction<Tuple2<String,String>>() {
+
+			@Override
+			public boolean filter(Tuple2<String, String> t) throws Exception {
+				String val = t.getField(1);
+				if(val.startsWith("someStr"))
+					return true;
+				return false;
+			}
+		});
+		
+		hbaseDs.print();
+		
+		// kick off execution.
+		env.execute();
+				
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
new file mode 100644
index 0000000..483bdff
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Simple stub for HBase DataSet write
+ * 
+ * To run the test first create the test table with hbase shell.
+ * 
+ * Use the following commands:
+ * <ul>
+ *     <li>create 'test-table', 'someCf'</li>
+ * </ul>
+ * 
+ */
+@SuppressWarnings("serial")
+public class HBaseWriteExample {
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// get input data
+		DataSet<String> text = getTextDataSet(env);
+		
+		DataSet<Tuple2<String, Integer>> counts = 
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				text.flatMap(new Tokenizer())
+				// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0)
+				.sum(1);
+
+		// emit result
+		Job job = Job.getInstance();
+		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
+		// TODO is "mapred.output.dir" really useful?
+		job.getConfiguration().set("mapred.output.dir",HBaseFlinkTestConstants.TMP_DIR);
+		counts.map(new RichMapFunction <Tuple2<String,Integer>, Tuple2<Text,Mutation>>() {
+			private transient Tuple2<Text, Mutation> reuse;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+				reuse = new Tuple2<Text, Mutation>();
+			}
+
+			@Override
+			public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
+				reuse.f0 = new Text(t.f0);
+				Put put = new Put(t.f0.getBytes());
+				put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
+				reuse.f1 = put;
+				return reuse;
+			}
+		}).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
+		
+		// execute program
+		env.execute("WordCount (HBase sink) Example");
+	}
+	
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+	
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a user-defined
+	 * FlatMapFunction. The function takes a line (String) and splits it into 
+	 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+	 */
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+			
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputTableName = HBaseFlinkTestConstants.TEST_TABLE_NAME;
+	
+	private static boolean parseParameters(String[] args) {
+		
+		if(args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(args.length == 2) {
+				textPath = args[0];
+				outputTableName = args[1];
+			} else {
+				System.err.println("Usage: HBaseWriteExample <text path> <output table>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing HBaseWriteExample example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: HBaseWriteExample <text path> <output table>");
+		}
+		return true;
+	}
+	
+	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return getDefaultTextLineDataSet(env);
+		}
+	}
+	private static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
+		return env.fromElements(WORDS);
+	}
+	private static final String[] WORDS = new String[] {
+		"To be, or not to be,--that is the question:--",
+		"Whether 'tis nobler in the mind to suffer",
+		"The slings and arrows of outrageous fortune",
+		"Or to take arms against a sea of troubles,",
+		"And by opposing end them?--To die,--to sleep,--",
+		"No more; and by a sleep to say we end",
+		"The heartache, and the thousand natural shocks",
+		"That flesh is heir to,--'tis a consummation",
+		"Devoutly to be wish'd. To die,--to sleep;--",
+		"To sleep! perchance to dream:--ay, there's the rub;",
+		"For in that sleep of death what dreams may come,",
+		"When we have shuffled off this mortal coil,",
+		"Must give us pause: there's the respect",
+		"That makes calamity of so long life;",
+		"For who would bear the whips and scorns of time,",
+		"The oppressor's wrong, the proud man's contumely,",
+		"The pangs of despis'd love, the law's delay,",
+		"The insolence of office, and the spurns",
+		"That patient merit of the unworthy takes,",
+		"When he himself might his quietus make",
+		"With a bare bodkin? who would these fardels bear,",
+		"To grunt and sweat under a weary life,",
+		"But that the dread of something after death,--",
+		"The undiscover'd country, from whose bourn",
+		"No traveller returns,--puzzles the will,",
+		"And makes us rather bear those ills we have",
+		"Than fly to others that we know not of?",
+		"Thus conscience does make cowards of us all;",
+		"And thus the native hue of resolution",
+		"Is sicklied o'er with the pale cast of thought;",
+		"And enterprises of great pith and moment,",
+		"With this regard, their currents turn awry,",
+		"And lose the name of action.--Soft you now!",
+		"The fair Ophelia!--Nymph, in thy orisons",
+		"Be all my sins remember'd."
+	};
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
new file mode 100644
index 0000000..05398db
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * 
+ * This is an example how to write streams into HBase. In this example the
+ * stream will be written into a local Hbase but it is possible to adapt this
+ * example for an HBase running in a cloud. You need a running local HBase with a
+ * table "flinkExample" and a column "entry". If your HBase configuration does
+ * not fit the hbase-site.xml in the resource folder then you gave to delete temporary this
+ * hbase-site.xml to execute the example properly.
+ * 
+ */
+public class HBaseWriteStreamExample {
+
+	public static void main(String[] args) throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.getExecutionEnvironment();
+
+		// data stream with random numbers
+		DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
+			private static final long serialVersionUID = 1L;
+
+			private volatile boolean isRunning = true;
+
+			@Override
+			public void run(SourceContext<String> out) throws Exception {
+				while (isRunning) {
+					out.collect(String.valueOf(Math.floor(Math.random() * 100)));
+				}
+
+			}
+
+			@Override
+			public void cancel() {
+				isRunning = false;
+			}
+		});
+		dataStream.writeUsingOutputFormat(new HBaseOutputFormat());
+
+		env.execute();
+	}
+
+	/**
+	 * 
+	 * This class implements an OutputFormat for HBase
+	 *
+	 */
+	private static class HBaseOutputFormat implements OutputFormat<String> {
+
+		private org.apache.hadoop.conf.Configuration conf = null;
+		private HTable table = null;
+		private String taskNumber = null;
+		private int rowNumber = 0;
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void configure(Configuration parameters) {
+			conf = HBaseConfiguration.create();
+		}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {
+			table = new HTable(conf, "flinkExample");
+			this.taskNumber = String.valueOf(taskNumber);
+		}
+
+		@Override
+		public void writeRecord(String record) throws IOException {
+			Put put = new Put(Bytes.toBytes(taskNumber + rowNumber));
+			put.add(Bytes.toBytes("entry"), Bytes.toBytes("entry"),
+					Bytes.toBytes(rowNumber));
+			rowNumber++;
+			table.put(put);
+		}
+
+		@Override
+		public void close() throws IOException {
+			table.flushCommits();
+			table.close();
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties b/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..804ff45
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger=DEBUG, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.threshold=INFO
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %-5p %30c{1}:%4L - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/pom.xml b/flink-connectors/flink-hcatalog/pom.xml
new file mode 100644
index 0000000..dde7996
--- /dev/null
+++ b/flink-connectors/flink-hcatalog/pom.xml
@@ -0,0 +1,182 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	
+	<modelVersion>4.0.0</modelVersion>
+	
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.2-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-hcatalog</artifactId>
+	<name>flink-hcatalog</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-hadoop-compatibility_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hive.hcatalog</groupId>
+			<artifactId>hcatalog-core</artifactId>
+			<version>0.12.0</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.json</groupId>
+					<artifactId>json</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-library</artifactId>
+			<scope>provided</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+				</configuration>
+			</plugin>
+
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Scala Code Style, most of the configuration done via plugin management -->
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<configuration>
+					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+				</configuration>
+			</plugin>
+
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
new file mode 100644
index 0000000..859b706
--- /dev/null
+++ b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog;
+
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink-native tuple.
+ *
+ * Note: Flink tuples might only support a limited number of fields (depending on the API).
+ *
+ * @param <T>
+ */
+public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit> implements ResultTypeQueryable<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private Configuration configuration;
+
+	private org.apache.hive.hcatalog.mapreduce.HCatInputFormat hCatInputFormat;
+	private RecordReader<WritableComparable, HCatRecord> recordReader;
+	private boolean fetched = false;
+	private boolean hasNext;
+
+	protected String[] fieldNames = new String[0];
+	protected HCatSchema outputSchema;
+
+	private TypeInformation<T> resultType;
+
+	public HCatInputFormatBase() { }
+
+	/**
+	 * Creates a HCatInputFormat for the given database and table.
+	 * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
+	 * The return type of the InputFormat can be changed to Flink-native tuples by calling
+	 * {@link HCatInputFormatBase#asFlinkTuples()}.
+	 *
+	 * @param database The name of the database to read from.
+	 * @param table The name of the table to read.
+	 * @throws java.io.IOException
+	 */
+	public HCatInputFormatBase(String database, String table) throws IOException {
+		this(database, table, new Configuration());
+	}
+
+	/**
+	 * Creates a HCatInputFormat for the given database, table, and
+	 * {@link org.apache.hadoop.conf.Configuration}.
+	 * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
+	 * The return type of the InputFormat can be changed to Flink-native tuples by calling
+	 * {@link HCatInputFormatBase#asFlinkTuples()}.
+	 *
+	 * @param database The name of the database to read from.
+	 * @param table The name of the table to read.
+	 * @param config The Configuration for the InputFormat.
+	 * @throws java.io.IOException
+	 */
+	public HCatInputFormatBase(String database, String table, Configuration config) throws IOException {
+		super();
+		this.configuration = config;
+		HadoopUtils.mergeHadoopConf(this.configuration);
+
+		this.hCatInputFormat = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, database, table);
+		this.outputSchema = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration);
+
+		// configure output schema of HCatFormat
+		configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
+		// set type information
+		this.resultType = new WritableTypeInfo(DefaultHCatRecord.class);
+	}
+
+	/**
+	 * Specifies the fields which are returned by the InputFormat and their order.
+	 *
+	 * @param fields The fields and their order which are returned by the InputFormat.
+	 * @return This InputFormat with specified return fields.
+	 * @throws java.io.IOException
+	 */
+	public HCatInputFormatBase<T> getFields(String... fields) throws IOException {
+
+		// build output schema
+		ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length);
+		for(String field : fields) {
+			fieldSchemas.add(this.outputSchema.get(field));
+		}
+		this.outputSchema = new HCatSchema(fieldSchemas);
+
+		// update output schema configuration
+		configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
+
+		return this;
+	}
+
+	/**
+	 * Specifies a SQL-like filter condition on the table's partition columns.
+	 * Filter conditions on non-partition columns are invalid.
+	 * A partition filter can significantly reduce the amount of data to be read.
+	 *
+	 * @param filter A SQL-like filter condition on the table's partition columns.
+	 * @return This InputFormat with specified partition filter.
+	 * @throws java.io.IOException
+	 */
+	public HCatInputFormatBase<T> withFilter(String filter) throws IOException {
+
+		// set filter
+		this.hCatInputFormat.setFilter(filter);
+
+		return this;
+	}
+
+	/**
+	 * Specifies that the InputFormat returns Flink tuples instead of
+	 * {@link org.apache.hive.hcatalog.data.HCatRecord}.
+	 *
+	 * Note: Flink tuples might only support a limited number of fields (depending on the API).
+	 *
+	 * @return This InputFormat.
+	 * @throws org.apache.hive.hcatalog.common.HCatException
+	 */
+	public HCatInputFormatBase<T> asFlinkTuples() throws HCatException {
+
+		// build type information
+		int numFields = outputSchema.getFields().size();
+		if(numFields > this.getMaxFlinkTupleSize()) {
+			throw new IllegalArgumentException("Only up to "+this.getMaxFlinkTupleSize()+
+					" fields can be returned as Flink tuples.");
+		}
+
+		TypeInformation[] fieldTypes = new TypeInformation[numFields];
+		fieldNames = new String[numFields];
+		for (String fieldName : outputSchema.getFieldNames()) {
+			HCatFieldSchema field = outputSchema.get(fieldName);
+
+			int fieldPos = outputSchema.getPosition(fieldName);
+			TypeInformation fieldType = getFieldType(field);
+
+			fieldTypes[fieldPos] = fieldType;
+			fieldNames[fieldPos] = fieldName;
+
+		}
+		this.resultType = new TupleTypeInfo(fieldTypes);
+
+		return this;
+	}
+
+	protected abstract int getMaxFlinkTupleSize();
+
+	private TypeInformation getFieldType(HCatFieldSchema fieldSchema) {
+
+		switch(fieldSchema.getType()) {
+			case INT:
+				return BasicTypeInfo.INT_TYPE_INFO;
+			case TINYINT:
+				return BasicTypeInfo.BYTE_TYPE_INFO;
+			case SMALLINT:
+				return BasicTypeInfo.SHORT_TYPE_INFO;
+			case BIGINT:
+				return BasicTypeInfo.LONG_TYPE_INFO;
+			case BOOLEAN:
+				return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+			case FLOAT:
+				return BasicTypeInfo.FLOAT_TYPE_INFO;
+			case DOUBLE:
+				return BasicTypeInfo.DOUBLE_TYPE_INFO;
+			case STRING:
+				return BasicTypeInfo.STRING_TYPE_INFO;
+			case BINARY:
+				return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+			case ARRAY:
+				return new GenericTypeInfo(List.class);
+			case MAP:
+				return new GenericTypeInfo(Map.class);
+			case STRUCT:
+				return new GenericTypeInfo(List.class);
+			default:
+				throw new IllegalArgumentException("Unknown data type \""+fieldSchema.getType()+"\" encountered.");
+		}
+	}
+
+	/**
+	 * Returns the {@link org.apache.hadoop.conf.Configuration} of the HCatInputFormat.
+	 *
+	 * @return The Configuration of the HCatInputFormat.
+	 */
+	public Configuration getConfiguration() {
+		return this.configuration;
+	}
+
+	/**
+	 * Returns the {@link org.apache.hive.hcatalog.data.schema.HCatSchema} of the {@link org.apache.hive.hcatalog.data.HCatRecord}
+	 * returned by this InputFormat.
+	 *
+	 * @return The HCatSchema of the HCatRecords returned by this InputFormat.
+	 */
+	public HCatSchema getOutputSchema() {
+		return this.outputSchema;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  InputFormat
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void configure(org.apache.flink.configuration.Configuration parameters) {
+		// nothing to do
+	}
+
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
+		// no statistics provided at the moment
+		return null;
+	}
+
+	@Override
+	public HadoopInputSplit[] createInputSplits(int minNumSplits)
+			throws IOException {
+		configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
+
+		JobContext jobContext = null;
+		try {
+			jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		List<InputSplit> splits;
+		try {
+			splits = this.hCatInputFormat.getSplits(jobContext);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not get Splits.", e);
+		}
+		HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
+
+		for(int i = 0; i < hadoopInputSplits.length; i++){
+			hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
+		}
+		return hadoopInputSplits;
+	}
+
+	@Override
+	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
+		return new LocatableInputSplitAssigner(inputSplits);
+	}
+
+	@Override
+	public void open(HadoopInputSplit split) throws IOException {
+		TaskAttemptContext context = null;
+		try {
+			context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
+		} catch(Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		try {
+			this.recordReader = this.hCatInputFormat
+					.createRecordReader(split.getHadoopInputSplit(), context);
+			this.recordReader.initialize(split.getHadoopInputSplit(), context);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not create RecordReader.", e);
+		} finally {
+			this.fetched = false;
+		}
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		if(!this.fetched) {
+			fetchNext();
+		}
+		return !this.hasNext;
+	}
+
+	private void fetchNext() throws IOException {
+		try {
+			this.hasNext = this.recordReader.nextKeyValue();
+		} catch (InterruptedException e) {
+			throw new IOException("Could not fetch next KeyValue pair.", e);
+		} finally {
+			this.fetched = true;
+		}
+	}
+
+	@Override
+	public T nextRecord(T record) throws IOException {
+		if(!this.fetched) {
+			// first record
+			fetchNext();
+		}
+		if(!this.hasNext) {
+			return null;
+		}
+		try {
+
+			// get next HCatRecord
+			HCatRecord v = this.recordReader.getCurrentValue();
+			this.fetched = false;
+
+			if(this.fieldNames.length > 0) {
+				// return as Flink tuple
+				return this.buildFlinkTuple(record, v);
+
+			} else {
+				// return as HCatRecord
+				return (T)v;
+			}
+
+		} catch (InterruptedException e) {
+			throw new IOException("Could not get next record.", e);
+		}
+	}
+
+	protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException;
+
+	@Override
+	public void close() throws IOException {
+		this.recordReader.close();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Custom de/serialization methods
+	// --------------------------------------------------------------------------------------------
+
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeInt(this.fieldNames.length);
+		for(String fieldName : this.fieldNames) {
+			out.writeUTF(fieldName);
+		}
+		this.configuration.write(out);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		this.fieldNames = new String[in.readInt()];
+		for(int i=0; i<this.fieldNames.length; i++) {
+			this.fieldNames[i] = in.readUTF();
+		}
+
+		Configuration configuration = new Configuration();
+		configuration.readFields(in);
+
+		if(this.configuration == null) {
+			this.configuration = configuration;
+		}
+
+		this.hCatInputFormat = new org.apache.hive.hcatalog.mapreduce.HCatInputFormat();
+		this.outputSchema = (HCatSchema)HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema"));
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Result type business
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return this.resultType;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
new file mode 100644
index 0000000..46f3cd5
--- /dev/null
+++ b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog.java;
+
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.hcatalog.HCatInputFormatBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.HCatRecord;
+
+/**
+ * A InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
+ * Flink tuples support only up to 25 fields.
+ *
+ * @param <T>
+ */
+public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
+	private static final long serialVersionUID = 1L;
+
+	public HCatInputFormat() {}
+
+	public HCatInputFormat(String database, String table) throws Exception {
+		super(database, table);
+	}
+
+	public HCatInputFormat(String database, String table, Configuration config) throws Exception {
+		super(database, table, config);
+	}
+
+
+	@Override
+	protected int getMaxFlinkTupleSize() {
+		return 25;
+	}
+
+	@Override
+	protected T buildFlinkTuple(T t, HCatRecord record) throws HCatException {
+
+		Tuple tuple = (Tuple)t;
+
+		// Extract all fields from HCatRecord
+		for(int i=0; i < this.fieldNames.length; i++) {
+
+			// get field value
+			Object o = record.get(this.fieldNames[i], this.outputSchema);
+
+			// Set field value in Flink tuple.
+			// Partition columns are returned as String and
+			//   need to be converted to original type.
+			switch(this.outputSchema.get(i).getType()) {
+				case INT:
+					if(o instanceof String) {
+						tuple.setField(Integer.parseInt((String) o), i);
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case TINYINT:
+					if(o instanceof String) {
+						tuple.setField(Byte.parseByte((String) o), i);
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case SMALLINT:
+					if(o instanceof String) {
+						tuple.setField(Short.parseShort((String) o), i);
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case BIGINT:
+					if(o instanceof String) {
+						tuple.setField(Long.parseLong((String) o), i);
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case BOOLEAN:
+					if(o instanceof String) {
+						tuple.setField(Boolean.parseBoolean((String) o), i);
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case FLOAT:
+					if(o instanceof String) {
+						tuple.setField(Float.parseFloat((String) o), i);
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case DOUBLE:
+					if(o instanceof String) {
+						tuple.setField(Double.parseDouble((String) o), i);
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case STRING:
+					tuple.setField(o, i);
+					break;
+				case BINARY:
+					if(o instanceof String) {
+						throw new RuntimeException("Cannot handle partition keys of type BINARY.");
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case ARRAY:
+					if(o instanceof String) {
+						throw new RuntimeException("Cannot handle partition keys of type ARRAY.");
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case MAP:
+					if(o instanceof String) {
+						throw new RuntimeException("Cannot handle partition keys of type MAP.");
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case STRUCT:
+					if(o instanceof String) {
+						throw new RuntimeException("Cannot handle partition keys of type STRUCT.");
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				default:
+					throw new RuntimeException("Invalid Type");
+			}
+		}
+
+		return (T)tuple;
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
new file mode 100644
index 0000000..0299ee1
--- /dev/null
+++ b/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog.scala
+
+import org.apache.flink.configuration
+import org.apache.flink.hcatalog.HCatInputFormatBase
+import org.apache.hadoop.conf.Configuration
+import org.apache.hive.hcatalog.data.HCatRecord
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema
+
+/**
+ * A InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as [[HCatRecord]] or Scala tuples.
+ * Scala tuples support only up to 22 fields.
+ *
+ */
+class HCatInputFormat[T](
+                        database: String,
+                        table: String,
+                        config: Configuration
+                          ) extends HCatInputFormatBase[T](database, table, config) {
+
+  def this(database: String, table: String) {
+    this(database, table, new Configuration)
+  }
+
+  var vals: Array[Any] = Array[Any]()
+
+  override def configure(parameters: configuration.Configuration): Unit = {
+    super.configure(parameters)
+    vals = new Array[Any](fieldNames.length)
+  }
+
+  override protected def getMaxFlinkTupleSize: Int = 22
+
+  override protected def buildFlinkTuple(t: T, record: HCatRecord): T = {
+
+    // Extract all fields from HCatRecord
+    var i: Int = 0
+    while (i < this.fieldNames.length) {
+
+        val o: AnyRef = record.get(this.fieldNames(i), this.outputSchema)
+
+        // partition columns are returned as String
+        //   Check and convert to actual type.
+        this.outputSchema.get(i).getType match {
+          case HCatFieldSchema.Type.INT =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toInt
+            }
+            else {
+              vals(i) = o.asInstanceOf[Int]
+            }
+          case HCatFieldSchema.Type.TINYINT =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toInt.toByte
+            }
+            else {
+              vals(i) = o.asInstanceOf[Byte]
+            }
+          case HCatFieldSchema.Type.SMALLINT =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toInt.toShort
+            }
+            else {
+              vals(i) = o.asInstanceOf[Short]
+            }
+          case HCatFieldSchema.Type.BIGINT =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toLong
+            }
+            else {
+              vals(i) = o.asInstanceOf[Long]
+            }
+          case HCatFieldSchema.Type.BOOLEAN =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toBoolean
+            }
+            else {
+              vals(i) = o.asInstanceOf[Boolean]
+            }
+          case HCatFieldSchema.Type.FLOAT =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toFloat
+            }
+            else {
+              vals(i) = o.asInstanceOf[Float]
+            }
+          case HCatFieldSchema.Type.DOUBLE =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toDouble
+            }
+            else {
+              vals(i) = o.asInstanceOf[Double]
+            }
+          case HCatFieldSchema.Type.STRING =>
+            vals(i) = o
+          case HCatFieldSchema.Type.BINARY =>
+            if (o.isInstanceOf[String]) {
+              throw new RuntimeException("Cannot handle partition keys of type BINARY.")
+            }
+            else {
+              vals(i) = o.asInstanceOf[Array[Byte]]
+            }
+          case HCatFieldSchema.Type.ARRAY =>
+            if (o.isInstanceOf[String]) {
+              throw new RuntimeException("Cannot handle partition keys of type ARRAY.")
+            }
+            else {
+              vals(i) = o.asInstanceOf[List[Object]]
+            }
+          case HCatFieldSchema.Type.MAP =>
+            if (o.isInstanceOf[String]) {
+              throw new RuntimeException("Cannot handle partition keys of type MAP.")
+            }
+            else {
+              vals(i) = o.asInstanceOf[Map[Object, Object]]
+            }
+          case HCatFieldSchema.Type.STRUCT =>
+            if (o.isInstanceOf[String]) {
+              throw new RuntimeException("Cannot handle partition keys of type STRUCT.")
+            }
+            else {
+              vals(i) = o.asInstanceOf[List[Object]]
+            }
+          case _ =>
+            throw new RuntimeException("Invalid type " + this.outputSchema.get(i).getType +
+              " encountered.")
+        }
+
+        i += 1
+      }
+    createScalaTuple(vals)
+  }
+
+  private def createScalaTuple(vals: Array[Any]): T = {
+
+    this.fieldNames.length match {
+      case 1 =>
+        new Tuple1(vals(0)).asInstanceOf[T]
+      case 2 =>
+        new Tuple2(vals(0), vals(1)).asInstanceOf[T]
+      case 3 =>
+        new Tuple3(vals(0), vals(1), vals(2)).asInstanceOf[T]
+      case 4 =>
+        new Tuple4(vals(0), vals(1), vals(2), vals(3)).asInstanceOf[T]
+      case 5 =>
+        new Tuple5(vals(0), vals(1), vals(2), vals(3), vals(4)).asInstanceOf[T]
+      case 6 =>
+        new Tuple6(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5)).asInstanceOf[T]
+      case 7 =>
+        new Tuple7(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6)).asInstanceOf[T]
+      case 8 =>
+        new Tuple8(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7))
+          .asInstanceOf[T]
+      case 9 =>
+        new Tuple9(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8)).asInstanceOf[T]
+      case 10 =>
+        new Tuple10(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9)).asInstanceOf[T]
+      case 11 =>
+        new Tuple11(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10)).asInstanceOf[T]
+      case 12 =>
+        new Tuple12(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11)).asInstanceOf[T]
+      case 13 =>
+        new Tuple13(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12)).asInstanceOf[T]
+      case 14 =>
+        new Tuple14(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13)).asInstanceOf[T]
+      case 15 =>
+        new Tuple15(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14)).asInstanceOf[T]
+      case 16 =>
+        new Tuple16(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15))
+          .asInstanceOf[T]
+      case 17 =>
+        new Tuple17(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16)).asInstanceOf[T]
+      case 18 =>
+        new Tuple18(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17)).asInstanceOf[T]
+      case 19 =>
+        new Tuple19(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18)).asInstanceOf[T]
+      case 20 =>
+        new Tuple20(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18), vals(19)).asInstanceOf[T]
+      case 21 =>
+        new Tuple21(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18), vals(19), vals(20)).asInstanceOf[T]
+      case 22 =>
+        new Tuple22(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18), vals(19), vals(20), vals(21)).asInstanceOf[T]
+      case _ =>
+        throw new RuntimeException("Only up to 22 fields supported for Scala Tuples.")
+
+  }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml
new file mode 100644
index 0000000..be42648
--- /dev/null
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	
+	<modelVersion>4.0.0</modelVersion>
+	
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.2-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-jdbc</artifactId>
+	<name>flink-jdbc</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.derby</groupId>
+			<artifactId>derby</artifactId>
+			<version>10.10.1.1</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
new file mode 100644
index 0000000..b4246f5
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * InputFormat to read data from a database and generate Rows.
+ * The InputFormat has to be configured using the supplied InputFormatBuilder.
+ * A valid RowTypeInfo must be properly configured in the builder, e.g.: </br>
+ *
+ * <pre><code>
+ * TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
+ *		BasicTypeInfo.INT_TYPE_INFO,
+ *		BasicTypeInfo.STRING_TYPE_INFO,
+ *		BasicTypeInfo.STRING_TYPE_INFO,
+ *		BasicTypeInfo.DOUBLE_TYPE_INFO,
+ *		BasicTypeInfo.INT_TYPE_INFO
+ *	};
+ *
+ * RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
+ *
+ * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ *				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+ *				.setDBUrl("jdbc:derby:memory:ebookshop")
+ *				.setQuery("select * from books")
+ *				.setRowTypeInfo(rowTypeInfo)
+ *				.finish();
+ * </code></pre>
+ *
+ * In order to query the JDBC source in parallel, you need to provide a
+ * parameterized query template (i.e. a valid {@link PreparedStatement}) and
+ * a {@link ParameterValuesProvider} which provides binding values for the
+ * query parameters. E.g.:</br>
+ *
+ * <pre><code>
+ *
+ * Serializable[][] queryParameters = new String[2][1];
+ * queryParameters[0] = new String[]{"Kumar"};
+ * queryParameters[1] = new String[]{"Tan Ah Teck"};
+ *
+ * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ *				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+ *				.setDBUrl("jdbc:derby:memory:ebookshop")
+ *				.setQuery("select * from books WHERE author = ?")
+ *				.setRowTypeInfo(rowTypeInfo)
+ *				.setParametersProvider(new GenericParameterValuesProvider(queryParameters))
+ *				.finish();
+ * </code></pre>
+ *
+ * @see Row
+ * @see ParameterValuesProvider
+ * @see PreparedStatement
+ * @see DriverManager
+ */
+public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable {
+
+	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
+
+	private String username;
+	private String password;
+	private String drivername;
+	private String dbURL;
+	private String queryTemplate;
+	private int resultSetType;
+	private int resultSetConcurrency;
+	private RowTypeInfo rowTypeInfo;
+
+	private transient Connection dbConn;
+	private transient PreparedStatement statement;
+	private transient ResultSet resultSet;
+
+	private boolean hasNext;
+	private Object[][] parameterValues;
+
+	public JDBCInputFormat() {
+	}
+
+	@Override
+	public RowTypeInfo getProducedType() {
+		return rowTypeInfo;
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+		//do nothing here
+	}
+
+	@Override
+	public void openInputFormat() {
+		//called once per inputFormat (on open)
+		try {
+			Class.forName(drivername);
+			if (username == null) {
+				dbConn = DriverManager.getConnection(dbURL);
+			} else {
+				dbConn = DriverManager.getConnection(dbURL, username, password);
+			}
+			statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
+		} catch (SQLException se) {
+			throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
+		} catch (ClassNotFoundException cnfe) {
+			throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
+		}
+	}
+
+	@Override
+	public void closeInputFormat() {
+		//called once per inputFormat (on close)
+		try {
+			if(statement != null) {
+				statement.close();
+			}
+		} catch (SQLException se) {
+			LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage());
+		} finally {
+			statement = null;
+		}
+
+		try {
+			if(dbConn != null) {
+				dbConn.close();
+			}
+		} catch (SQLException se) {
+			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
+		} finally {
+			dbConn = null;
+		}
+
+		parameterValues = null;
+	}
+
+	/**
+	 * Connects to the source database and executes the query in a <b>parallel
+	 * fashion</b> if
+	 * this {@link InputFormat} is built using a parameterized query (i.e. using
+	 * a {@link PreparedStatement})
+	 * and a proper {@link ParameterValuesProvider}, in a <b>non-parallel
+	 * fashion</b> otherwise.
+	 *
+	 * @param inputSplit which is ignored if this InputFormat is executed as a
+	 *        non-parallel source,
+	 *        a "hook" to the query parameters otherwise (using its
+	 *        <i>splitNumber</i>)
+	 * @throws IOException if there's an error during the execution of the query
+	 */
+	@Override
+	public void open(InputSplit inputSplit) throws IOException {
+		try {
+			if (inputSplit != null && parameterValues != null) {
+				for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
+					Object param = parameterValues[inputSplit.getSplitNumber()][i];
+					if (param instanceof String) {
+						statement.setString(i + 1, (String) param);
+					} else if (param instanceof Long) {
+						statement.setLong(i + 1, (Long) param);
+					} else if (param instanceof Integer) {
+						statement.setInt(i + 1, (Integer) param);
+					} else if (param instanceof Double) {
+						statement.setDouble(i + 1, (Double) param);
+					} else if (param instanceof Boolean) {
+						statement.setBoolean(i + 1, (Boolean) param);
+					} else if (param instanceof Float) {
+						statement.setFloat(i + 1, (Float) param);
+					} else if (param instanceof BigDecimal) {
+						statement.setBigDecimal(i + 1, (BigDecimal) param);
+					} else if (param instanceof Byte) {
+						statement.setByte(i + 1, (Byte) param);
+					} else if (param instanceof Short) {
+						statement.setShort(i + 1, (Short) param);
+					} else if (param instanceof Date) {
+						statement.setDate(i + 1, (Date) param);
+					} else if (param instanceof Time) {
+						statement.setTime(i + 1, (Time) param);
+					} else if (param instanceof Timestamp) {
+						statement.setTimestamp(i + 1, (Timestamp) param);
+					} else if (param instanceof Array) {
+						statement.setArray(i + 1, (Array) param);
+					} else {
+						//extends with other types if needed
+						throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet)." );
+					}
+				}
+				if (LOG.isDebugEnabled()) {
+					LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
+				}
+			}
+			resultSet = statement.executeQuery();
+			hasNext = resultSet.next();
+		} catch (SQLException se) {
+			throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
+		}
+	}
+
+	/**
+	 * Closes all resources used.
+	 *
+	 * @throws IOException Indicates that a resource could not be closed.
+	 */
+	@Override
+	public void close() throws IOException {
+		if(resultSet == null) {
+			return;
+		}
+		try {
+			resultSet.close();
+		} catch (SQLException se) {
+			LOG.info("Inputformat ResultSet couldn't be closed - " + se.getMessage());
+		}
+	}
+
+	/**
+	 * Checks whether all data has been read.
+	 *
+	 * @return boolean value indication whether all data has been read.
+	 * @throws IOException
+	 */
+	@Override
+	public boolean reachedEnd() throws IOException {
+		return !hasNext;
+	}
+
+	/**
+	 * Stores the next resultSet row in a tuple
+	 *
+	 * @param row row to be reused.
+	 * @return row containing next {@link Row}
+	 * @throws java.io.IOException
+	 */
+	@Override
+	public Row nextRecord(Row row) throws IOException {
+		try {
+			if (!hasNext) {
+				return null;
+			}
+			for (int pos = 0; pos < row.productArity(); pos++) {
+				row.setField(pos, resultSet.getObject(pos + 1));
+			}
+			//update hasNext after we've read the record
+			hasNext = resultSet.next();
+			return row;
+		} catch (SQLException se) {
+			throw new IOException("Couldn't read data - " + se.getMessage(), se);
+		} catch (NullPointerException npe) {
+			throw new IOException("Couldn't access resultSet", npe);
+		}
+	}
+
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+		return cachedStatistics;
+	}
+
+	@Override
+	public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
+		if (parameterValues == null) {
+			return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
+		}
+		GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length];
+		for (int i = 0; i < ret.length; i++) {
+			ret[i] = new GenericInputSplit(i, ret.length);
+		}
+		return ret;
+	}
+
+	@Override
+	public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
+		return new DefaultInputSplitAssigner(inputSplits);
+	}
+
+	/**
+	 * A builder used to set parameters to the output format's configuration in a fluent way.
+	 * @return builder
+	 */
+	public static JDBCInputFormatBuilder buildJDBCInputFormat() {
+		return new JDBCInputFormatBuilder();
+	}
+
+	public static class JDBCInputFormatBuilder {
+		private final JDBCInputFormat format;
+
+		public JDBCInputFormatBuilder() {
+			this.format = new JDBCInputFormat();
+			//using TYPE_FORWARD_ONLY for high performance reads
+			this.format.resultSetType = ResultSet.TYPE_FORWARD_ONLY;
+			this.format.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
+		}
+
+		public JDBCInputFormatBuilder setUsername(String username) {
+			format.username = username;
+			return this;
+		}
+
+		public JDBCInputFormatBuilder setPassword(String password) {
+			format.password = password;
+			return this;
+		}
+
+		public JDBCInputFormatBuilder setDrivername(String drivername) {
+			format.drivername = drivername;
+			return this;
+		}
+
+		public JDBCInputFormatBuilder setDBUrl(String dbURL) {
+			format.dbURL = dbURL;
+			return this;
+		}
+
+		public JDBCInputFormatBuilder setQuery(String query) {
+			format.queryTemplate = query;
+			return this;
+		}
+
+		public JDBCInputFormatBuilder setResultSetType(int resultSetType) {
+			format.resultSetType = resultSetType;
+			return this;
+		}
+
+		public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency) {
+			format.resultSetConcurrency = resultSetConcurrency;
+			return this;
+		}
+
+		public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) {
+			format.parameterValues = parameterValuesProvider.getParameterValues();
+			return this;
+		}
+
+		public JDBCInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo) {
+			format.rowTypeInfo = rowTypeInfo;
+			return this;
+		}
+
+		public JDBCInputFormat finish() {
+			if (format.username == null) {
+				LOG.info("Username was not supplied separately.");
+			}
+			if (format.password == null) {
+				LOG.info("Password was not supplied separately.");
+			}
+			if (format.dbURL == null) {
+				throw new IllegalArgumentException("No database URL supplied");
+			}
+			if (format.queryTemplate == null) {
+				throw new IllegalArgumentException("No query supplied");
+			}
+			if (format.drivername == null) {
+				throw new IllegalArgumentException("No driver supplied");
+			}
+			if (format.rowTypeInfo == null) {
+				throw new IllegalArgumentException("No " + RowTypeInfo.class.getSimpleName() + " supplied");
+			}
+			if (format.parameterValues == null) {
+				LOG.debug("No input splitting configured (data will be read with parallelism 1).");
+			}
+			return format;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
new file mode 100644
index 0000000..da4b1ad
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OutputFormat to write tuples into a database.
+ * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
+ * 
+ * @see Tuple
+ * @see DriverManager
+ */
+public class JDBCOutputFormat extends RichOutputFormat<Row> {
+	private static final long serialVersionUID = 1L;
+	
+	private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
+	
+	private String username;
+	private String password;
+	private String drivername;
+	private String dbURL;
+	private String query;
+	private int batchInterval = 5000;
+	
+	private Connection dbConn;
+	private PreparedStatement upload;
+	
+	private int batchCount = 0;
+	
+	public int[] typesArray;
+	
+	public JDBCOutputFormat() {
+	}
+	
+	@Override
+	public void configure(Configuration parameters) {
+	}
+	
+	/**
+	 * Connects to the target database and initializes the prepared statement.
+	 *
+	 * @param taskNumber The number of the parallel instance.
+	 * @throws IOException Thrown, if the output could not be opened due to an
+	 * I/O problem.
+	 */
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		try {
+			establishConnection();
+			upload = dbConn.prepareStatement(query);
+		} catch (SQLException sqe) {
+			throw new IllegalArgumentException("open() failed.", sqe);
+		} catch (ClassNotFoundException cnfe) {
+			throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
+		}
+	}
+	
+	private void establishConnection() throws SQLException, ClassNotFoundException {
+		Class.forName(drivername);
+		if (username == null) {
+			dbConn = DriverManager.getConnection(dbURL);
+		} else {
+			dbConn = DriverManager.getConnection(dbURL, username, password);
+		}
+	}
+	
+	/**
+	 * Adds a record to the prepared statement.
+	 * <p>
+	 * When this method is called, the output format is guaranteed to be opened.
+	 * </p>
+	 * 
+	 * WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
+	 * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
+	 *
+	 * @param row The records to add to the output.
+	 * @see PreparedStatement
+	 * @throws IOException Thrown, if the records could not be added due to an I/O problem.
+	 */
+	@Override
+	public void writeRecord(Row row) throws IOException {
+
+		if (typesArray != null && typesArray.length > 0 && typesArray.length != row.productArity()) {
+			LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
+		} 
+		try {
+
+			if (typesArray == null ) {
+				// no types provided
+				for (int index = 0; index < row.productArity(); index++) {
+					LOG.warn("Unknown column type for column %s. Best effort approach to set its value: %s.", index + 1, row.productElement(index));
+					upload.setObject(index + 1, row.productElement(index));
+				}
+			} else {
+				// types provided
+				for (int index = 0; index < row.productArity(); index++) {
+
+					if (row.productElement(index) == null) {
+						upload.setNull(index + 1, typesArray[index]);
+					} else {
+						// casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
+						switch (typesArray[index]) {
+							case java.sql.Types.NULL:
+								upload.setNull(index + 1, typesArray[index]);
+								break;
+							case java.sql.Types.BOOLEAN:
+							case java.sql.Types.BIT:
+								upload.setBoolean(index + 1, (boolean) row.productElement(index));
+								break;
+							case java.sql.Types.CHAR:
+							case java.sql.Types.NCHAR:
+							case java.sql.Types.VARCHAR:
+							case java.sql.Types.LONGVARCHAR:
+							case java.sql.Types.LONGNVARCHAR:
+								upload.setString(index + 1, (String) row.productElement(index));
+								break;
+							case java.sql.Types.TINYINT:
+								upload.setByte(index + 1, (byte) row.productElement(index));
+								break;
+							case java.sql.Types.SMALLINT:
+								upload.setShort(index + 1, (short) row.productElement(index));
+								break;
+							case java.sql.Types.INTEGER:
+								upload.setInt(index + 1, (int) row.productElement(index));
+								break;
+							case java.sql.Types.BIGINT:
+								upload.setLong(index + 1, (long) row.productElement(index));
+								break;
+							case java.sql.Types.REAL:
+								upload.setFloat(index + 1, (float) row.productElement(index));
+								break;
+							case java.sql.Types.FLOAT:
+							case java.sql.Types.DOUBLE:
+								upload.setDouble(index + 1, (double) row.productElement(index));
+								break;
+							case java.sql.Types.DECIMAL:
+							case java.sql.Types.NUMERIC:
+								upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.productElement(index));
+								break;
+							case java.sql.Types.DATE:
+								upload.setDate(index + 1, (java.sql.Date) row.productElement(index));
+								break;
+							case java.sql.Types.TIME:
+								upload.setTime(index + 1, (java.sql.Time) row.productElement(index));
+								break;
+							case java.sql.Types.TIMESTAMP:
+								upload.setTimestamp(index + 1, (java.sql.Timestamp) row.productElement(index));
+								break;
+							case java.sql.Types.BINARY:
+							case java.sql.Types.VARBINARY:
+							case java.sql.Types.LONGVARBINARY:
+								upload.setBytes(index + 1, (byte[]) row.productElement(index));
+								break;
+							default:
+								upload.setObject(index + 1, row.productElement(index));
+								LOG.warn("Unmanaged sql type (%s) for column %s. Best effort approach to set its value: %s.",
+									typesArray[index], index + 1, row.productElement(index));
+								// case java.sql.Types.SQLXML
+								// case java.sql.Types.ARRAY:
+								// case java.sql.Types.JAVA_OBJECT:
+								// case java.sql.Types.BLOB:
+								// case java.sql.Types.CLOB:
+								// case java.sql.Types.NCLOB:
+								// case java.sql.Types.DATALINK:
+								// case java.sql.Types.DISTINCT:
+								// case java.sql.Types.OTHER:
+								// case java.sql.Types.REF:
+								// case java.sql.Types.ROWID:
+								// case java.sql.Types.STRUC
+						}
+					}
+				}
+			}
+			upload.addBatch();
+			batchCount++;
+			if (batchCount >= batchInterval) {
+				upload.executeBatch();
+				batchCount = 0;
+			}
+		} catch (SQLException | IllegalArgumentException e) {
+			throw new IllegalArgumentException("writeRecord() failed", e);
+		}
+	}
+	
+	/**
+	 * Executes prepared statement and closes all resources of this instance.
+	 *
+	 * @throws IOException Thrown, if the input could not be closed properly.
+	 */
+	@Override
+	public void close() throws IOException {
+		try {
+			if (upload != null) {
+				upload.executeBatch();
+				upload.close();
+			}
+		} catch (SQLException se) {
+			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
+		} finally {
+			upload = null;
+			batchCount = 0;
+		}
+		
+		try {
+			if (dbConn != null) {
+				dbConn.close();
+			}
+		} catch (SQLException se) {
+			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
+		} finally {
+			dbConn = null;
+		}
+	}
+	
+	public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
+		return new JDBCOutputFormatBuilder();
+	}
+	
+	public static class JDBCOutputFormatBuilder {
+		private final JDBCOutputFormat format;
+		
+		protected JDBCOutputFormatBuilder() {
+			this.format = new JDBCOutputFormat();
+		}
+		
+		public JDBCOutputFormatBuilder setUsername(String username) {
+			format.username = username;
+			return this;
+		}
+		
+		public JDBCOutputFormatBuilder setPassword(String password) {
+			format.password = password;
+			return this;
+		}
+		
+		public JDBCOutputFormatBuilder setDrivername(String drivername) {
+			format.drivername = drivername;
+			return this;
+		}
+		
+		public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
+			format.dbURL = dbURL;
+			return this;
+		}
+		
+		public JDBCOutputFormatBuilder setQuery(String query) {
+			format.query = query;
+			return this;
+		}
+		
+		public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
+			format.batchInterval = batchInterval;
+			return this;
+		}
+		
+		public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
+			format.typesArray = typesArray;
+			return this;
+		}
+		
+		/**
+		 * Finalizes the configuration and checks validity.
+		 * 
+		 * @return Configured JDBCOutputFormat
+		 */
+		public JDBCOutputFormat finish() {
+			if (format.username == null) {
+				LOG.info("Username was not supplied separately.");
+			}
+			if (format.password == null) {
+				LOG.info("Password was not supplied separately.");
+			}
+			if (format.dbURL == null) {
+				throw new IllegalArgumentException("No dababase URL supplied.");
+			}
+			if (format.query == null) {
+				throw new IllegalArgumentException("No query suplied");
+			}
+			if (format.drivername == null) {
+				throw new IllegalArgumentException("No driver supplied");
+			}
+			
+			return format;
+		}
+	}
+	
+}