Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:37 UTC

[47/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
deleted file mode 100644
index 35b0a7c..0000000
--- a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * {@link InputFormat} subclass that wraps the access for HTables.
- */
-public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class);
-
-	/** helper variable to decide whether the input is exhausted or not */
-	private boolean endReached = false;
-
-	protected transient HTable table = null;
-	protected transient Scan scan = null;
-
-	/** HBase iterator wrapper */
-	private ResultScanner resultScanner = null;
-
-	private byte[] lastRow;
-	private int scannedRows;
-
-	/**
-	 * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
-	 * @return The appropriate instance of Scan for this use case.
-	 */
-	protected abstract Scan getScanner();
-
-	/**
-	 * Returns the name of the table to read from.
-	 * Only a single table name is possible per instance of a TableInputFormat derivative.
-	 * @return The name of the table.
-	 */
-	protected abstract String getTableName();
-
-	/**
-	 * The output from HBase is always an instance of {@link Result}.
-	 * This method copies the data from the Result instance into the required {@link Tuple}.
-	 * @param r The Result instance from HBase that needs to be converted
-	 * @return The appropriate instance of {@link Tuple} that contains the needed information.
-	 */
-	protected abstract T mapResultToTuple(Result r);
-
-	/**
-	 * Creates a {@link Scan} object and opens the {@link HTable} connection.
-	 * These are opened here because they are needed in the createInputSplits method,
-	 * which is called before the openInputFormat method.
-	 * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
-	 *
-	 * @param parameters The configuration that is to be used
-	 * @see Configuration
-	 */
-	@Override
-	public void configure(Configuration parameters) {
-		table = createTable();
-		if (table != null) {
-			scan = getScanner();
-		}
-	}
-
-	/**
-	 * Creates an {@link HTable} instance and sets it on this format.
-	 */
-	private HTable createTable() {
-		LOG.info("Initializing HBaseConfiguration");
-		//use files found in the classpath
-		org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
-
-		try {
-			return new HTable(hConf, getTableName());
-		} catch (Exception e) {
-			LOG.error("Error instantiating a new HTable instance", e);
-		}
-		return null;
-	}
-
-	@Override
-	public void open(TableInputSplit split) throws IOException {
-		if (table == null) {
-			throw new IOException("The HBase table has not been opened!");
-		}
-		if (scan == null) {
-			throw new IOException("getScanner returned null");
-		}
-		if (split == null) {
-			throw new IOException("Input split is null!");
-		}
-
-		logSplitInfo("opening", split);
-		scan.setStartRow(split.getStartRow());
-		lastRow = split.getEndRow();
-		scan.setStopRow(lastRow);
-
-		resultScanner = table.getScanner(scan);
-		endReached = false;
-		scannedRows = 0;
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return endReached;
-	}
-
-	@Override
-	public T nextRecord(T reuse) throws IOException {
-		if (resultScanner == null) {
-			throw new IOException("No table result scanner provided!");
-		}
-		try {
-			Result res = resultScanner.next();
-			if (res != null) {
-				scannedRows++;
-				lastRow = res.getRow();
-				return mapResultToTuple(res);
-			}
-		} catch (Exception e) {
-			resultScanner.close();
-			//workaround for timeout on scan
-			LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
-			scan.setStartRow(lastRow);
-			resultScanner = table.getScanner(scan);
-			Result res = resultScanner.next();
-			if (res != null) {
-				scannedRows++;
-				lastRow = res.getRow();
-				return mapResultToTuple(res);
-			}
-		}
-
-		endReached = true;
-		return null;
-	}
-
-	@Override
-	public void close() throws IOException {
-		LOG.info("Closing split (scanned {} rows)", scannedRows);
-		lastRow = null;
-		try {
-			if (resultScanner != null) {
-				resultScanner.close();
-			}
-		} finally {
-			resultScanner = null;
-		}
-	}
-
-	@Override
-	public void closeInputFormat() throws IOException {
-		try {
-			if (table != null) {
-				table.close();
-			}
-		} finally {
-			table = null;
-		}
-	}
-
-	@Override
-	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
-		if (table == null) {
-			throw new IOException("The HBase table has not been opened!");
-		}
-		if (scan == null) {
-			throw new IOException("getScanner returned null");
-		}
-
-		//Gets the starting and ending row keys for every region in the currently open table
-		final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
-		if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
-			throw new IOException("Expecting at least one region.");
-		}
-		final byte[] startRow = scan.getStartRow();
-		final byte[] stopRow = scan.getStopRow();
-		final boolean scanWithNoLowerBound = startRow.length == 0;
-		final boolean scanWithNoUpperBound = stopRow.length == 0;
-
-		final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(minNumSplits);
-		for (int i = 0; i < keys.getFirst().length; i++) {
-			final byte[] startKey = keys.getFirst()[i];
-			final byte[] endKey = keys.getSecond()[i];
-			final String regionLocation = table.getRegionLocation(startKey, false).getHostnamePort();
-			//Test if the given region is to be included in the InputSplit while splitting the regions of a table
-			if (!includeRegionInSplit(startKey, endKey)) {
-				continue;
-			}
-			//The region server hosting this region is used as the locality hint for the split
-			final String[] hosts = new String[]{regionLocation};
-
-			// determine if the region contains keys used by the scan
-			boolean isLastRegion = endKey.length == 0;
-			if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
-				(scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
-
-				final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
-				final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
-					&& !isLastRegion ? endKey : stopRow;
-				int id = splits.size();
-				final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop);
-				splits.add(split);
-			}
-		}
-		LOG.info("Created " + splits.size() + " splits");
-		for (TableInputSplit split : splits) {
-			logSplitInfo("created", split);
-		}
-		return splits.toArray(new TableInputSplit[0]);
-	}
-
-	private void logSplitInfo(String action, TableInputSplit split) {
-		int splitId = split.getSplitNumber();
-		String splitStart = Bytes.toString(split.getStartRow());
-		String splitEnd = Bytes.toString(split.getEndRow());
-		String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
-		String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
-		String[] hostnames = split.getHostnames();
-		LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey);
-	}
-
-	/**
-	 * Tests whether the given region is to be included in the InputSplit while splitting the regions of a table.
-	 * <p>
-	 * This optimization is effective when there is a specific reason to exclude an entire region from the M-R job
-	 * (and hence not contribute to the InputSplit), given the start and end keys of that region. <br>
-	 * It is useful when we need to remember the last-processed top record and continuously revisit the
-	 * [last, current) interval for M-R processing. Besides reducing the number of InputSplits, this also reduces the
-	 * load on the region server, due to the ordering of the keys. <br>
-	 * <br>
-	 * Note: It is possible that <code>endKey.length() == 0</code> for the last (most recent) region. <br>
-	 * Override this method if you want to exclude regions from M-R in bulk. By default, no region is excluded
-	 * (i.e. all regions are included).
-	 *
-	 * @param startKey Start key of the region
-	 * @param endKey   End key of the region
-	 * @return true, if this region needs to be included as part of the input (default).
-	 */
-	protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
-		return true;
-	}
-
-	@Override
-	public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits) {
-		return new LocatableInputSplitAssigner(inputSplits);
-	}
-
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-		return null;
-	}
-
-}

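For context, the lifecycle that the configure() Javadoc above describes can be condensed into the following sketch. It only illustrates the call order that the runtime goes through; the class name, the table "some-table" and the column "cf:q" are invented, and an hbase-site.xml on the classpath pointing at a reachable HBase instance is assumed. In a real job the format is simply handed to ExecutionEnvironment#createInput, as the examples further below do.

import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.addons.hbase.TableInputSplit;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class TableInputFormatLifecycleSketch {

	public static void main(String[] args) throws Exception {
		TableInputFormat<Tuple1<String>> format = new TableInputFormat<Tuple1<String>>() {
			@Override
			protected Scan getScanner() {
				// hypothetical column family "cf" and qualifier "q"
				return new Scan().addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));
			}

			@Override
			protected String getTableName() {
				return "some-table"; // hypothetical table name
			}

			@Override
			protected Tuple1<String> mapResultToTuple(Result r) {
				return new Tuple1<>(Bytes.toString(r.getRow()));
			}
		};

		// configure() creates the HTable and the Scan because createInputSplits()
		// runs before openInputFormat(), as explained in its Javadoc.
		format.configure(new Configuration());
		TableInputSplit[] splits = format.createInputSplits(1);
		for (TableInputSplit split : splits) {
			format.open(split);                 // positions a ResultScanner on the split's key range
			while (!format.reachedEnd()) {
				Tuple1<String> record = format.nextRecord(new Tuple1<String>());
				if (record == null) {
					break;                      // nextRecord() returns null once the scanner is exhausted
				}
				System.out.println(record.f0);
			}
			format.close();                     // closes the ResultScanner
		}
		format.closeInputFormat();              // closes the HTable
	}
}

Note that nextRecord() internally retries with a fresh scanner starting from the last seen row when a scan times out, so the calling loop does not have to handle that case.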
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
deleted file mode 100644
index 75f0b9b..0000000
--- a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.core.io.LocatableInputSplit;
-
-/**
- * This class implements an input split for HBase. Each table input split corresponds to a key range (low, high). All
- * references to row below refer to the key of the row.
- */
-public class TableInputSplit extends LocatableInputSplit {
-
-	private static final long serialVersionUID = 1L;
-
-	/** The name of the table to retrieve data from */
-	private final byte[] tableName;
-
-	/** The start row of the split. */
-	private final byte[] startRow;
-
-	/** The end row of the split. */
-	private final byte[] endRow;
-
-	/**
-	 * Creates a new table input split
-	 * 
-	 * @param splitNumber
-	 *        the number of the input split
-	 * @param hostnames
-	 *        the names of the hosts storing the data the input split refers to
-	 * @param tableName
-	 *        the name of the table to retrieve data from
-	 * @param startRow
-	 *        the start row of the split
-	 * @param endRow
-	 *        the end row of the split
-	 */
-	TableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName, final byte[] startRow,
-			final byte[] endRow) {
-		super(splitNumber, hostnames);
-
-		this.tableName = tableName;
-		this.startRow = startRow;
-		this.endRow = endRow;
-	}
-
-	/**
-	 * Returns the table name.
-	 * 
-	 * @return The table name.
-	 */
-	public byte[] getTableName() {
-		return this.tableName;
-	}
-
-	/**
-	 * Returns the start row.
-	 * 
-	 * @return The start row.
-	 */
-	public byte[] getStartRow() {
-		return this.startRow;
-	}
-
-	/**
-	 * Returns the end row.
-	 * 
-	 * @return The end row.
-	 */
-	public byte[] getEndRow() {
-		return this.endRow;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
deleted file mode 100644
index 3d9f672..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.log4j.Level;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * By using this class as the super class of a set of tests you will have an HBase testing
- * cluster available that is very suitable for writing tests that scan and filter against it.
- * The cluster is usable by any downstream application because it is 'injected' via a
- * dynamically generated hbase-site.xml that is added to the classpath.
- * Because of this classpath manipulation it is not possible to start a second testing cluster in the same JVM.
- * So either put all HBase-related tests into a single class or force surefire to
- * set up a new JVM for each test class.
- * See: http://maven.apache.org/surefire/maven-surefire-plugin/examples/fork-options-and-parallel-execution.html
- */
-//
-// NOTE: The code in this file is based on code from the
-// Apache HBase project, licensed under the Apache License v 2.0
-//
-// https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java
-//
-public class HBaseTestingClusterAutostarter implements Serializable {
-
-	private static final Log LOG = LogFactory.getLog(HBaseTestingClusterAutostarter.class);
-
-	private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-	private static HBaseAdmin admin = null;
-	private static List<TableName> createdTables = new ArrayList<>();
-
-	private static boolean alreadyRegisteredTestCluster = false;
-
-	protected static void createTable(TableName tableName, byte[] columnFamilyName, byte[][] splitKeys) {
-		LOG.info("HBase minicluster: Creating table " + tableName.getNameAsString());
-
-		assertNotNull("HBaseAdmin is not initialized successfully.", admin);
-		HTableDescriptor desc = new HTableDescriptor(tableName);
-		HColumnDescriptor colDef = new HColumnDescriptor(columnFamilyName);
-		desc.addFamily(colDef);
-
-		try {
-			admin.createTable(desc, splitKeys);
-			createdTables.add(tableName);
-			assertTrue("Fail to create the table", admin.tableExists(tableName));
-		} catch (IOException e) {
-			assertNull("Exception found while creating table", e);
-		}
-	}
-
-	protected static HTable openTable(TableName tableName) throws IOException {
-		HTable table = (HTable) admin.getConnection().getTable(tableName);
-		assertTrue("Fail to create the table", admin.tableExists(tableName));
-		return table;
-	}
-
-	private static void deleteTables() {
-		if (admin != null) {
-			for (TableName tableName : createdTables) {
-				try {
-					if (admin.tableExists(tableName)) {
-						admin.disableTable(tableName);
-						admin.deleteTable(tableName);
-					}
-				} catch (IOException e) {
-					assertNull("Exception found deleting the table", e);
-				}
-			}
-		}
-	}
-
-	private static void initialize(Configuration conf) {
-		conf = HBaseConfiguration.create(conf);
-		conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
-		try {
-			admin = TEST_UTIL.getHBaseAdmin();
-		} catch (MasterNotRunningException e) {
-			assertNull("Master is not running", e);
-		} catch (ZooKeeperConnectionException e) {
-			assertNull("Cannot connect to ZooKeeper", e);
-		} catch (IOException e) {
-			assertNull("IOException", e);
-		}
-	}
-
-	@BeforeClass
-	public static void setUp() throws Exception {
-		LOG.info("HBase minicluster: Starting");
-		((Log4JLogger) RpcServer.LOG).getLogger().setLevel(Level.ALL);
-		((Log4JLogger) AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL);
-		((Log4JLogger) ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
-
-		TEST_UTIL.startMiniCluster(1);
-
-		// https://issues.apache.org/jira/browse/HBASE-11711
-		TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", -1);
-
-		// Make sure the zookeeper quorum value contains the right port number (varies per run).
-		TEST_UTIL.getConfiguration().set("hbase.zookeeper.quorum", "localhost:" + TEST_UTIL.getZkCluster().getClientPort());
-
-		initialize(TEST_UTIL.getConfiguration());
-		LOG.info("HBase minicluster: Running");
-	}
-
-	private static File hbaseSiteXmlDirectory;
-	private static File hbaseSiteXmlFile;
-
-	/**
-	 * Dynamically generates an hbase-site.xml file that is added to the classpath.
-	 * This way the HBase minicluster can be used by an unmodified application.
-	 * The downside is that it cannot be 'unloaded', so you can have only one per JVM.
-	 */
-	public static void registerHBaseMiniClusterInClasspath() {
-		if (alreadyRegisteredTestCluster) {
-			fail("You CANNOT register a second HBase Testing cluster in the classpath of the SAME JVM");
-		}
-		File baseDir = new File(System.getProperty("java.io.tmpdir", "/tmp/"));
-		hbaseSiteXmlDirectory = new File(baseDir, "unittest-hbase-minicluster-" + Math.abs(new Random().nextLong()) + "/");
-
-		if (!hbaseSiteXmlDirectory.mkdirs()) {
-			fail("Unable to create output directory " + hbaseSiteXmlDirectory + " for the HBase minicluster");
-		}
-
-		assertNotNull("The ZooKeeper for the HBase minicluster is missing", TEST_UTIL.getZkCluster());
-
-		createHBaseSiteXml(hbaseSiteXmlDirectory, TEST_UTIL.getConfiguration().get("hbase.zookeeper.quorum"));
-		addDirectoryToClassPath(hbaseSiteXmlDirectory);
-
-		// Avoid starting it again.
-		alreadyRegisteredTestCluster = true;
-	}
-
-	private static void createHBaseSiteXml(File hbaseSiteXmlDirectory, String zookeeperQuorum) {
-		hbaseSiteXmlFile = new File(hbaseSiteXmlDirectory, "hbase-site.xml");
-		// Create the hbase-site.xml file for this run.
-		try {
-			String hbaseSiteXml = "<?xml version=\"1.0\"?>\n" +
-				"<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n" +
-				"<configuration>\n" +
-				"  <property>\n" +
-				"    <name>hbase.zookeeper.quorum</name>\n" +
-				"    <value>" + zookeeperQuorum + "</value>\n" +
-				"  </property>\n" +
-				"</configuration>";
-			OutputStream fos = new FileOutputStream(hbaseSiteXmlFile);
-			fos.write(hbaseSiteXml.getBytes(StandardCharsets.UTF_8));
-			fos.close();
-		} catch (IOException e) {
-			fail("Unable to create " + hbaseSiteXmlFile);
-		}
-	}
-
-	private static void addDirectoryToClassPath(File directory) {
-		try {
-			// Get the classloader actually used by HBaseConfiguration
-			ClassLoader classLoader = HBaseConfiguration.create().getClassLoader();
-			if (!(classLoader instanceof URLClassLoader)) {
-				fail("We should get a URLClassLoader");
-			}
-
-			// Make the addURL method accessible
-			Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
-			method.setAccessible(true);
-
-			// Add the directory where we put the hbase-site.xml to the classpath
-			method.invoke(classLoader, directory.toURI().toURL());
-		} catch (MalformedURLException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
-			fail("Unable to add " + directory + " to classpath because of this exception: " + e.getMessage());
-		}
-	}
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		LOG.info("HBase minicluster: Shutting down");
-		deleteTables();
-		hbaseSiteXmlFile.delete();
-		hbaseSiteXmlDirectory.delete();
-		TEST_UTIL.shutdownMiniCluster();
-		LOG.info("HBase minicluster: Down");
-	}
-
-}

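The usage pattern described in the class Javadoc boils down to the sketch below; the test class name, table name, column family and row values are invented, and TableInputFormatITCase in the next hunk is the complete, real version of this pattern.

import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.junit.BeforeClass;
import org.junit.Test;

public class MyHBaseScanTest extends HBaseTestingClusterAutostarter {

	@BeforeClass
	public static void activateHBaseCluster() {
		// Writes a throw-away hbase-site.xml and adds it to the classpath;
		// only one such cluster can be registered per JVM.
		registerHBaseMiniClusterInClasspath();
	}

	@Test
	public void testAgainstMiniCluster() throws Exception {
		TableName tableName = TableName.valueOf("my-test-table");   // hypothetical table
		byte[][] splitKeys = {"3".getBytes(), "6".getBytes()};
		createTable(tableName, "F".getBytes(), splitKeys);

		HTable table = openTable(tableName);
		table.put(new Put("1".getBytes()).add("F".getBytes(), "Col".getBytes(), "1".getBytes()));
		table.close();

		// ... run the code under test against the mini cluster ...
	}
}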
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
deleted file mode 100644
index 3dddd88..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TableInputFormatITCase extends HBaseTestingClusterAutostarter {
-	private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
-	private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
-	private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
-
-	// These are the row ids AND also the values we will put in the test table
-	private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
-
-	@BeforeClass
-	public static void activateHBaseCluster(){
-		registerHBaseMiniClusterInClasspath();
-	}
-
-	@Before
-	public void createTestTable() throws IOException {
-		TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
-		byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
-		createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
-		HTable table = openTable(tableName);
-
-		for (String rowId : ROW_IDS) {
-			byte[] rowIdBytes = rowId.getBytes();
-			Put p = new Put(rowIdBytes);
-			// Use the rowId as the value to make result verification easier
-			p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
-			table.put(p);
-		}
-
-		table.close();
-	}
-
-	class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
-		@Override
-		protected Scan getScanner() {
-			return new Scan();
-		}
-
-		@Override
-		protected String getTableName() {
-			return TEST_TABLE_NAME;
-		}
-
-		@Override
-		protected Tuple1<String> mapResultToTuple(Result r) {
-			return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
-		}
-	}
-
-	@Test
-	public void testTableInputFormat() {
-		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
-		environment.setParallelism(1);
-
-		DataSet<String> resultDataSet =
-			environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
-				@Override
-				public String map(Tuple1<String> value) throws Exception {
-					return value.f0;
-				}
-			});
-
-		List<String> resultSet = new ArrayList<>();
-		resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet));
-
-		try {
-			environment.execute("HBase InputFormat Test");
-		} catch (Exception e) {
-			Assert.fail("HBase InputFormat test failed. " + e.getMessage());
-		}
-
-		for (String rowId : ROW_IDS) {
-			assertTrue("Missing rowId from table: " + rowId, resultSet.contains(rowId));
-		}
-
-		assertEquals("The number of records is wrong.", ROW_IDS.length, resultSet.size());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
deleted file mode 100644
index 8579dee..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-public class HBaseFlinkTestConstants {
-	
-	public static final byte[] CF_SOME = "someCf".getBytes();
-	public static final byte[] Q_SOME = "someQual".getBytes();
-	public static final String TEST_TABLE_NAME = "test-table";
-	public static final String TMP_DIR = "/tmp/test";
-	
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
deleted file mode 100644
index dccf876..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-import org.apache.flink.addons.hbase.TableInputFormat;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Simple example of an HBase DataSet read.
- * 
- * To run this example, first create the test table with the HBase shell.
- * 
- * Use the following commands:
- * <ul>
- *     <li>create 'test-table', 'someCf'</li>
- *     <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li>
- *     <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li>
- * </ul>
- * 
- * The example should print just the first entry.
- * 
- */
-public class HBaseReadExample {
-	public static void main(String[] args) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		@SuppressWarnings("serial")
-		DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() {
-			
-				@Override
-				public String getTableName() {
-					return HBaseFlinkTestConstants.TEST_TABLE_NAME;
-				}
-
-				@Override
-				protected Scan getScanner() {
-					Scan scan = new Scan();
-					scan.addColumn(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME);
-					return scan;
-				}
-
-				private Tuple2<String, String> reuse = new Tuple2<String, String>();
-				
-				@Override
-				protected Tuple2<String, String> mapResultToTuple(Result r) {
-					String key = Bytes.toString(r.getRow());
-					String val = Bytes.toString(r.getValue(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME));
-					reuse.setField(key, 0);
-					reuse.setField(val, 1);
-					return reuse;
-				}
-		})
-		.filter(new FilterFunction<Tuple2<String,String>>() {
-
-			@Override
-			public boolean filter(Tuple2<String, String> t) throws Exception {
-				String val = t.getField(1);
-				if(val.startsWith("someStr"))
-					return true;
-				return false;
-			}
-		});
-		
-		hbaseDs.print();
-		
-		// kick off execution.
-		env.execute();
-				
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
deleted file mode 100644
index 483bdff..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Simple example of an HBase DataSet write.
- * 
- * To run this example, first create the test table with the HBase shell.
- * 
- * Use the following commands:
- * <ul>
- *     <li>create 'test-table', 'someCf'</li>
- * </ul>
- * 
- */
-@SuppressWarnings("serial")
-public class HBaseWriteExample {
-	
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-	
-	public static void main(String[] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-		
-		// set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		// get input data
-		DataSet<String> text = getTextDataSet(env);
-		
-		DataSet<Tuple2<String, Integer>> counts = 
-				// split up the lines in pairs (2-tuples) containing: (word,1)
-				text.flatMap(new Tokenizer())
-				// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0)
-				.sum(1);
-
-		// emit result
-		Job job = Job.getInstance();
-		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
-		// TODO is "mapred.output.dir" really useful?
-		job.getConfiguration().set("mapred.output.dir",HBaseFlinkTestConstants.TMP_DIR);
-		counts.map(new RichMapFunction <Tuple2<String,Integer>, Tuple2<Text,Mutation>>() {
-			private transient Tuple2<Text, Mutation> reuse;
-
-			@Override
-			public void open(Configuration parameters) throws Exception {
-				super.open(parameters);
-				reuse = new Tuple2<Text, Mutation>();
-			}
-
-			@Override
-			public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
-				reuse.f0 = new Text(t.f0);
-				Put put = new Put(t.f0.getBytes());
-				put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
-				reuse.f1 = put;
-				return reuse;
-			}
-		}).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
-		
-		// execute program
-		env.execute("WordCount (HBase sink) Example");
-	}
-	
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-	
-	/**
-	 * Implements the string tokenizer that splits sentences into words as a user-defined
-	 * FlatMapFunction. The function takes a line (String) and splits it into 
-	 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
-	 */
-	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-
-		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-			
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
-	
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputTableName = HBaseFlinkTestConstants.TEST_TABLE_NAME;
-	
-	private static boolean parseParameters(String[] args) {
-		
-		if(args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if(args.length == 2) {
-				textPath = args[0];
-				outputTableName = args[1];
-			} else {
-				System.err.println("Usage: HBaseWriteExample <text path> <output table>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing HBaseWriteExample example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: HBaseWriteExample <text path> <output table>");
-		}
-		return true;
-	}
-	
-	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return getDefaultTextLineDataSet(env);
-		}
-	}
-	private static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
-		return env.fromElements(WORDS);
-	}
-	private static final String[] WORDS = new String[] {
-		"To be, or not to be,--that is the question:--",
-		"Whether 'tis nobler in the mind to suffer",
-		"The slings and arrows of outrageous fortune",
-		"Or to take arms against a sea of troubles,",
-		"And by opposing end them?--To die,--to sleep,--",
-		"No more; and by a sleep to say we end",
-		"The heartache, and the thousand natural shocks",
-		"That flesh is heir to,--'tis a consummation",
-		"Devoutly to be wish'd. To die,--to sleep;--",
-		"To sleep! perchance to dream:--ay, there's the rub;",
-		"For in that sleep of death what dreams may come,",
-		"When we have shuffled off this mortal coil,",
-		"Must give us pause: there's the respect",
-		"That makes calamity of so long life;",
-		"For who would bear the whips and scorns of time,",
-		"The oppressor's wrong, the proud man's contumely,",
-		"The pangs of despis'd love, the law's delay,",
-		"The insolence of office, and the spurns",
-		"That patient merit of the unworthy takes,",
-		"When he himself might his quietus make",
-		"With a bare bodkin? who would these fardels bear,",
-		"To grunt and sweat under a weary life,",
-		"But that the dread of something after death,--",
-		"The undiscover'd country, from whose bourn",
-		"No traveller returns,--puzzles the will,",
-		"And makes us rather bear those ills we have",
-		"Than fly to others that we know not of?",
-		"Thus conscience does make cowards of us all;",
-		"And thus the native hue of resolution",
-		"Is sicklied o'er with the pale cast of thought;",
-		"And enterprises of great pith and moment,",
-		"With this regard, their currents turn awry,",
-		"And lose the name of action.--Soft you now!",
-		"The fair Ophelia!--Nymph, in thy orisons",
-		"Be all my sins remember'd."
-	};
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
deleted file mode 100644
index 05398db..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * 
- * This is an example of how to write streams into HBase. In this example the
- * stream is written into a local HBase instance, but the example can be adapted
- * to an HBase cluster running elsewhere. You need a running local HBase with a
- * table "flinkExample" and a column family "entry". If your HBase configuration does
- * not match the hbase-site.xml in the resources folder, you have to temporarily delete
- * that hbase-site.xml to execute the example properly.
- * 
- */
-public class HBaseWriteStreamExample {
-
-	public static void main(String[] args) throws Exception {
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.getExecutionEnvironment();
-
-		// data stream with random numbers
-		DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
-			private static final long serialVersionUID = 1L;
-
-			private volatile boolean isRunning = true;
-
-			@Override
-			public void run(SourceContext<String> out) throws Exception {
-				while (isRunning) {
-					out.collect(String.valueOf(Math.floor(Math.random() * 100)));
-				}
-
-			}
-
-			@Override
-			public void cancel() {
-				isRunning = false;
-			}
-		});
-		dataStream.writeUsingOutputFormat(new HBaseOutputFormat());
-
-		env.execute();
-	}
-
-	/**
-	 * 
-	 * This class implements an OutputFormat for HBase
-	 *
-	 */
-	private static class HBaseOutputFormat implements OutputFormat<String> {
-
-		private org.apache.hadoop.conf.Configuration conf = null;
-		private HTable table = null;
-		private String taskNumber = null;
-		private int rowNumber = 0;
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void configure(Configuration parameters) {
-			conf = HBaseConfiguration.create();
-		}
-
-		@Override
-		public void open(int taskNumber, int numTasks) throws IOException {
-			table = new HTable(conf, "flinkExample");
-			this.taskNumber = String.valueOf(taskNumber);
-		}
-
-		@Override
-		public void writeRecord(String record) throws IOException {
-			Put put = new Put(Bytes.toBytes(taskNumber + rowNumber));
-			put.add(Bytes.toBytes("entry"), Bytes.toBytes("entry"),
-					Bytes.toBytes(rowNumber));
-			rowNumber++;
-			table.put(put);
-		}
-
-		@Override
-		public void close() throws IOException {
-			table.flushCommits();
-			table.close();
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties
deleted file mode 100644
index 804ff45..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-# http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-log4j.rootLogger=DEBUG, stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.out
-log4j.appender.stdout.threshold=INFO
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %-5p %30c{1}:%4L - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/pom.xml b/flink-batch-connectors/flink-hcatalog/pom.xml
deleted file mode 100644
index 6889e5a..0000000
--- a/flink-batch-connectors/flink-hcatalog/pom.xml
+++ /dev/null
@@ -1,182 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-	
-	<modelVersion>4.0.0</modelVersion>
-	
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-batch-connectors</artifactId>
-		<version>1.2-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-hcatalog</artifactId>
-	<name>flink-hcatalog</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-hadoop-compatibility_2.10</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.hive.hcatalog</groupId>
-			<artifactId>hcatalog-core</artifactId>
-			<version>0.12.0</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.json</groupId>
-					<artifactId>json</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-library</artifactId>
-			<scope>provided</scope>
-		</dependency>
-
-	</dependencies>
-
-	<build>
-		<plugins>
-			<!-- Scala Compiler -->
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<executions>
-					<!-- Run scala compiler in the process-resources phase, so that dependencies on
-						scala classes can be resolved later in the (Java) compile phase -->
-					<execution>
-						<id>scala-compile-first</id>
-						<phase>process-resources</phase>
-						<goals>
-							<goal>compile</goal>
-						</goals>
-					</execution>
-
-					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-						 scala classes can be resolved later in the (Java) test-compile phase -->
-					<execution>
-						<id>scala-test-compile</id>
-						<phase>process-test-resources</phase>
-						<goals>
-							<goal>testCompile</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<jvmArgs>
-						<jvmArg>-Xms128m</jvmArg>
-						<jvmArg>-Xmx512m</jvmArg>
-					</jvmArgs>
-				</configuration>
-			</plugin>
-
-			<!-- Eclipse Integration -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-eclipse-plugin</artifactId>
-				<version>2.8</version>
-				<configuration>
-					<downloadSources>true</downloadSources>
-					<projectnatures>
-						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
-					</projectnatures>
-					<buildcommands>
-						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-					</buildcommands>
-					<classpathContainers>
-						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-					</classpathContainers>
-					<excludes>
-						<exclude>org.scala-lang:scala-library</exclude>
-						<exclude>org.scala-lang:scala-compiler</exclude>
-					</excludes>
-					<sourceIncludes>
-						<sourceInclude>**/*.scala</sourceInclude>
-						<sourceInclude>**/*.java</sourceInclude>
-					</sourceIncludes>
-				</configuration>
-			</plugin>
-
-			<!-- Adding scala source directories to build path -->
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<!-- Add src/main/scala to eclipse build path -->
-					<execution>
-						<id>add-source</id>
-						<phase>generate-sources</phase>
-						<goals>
-							<goal>add-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/main/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-					<!-- Add src/test/scala to eclipse build path -->
-					<execution>
-						<id>add-test-source</id>
-						<phase>generate-test-sources</phase>
-						<goals>
-							<goal>add-test-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/test/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!-- Scala Code Style, most of the configuration done via plugin management -->
-			<plugin>
-				<groupId>org.scalastyle</groupId>
-				<artifactId>scalastyle-maven-plugin</artifactId>
-				<configuration>
-					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
-				</configuration>
-			</plugin>
-
-		</plugins>
-	</build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
deleted file mode 100644
index 859b706..0000000
--- a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hcatalog;
-
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
-import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.common.HCatUtil;
-import org.apache.hive.hcatalog.data.DefaultHCatRecord;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * An InputFormat to read from HCatalog tables.
- * The InputFormat supports projection (selection and order of fields) and partition filters.
- *
- * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or as Flink-native tuples.
- *
- * Note: Flink tuples might only support a limited number of fields (depending on the API).
- *
- * @param <T>
- */
-public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit> implements ResultTypeQueryable<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	private Configuration configuration;
-
-	private org.apache.hive.hcatalog.mapreduce.HCatInputFormat hCatInputFormat;
-	private RecordReader<WritableComparable, HCatRecord> recordReader;
-	private boolean fetched = false;
-	private boolean hasNext;
-
-	protected String[] fieldNames = new String[0];
-	protected HCatSchema outputSchema;
-
-	private TypeInformation<T> resultType;
-
-	public HCatInputFormatBase() { }
-
-	/**
-	 * Creates a HCatInputFormat for the given database and table.
-	 * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
-	 * The return type of the InputFormat can be changed to Flink-native tuples by calling
-	 * {@link HCatInputFormatBase#asFlinkTuples()}.
-	 *
-	 * @param database The name of the database to read from.
-	 * @param table The name of the table to read.
-	 * @throws java.io.IOException
-	 */
-	public HCatInputFormatBase(String database, String table) throws IOException {
-		this(database, table, new Configuration());
-	}
-
-	/**
-	 * Creates a HCatInputFormat for the given database, table, and
-	 * {@link org.apache.hadoop.conf.Configuration}.
-	 * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
-	 * The return type of the InputFormat can be changed to Flink-native tuples by calling
-	 * {@link HCatInputFormatBase#asFlinkTuples()}.
-	 *
-	 * @param database The name of the database to read from.
-	 * @param table The name of the table to read.
-	 * @param config The Configuration for the InputFormat.
-	 * @throws java.io.IOException
-	 */
-	public HCatInputFormatBase(String database, String table, Configuration config) throws IOException {
-		super();
-		this.configuration = config;
-		HadoopUtils.mergeHadoopConf(this.configuration);
-
-		this.hCatInputFormat = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, database, table);
-		this.outputSchema = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration);
-
-		// configure output schema of HCatFormat
-		configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
-		// set type information
-		this.resultType = new WritableTypeInfo(DefaultHCatRecord.class);
-	}
-
-	/**
-	 * Specifies the fields which are returned by the InputFormat and their order.
-	 *
-	 * @param fields The fields and their order which are returned by the InputFormat.
-	 * @return This InputFormat with specified return fields.
-	 * @throws java.io.IOException
-	 */
-	public HCatInputFormatBase<T> getFields(String... fields) throws IOException {
-
-		// build output schema
-		ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length);
-		for(String field : fields) {
-			fieldSchemas.add(this.outputSchema.get(field));
-		}
-		this.outputSchema = new HCatSchema(fieldSchemas);
-
-		// update output schema configuration
-		configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
-
-		return this;
-	}
-
-	/**
-	 * Specifies a SQL-like filter condition on the table's partition columns.
-	 * Filter conditions on non-partition columns are invalid.
-	 * A partition filter can significantly reduce the amount of data to be read.
-	 *
-	 * @param filter A SQL-like filter condition on the table's partition columns.
-	 * @return This InputFormat with specified partition filter.
-	 * @throws java.io.IOException
-	 */
-	public HCatInputFormatBase<T> withFilter(String filter) throws IOException {
-
-		// set filter
-		this.hCatInputFormat.setFilter(filter);
-
-		return this;
-	}
-
-	/**
-	 * Specifies that the InputFormat returns Flink tuples instead of
-	 * {@link org.apache.hive.hcatalog.data.HCatRecord}.
-	 *
-	 * Note: Flink tuples might only support a limited number of fields (depending on the API).
-	 *
-	 * @return This InputFormat.
-	 * @throws org.apache.hive.hcatalog.common.HCatException
-	 */
-	public HCatInputFormatBase<T> asFlinkTuples() throws HCatException {
-
-		// build type information
-		int numFields = outputSchema.getFields().size();
-		if(numFields > this.getMaxFlinkTupleSize()) {
-			throw new IllegalArgumentException("Only up to "+this.getMaxFlinkTupleSize()+
-					" fields can be returned as Flink tuples.");
-		}
-
-		TypeInformation[] fieldTypes = new TypeInformation[numFields];
-		fieldNames = new String[numFields];
-		for (String fieldName : outputSchema.getFieldNames()) {
-			HCatFieldSchema field = outputSchema.get(fieldName);
-
-			int fieldPos = outputSchema.getPosition(fieldName);
-			TypeInformation fieldType = getFieldType(field);
-
-			fieldTypes[fieldPos] = fieldType;
-			fieldNames[fieldPos] = fieldName;
-
-		}
-		this.resultType = new TupleTypeInfo(fieldTypes);
-
-		return this;
-	}
-
-	protected abstract int getMaxFlinkTupleSize();
-
-	private TypeInformation getFieldType(HCatFieldSchema fieldSchema) {
-
-		switch(fieldSchema.getType()) {
-			case INT:
-				return BasicTypeInfo.INT_TYPE_INFO;
-			case TINYINT:
-				return BasicTypeInfo.BYTE_TYPE_INFO;
-			case SMALLINT:
-				return BasicTypeInfo.SHORT_TYPE_INFO;
-			case BIGINT:
-				return BasicTypeInfo.LONG_TYPE_INFO;
-			case BOOLEAN:
-				return BasicTypeInfo.BOOLEAN_TYPE_INFO;
-			case FLOAT:
-				return BasicTypeInfo.FLOAT_TYPE_INFO;
-			case DOUBLE:
-				return BasicTypeInfo.DOUBLE_TYPE_INFO;
-			case STRING:
-				return BasicTypeInfo.STRING_TYPE_INFO;
-			case BINARY:
-				return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
-			case ARRAY:
-				return new GenericTypeInfo(List.class);
-			case MAP:
-				return new GenericTypeInfo(Map.class);
-			case STRUCT:
-				return new GenericTypeInfo(List.class);
-			default:
-				throw new IllegalArgumentException("Unknown data type \""+fieldSchema.getType()+"\" encountered.");
-		}
-	}
-
-	/**
-	 * Returns the {@link org.apache.hadoop.conf.Configuration} of the HCatInputFormat.
-	 *
-	 * @return The Configuration of the HCatInputFormat.
-	 */
-	public Configuration getConfiguration() {
-		return this.configuration;
-	}
-
-	/**
-	 * Returns the {@link org.apache.hive.hcatalog.data.schema.HCatSchema} of the {@link org.apache.hive.hcatalog.data.HCatRecord}
-	 * returned by this InputFormat.
-	 *
-	 * @return The HCatSchema of the HCatRecords returned by this InputFormat.
-	 */
-	public HCatSchema getOutputSchema() {
-		return this.outputSchema;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  InputFormat
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void configure(org.apache.flink.configuration.Configuration parameters) {
-		// nothing to do
-	}
-
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
-		// no statistics provided at the moment
-		return null;
-	}
-
-	@Override
-	public HadoopInputSplit[] createInputSplits(int minNumSplits)
-			throws IOException {
-		configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
-
-		JobContext jobContext = null;
-		try {
-			jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-
-		List<InputSplit> splits;
-		try {
-			splits = this.hCatInputFormat.getSplits(jobContext);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not get Splits.", e);
-		}
-		HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
-
-		for(int i = 0; i < hadoopInputSplits.length; i++){
-			hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
-		}
-		return hadoopInputSplits;
-	}
-
-	@Override
-	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
-		return new LocatableInputSplitAssigner(inputSplits);
-	}
-
-	@Override
-	public void open(HadoopInputSplit split) throws IOException {
-		TaskAttemptContext context = null;
-		try {
-			context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
-		} catch(Exception e) {
-			throw new RuntimeException(e);
-		}
-
-		try {
-			this.recordReader = this.hCatInputFormat
-					.createRecordReader(split.getHadoopInputSplit(), context);
-			this.recordReader.initialize(split.getHadoopInputSplit(), context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not create RecordReader.", e);
-		} finally {
-			this.fetched = false;
-		}
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		if(!this.fetched) {
-			fetchNext();
-		}
-		return !this.hasNext;
-	}
-
-	private void fetchNext() throws IOException {
-		try {
-			this.hasNext = this.recordReader.nextKeyValue();
-		} catch (InterruptedException e) {
-			throw new IOException("Could not fetch next KeyValue pair.", e);
-		} finally {
-			this.fetched = true;
-		}
-	}
-
-	@Override
-	public T nextRecord(T record) throws IOException {
-		if(!this.fetched) {
-			// first record
-			fetchNext();
-		}
-		if(!this.hasNext) {
-			return null;
-		}
-		try {
-
-			// get next HCatRecord
-			HCatRecord v = this.recordReader.getCurrentValue();
-			this.fetched = false;
-
-			if(this.fieldNames.length > 0) {
-				// return as Flink tuple
-				return this.buildFlinkTuple(record, v);
-
-			} else {
-				// return as HCatRecord
-				return (T)v;
-			}
-
-		} catch (InterruptedException e) {
-			throw new IOException("Could not get next record.", e);
-		}
-	}
-
-	protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException;
-
-	@Override
-	public void close() throws IOException {
-		this.recordReader.close();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Custom de/serialization methods
-	// --------------------------------------------------------------------------------------------
-
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeInt(this.fieldNames.length);
-		for(String fieldName : this.fieldNames) {
-			out.writeUTF(fieldName);
-		}
-		this.configuration.write(out);
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		this.fieldNames = new String[in.readInt()];
-		for(int i=0; i<this.fieldNames.length; i++) {
-			this.fieldNames[i] = in.readUTF();
-		}
-
-		Configuration configuration = new Configuration();
-		configuration.readFields(in);
-
-		if(this.configuration == null) {
-			this.configuration = configuration;
-		}
-
-		this.hCatInputFormat = new org.apache.hive.hcatalog.mapreduce.HCatInputFormat();
-		this.outputSchema = (HCatSchema)HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema"));
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Result type business
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public TypeInformation<T> getProducedType() {
-		return this.resultType;
-	}
-
-}
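
For reference, a minimal usage sketch of the HCatInputFormatBase API removed above (the class itself only moves to the merged connectors module): projecting two fields, pruning partitions, and switching the return type to Flink tuples. The database, table, field, and partition names ("mydb", "mytable", "id", "name", "region") are placeholders, and the concrete subclass used here is the Java HCatInputFormat shown in the next hunk.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hcatalog.HCatInputFormatBase;
import org.apache.flink.hcatalog.java.HCatInputFormat;

public class HCatProjectionExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// "mydb", "mytable", the field names, and the partition column are placeholders.
		// getFields() projects the output schema, withFilter() prunes partitions,
		// and asFlinkTuples() switches the return type from HCatRecord to Flink tuples.
		HCatInputFormatBase<Tuple2<Integer, String>> inputFormat =
			new HCatInputFormat<Tuple2<Integer, String>>("mydb", "mytable")
				.getFields("id", "name")
				.withFilter("region = \"EU\"")
				.asFlinkTuples();

		// The format implements ResultTypeQueryable, so no explicit TypeInformation is needed.
		DataSet<Tuple2<Integer, String>> rows = env.createInput(inputFormat);
		rows.print();
	}
}

Note that getFields() has to be called before asFlinkTuples(), since the tuple type is derived from the (possibly projected) output schema.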

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
deleted file mode 100644
index 46f3cd5..0000000
--- a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hcatalog.java;
-
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.hcatalog.HCatInputFormatBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.data.HCatRecord;
-
-/**
- * An InputFormat to read from HCatalog tables.
- * The InputFormat supports projection (selection and order of fields) and partition filters.
- *
- * Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
- * Flink tuples support only up to 25 fields.
- *
- * @param <T>
- */
-public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
-	private static final long serialVersionUID = 1L;
-
-	public HCatInputFormat() {}
-
-	public HCatInputFormat(String database, String table) throws Exception {
-		super(database, table);
-	}
-
-	public HCatInputFormat(String database, String table, Configuration config) throws Exception {
-		super(database, table, config);
-	}
-
-
-	@Override
-	protected int getMaxFlinkTupleSize() {
-		return 25;
-	}
-
-	@Override
-	protected T buildFlinkTuple(T t, HCatRecord record) throws HCatException {
-
-		Tuple tuple = (Tuple)t;
-
-		// Extract all fields from HCatRecord
-		for(int i=0; i < this.fieldNames.length; i++) {
-
-			// get field value
-			Object o = record.get(this.fieldNames[i], this.outputSchema);
-
-			// Set field value in Flink tuple.
-			// Partition columns are returned as String and
-			//   need to be converted to original type.
-			switch(this.outputSchema.get(i).getType()) {
-				case INT:
-					if(o instanceof String) {
-						tuple.setField(Integer.parseInt((String) o), i);
-					} else {
-						tuple.setField(o, i);
-					}
-					break;
-				case TINYINT:
-					if(o instanceof String) {
-						tuple.setField(Byte.parseByte((String) o), i);
-					} else {
-						tuple.setField(o, i);
-					}
-					break;
-				case SMALLINT:
-					if(o instanceof String) {
-						tuple.setField(Short.parseShort((String) o), i);
-					} else {
-						tuple.setField(o, i);
-					}
-					break;
-				case BIGINT:
-					if(o instanceof String) {
-						tuple.setField(Long.parseLong((String) o), i);
-					} else {
-						tuple.setField(o, i);
-					}
-					break;
-				case BOOLEAN:
-					if(o instanceof String) {
-						tuple.setField(Boolean.parseBoolean((String) o), i);
-					} else {
-						tuple.setField(o, i);
-					}
-					break;
-				case FLOAT:
-					if(o instanceof String) {
-						tuple.setField(Float.parseFloat((String) o), i);
-					} else {
-						tuple.setField(o, i);
-					}
-					break;
-				case DOUBLE:
-					if(o instanceof String) {
-						tuple.setField(Double.parseDouble((String) o), i);
-					} else {
-						tuple.setField(o, i);
-					}
-					break;
-				case STRING:
-					tuple.setField(o, i);
-					break;
-				case BINARY:
-					if(o instanceof String) {
-						throw new RuntimeException("Cannot handle partition keys of type BINARY.");
-					} else {
-						tuple.setField(o, i);
-					}
-					break;
-				case ARRAY:
-					if(o instanceof String) {
-						throw new RuntimeException("Cannot handle partition keys of type ARRAY.");
-					} else {
-						tuple.setField(o, i);
-					}
-					break;
-				case MAP:
-					if(o instanceof String) {
-						throw new RuntimeException("Cannot handle partition keys of type MAP.");
-					} else {
-						tuple.setField(o, i);
-					}
-					break;
-				case STRUCT:
-					if(o instanceof String) {
-						throw new RuntimeException("Cannot handle partition keys of type STRUCT.");
-					} else {
-						tuple.setField(o, i);
-					}
-					break;
-				default:
-					throw new RuntimeException("Invalid Type");
-			}
-		}
-
-		return (T)tuple;
-
-	}
-
-}
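
By default, i.e. without calling asFlinkTuples(), the Java HCatInputFormat above returns raw HCatRecord objects. A minimal sketch, again with placeholder database and table names:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.hive.hcatalog.data.HCatRecord;

public class HCatRecordExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Without asFlinkTuples(), records are returned as HCatRecord.
		HCatInputFormat<HCatRecord> inputFormat =
			new HCatInputFormat<HCatRecord>("mydb", "mytable");

		DataSet<HCatRecord> records = env.createInput(inputFormat);
		System.out.println("number of records: " + records.count());
	}
}

The tuple mode of this class is capped at 25 fields, as getMaxFlinkTupleSize() above indicates.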

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
deleted file mode 100644
index 0299ee1..0000000
--- a/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hcatalog.scala
-
-import org.apache.flink.configuration
-import org.apache.flink.hcatalog.HCatInputFormatBase
-import org.apache.hadoop.conf.Configuration
-import org.apache.hive.hcatalog.data.HCatRecord
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema
-
-/**
- * An InputFormat to read from HCatalog tables.
- * The InputFormat supports projection (selection and order of fields) and partition filters.
- *
- * Data can be returned as [[HCatRecord]] or Scala tuples.
- * Scala tuples support only up to 22 fields.
- *
- */
-class HCatInputFormat[T](
-                        database: String,
-                        table: String,
-                        config: Configuration
-                          ) extends HCatInputFormatBase[T](database, table, config) {
-
-  def this(database: String, table: String) {
-    this(database, table, new Configuration)
-  }
-
-  var vals: Array[Any] = Array[Any]()
-
-  override def configure(parameters: configuration.Configuration): Unit = {
-    super.configure(parameters)
-    vals = new Array[Any](fieldNames.length)
-  }
-
-  override protected def getMaxFlinkTupleSize: Int = 22
-
-  override protected def buildFlinkTuple(t: T, record: HCatRecord): T = {
-
-    // Extract all fields from HCatRecord
-    var i: Int = 0
-    while (i < this.fieldNames.length) {
-
-        val o: AnyRef = record.get(this.fieldNames(i), this.outputSchema)
-
-        // partition columns are returned as String
-        //   Check and convert to actual type.
-        this.outputSchema.get(i).getType match {
-          case HCatFieldSchema.Type.INT =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toInt
-            }
-            else {
-              vals(i) = o.asInstanceOf[Int]
-            }
-          case HCatFieldSchema.Type.TINYINT =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toInt.toByte
-            }
-            else {
-              vals(i) = o.asInstanceOf[Byte]
-            }
-          case HCatFieldSchema.Type.SMALLINT =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toInt.toShort
-            }
-            else {
-              vals(i) = o.asInstanceOf[Short]
-            }
-          case HCatFieldSchema.Type.BIGINT =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toLong
-            }
-            else {
-              vals(i) = o.asInstanceOf[Long]
-            }
-          case HCatFieldSchema.Type.BOOLEAN =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toBoolean
-            }
-            else {
-              vals(i) = o.asInstanceOf[Boolean]
-            }
-          case HCatFieldSchema.Type.FLOAT =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toFloat
-            }
-            else {
-              vals(i) = o.asInstanceOf[Float]
-            }
-          case HCatFieldSchema.Type.DOUBLE =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toDouble
-            }
-            else {
-              vals(i) = o.asInstanceOf[Double]
-            }
-          case HCatFieldSchema.Type.STRING =>
-            vals(i) = o
-          case HCatFieldSchema.Type.BINARY =>
-            if (o.isInstanceOf[String]) {
-              throw new RuntimeException("Cannot handle partition keys of type BINARY.")
-            }
-            else {
-              vals(i) = o.asInstanceOf[Array[Byte]]
-            }
-          case HCatFieldSchema.Type.ARRAY =>
-            if (o.isInstanceOf[String]) {
-              throw new RuntimeException("Cannot handle partition keys of type ARRAY.")
-            }
-            else {
-              vals(i) = o.asInstanceOf[List[Object]]
-            }
-          case HCatFieldSchema.Type.MAP =>
-            if (o.isInstanceOf[String]) {
-              throw new RuntimeException("Cannot handle partition keys of type MAP.")
-            }
-            else {
-              vals(i) = o.asInstanceOf[Map[Object, Object]]
-            }
-          case HCatFieldSchema.Type.STRUCT =>
-            if (o.isInstanceOf[String]) {
-              throw new RuntimeException("Cannot handle partition keys of type STRUCT.")
-            }
-            else {
-              vals(i) = o.asInstanceOf[List[Object]]
-            }
-          case _ =>
-            throw new RuntimeException("Invalid type " + this.outputSchema.get(i).getType +
-              " encountered.")
-        }
-
-        i += 1
-      }
-    createScalaTuple(vals)
-  }
-
-  private def createScalaTuple(vals: Array[Any]): T = {
-
-    this.fieldNames.length match {
-      case 1 =>
-        new Tuple1(vals(0)).asInstanceOf[T]
-      case 2 =>
-        new Tuple2(vals(0), vals(1)).asInstanceOf[T]
-      case 3 =>
-        new Tuple3(vals(0), vals(1), vals(2)).asInstanceOf[T]
-      case 4 =>
-        new Tuple4(vals(0), vals(1), vals(2), vals(3)).asInstanceOf[T]
-      case 5 =>
-        new Tuple5(vals(0), vals(1), vals(2), vals(3), vals(4)).asInstanceOf[T]
-      case 6 =>
-        new Tuple6(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5)).asInstanceOf[T]
-      case 7 =>
-        new Tuple7(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6)).asInstanceOf[T]
-      case 8 =>
-        new Tuple8(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7))
-          .asInstanceOf[T]
-      case 9 =>
-        new Tuple9(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8)).asInstanceOf[T]
-      case 10 =>
-        new Tuple10(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9)).asInstanceOf[T]
-      case 11 =>
-        new Tuple11(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10)).asInstanceOf[T]
-      case 12 =>
-        new Tuple12(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11)).asInstanceOf[T]
-      case 13 =>
-        new Tuple13(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12)).asInstanceOf[T]
-      case 14 =>
-        new Tuple14(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13)).asInstanceOf[T]
-      case 15 =>
-        new Tuple15(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14)).asInstanceOf[T]
-      case 16 =>
-        new Tuple16(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15))
-          .asInstanceOf[T]
-      case 17 =>
-        new Tuple17(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
-          vals(16)).asInstanceOf[T]
-      case 18 =>
-        new Tuple18(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
-          vals(16), vals(17)).asInstanceOf[T]
-      case 19 =>
-        new Tuple19(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
-          vals(16), vals(17), vals(18)).asInstanceOf[T]
-      case 20 =>
-        new Tuple20(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
-          vals(16), vals(17), vals(18), vals(19)).asInstanceOf[T]
-      case 21 =>
-        new Tuple21(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
-          vals(16), vals(17), vals(18), vals(19), vals(20)).asInstanceOf[T]
-      case 22 =>
-        new Tuple22(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
-          vals(16), vals(17), vals(18), vals(19), vals(20), vals(21)).asInstanceOf[T]
-      case _ =>
-        throw new RuntimeException("Only up to 22 fields supported for Scala Tuples.")
-
-  }
-
-  }
-}
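
A corresponding sketch for the Scala variant, which returns Scala tuples of up to 22 fields; database, table, and field names are again placeholders:

import org.apache.flink.api.scala._
import org.apache.flink.hcatalog.scala.HCatInputFormat

object HCatScalaExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Project two fields and return them as Scala (Int, String) tuples.
    // getFields() must precede asFlinkTuples(), which derives the tuple type
    // from the projected schema.
    val inputFormat = new HCatInputFormat[(Int, String)]("mydb", "mytable")
      .getFields("id", "name")
      .asFlinkTuples()

    val rows = env.createInput(inputFormat)
    rows.print()
  }
}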

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/pom.xml b/flink-batch-connectors/flink-jdbc/pom.xml
deleted file mode 100644
index 40779ba..0000000
--- a/flink-batch-connectors/flink-jdbc/pom.xml
+++ /dev/null
@@ -1,66 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-	
-	<modelVersion>4.0.0</modelVersion>
-	
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-batch-connectors</artifactId>
-		<version>1.2-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-jdbc</artifactId>
-	<name>flink-jdbc</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.derby</groupId>
-			<artifactId>derby</artifactId>
-			<version>10.10.1.1</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-</project>