Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:37 UTC
[47/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
deleted file mode 100644
index 35b0a7c..0000000
--- a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * {@link InputFormat} subclass that wraps access to HTables.
- */
-public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class);
-
- /** helper variable to decide whether the input is exhausted or not */
- private boolean endReached = false;
-
- protected transient HTable table = null;
- protected transient Scan scan = null;
-
- /** HBase iterator wrapper */
- private ResultScanner resultScanner = null;
-
- private byte[] lastRow;
- private int scannedRows;
-
- /**
- * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
- * @return The appropriate instance of Scan for this use case.
- */
- protected abstract Scan getScanner();
-
- /**
- * Returns the name of the table to read.
- * Only a single table name is possible per instance of a TableInputFormat derivative.
- * @return The name of the table.
- */
- protected abstract String getTableName();
-
- /**
- * The output from HBase is always an instance of {@link Result}.
- * This method copies the data from the Result instance into the required {@link Tuple}.
- * @param r The Result instance from HBase that needs to be converted
- * @return The appropriate instance of {@link Tuple} that contains the needed information.
- */
- protected abstract T mapResultToTuple(Result r);
-
- /**
- * Creates a {@link Scan} object and opens the {@link HTable} connection.
- * These are opened here because they are needed in createInputSplits(),
- * which is called before the openInputFormat() method.
- * Therefore, the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
- *
- * @param parameters The configuration that is to be used
- * @see Configuration
- */
- @Override
- public void configure(Configuration parameters) {
- table = createTable();
- if (table != null) {
- scan = getScanner();
- }
- }
-
- /**
- * Creates an {@link HTable} instance and sets it into this format.
- */
- private HTable createTable() {
- LOG.info("Initializing HBaseConfiguration");
- //use files found in the classpath
- org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
-
- try {
- return new HTable(hConf, getTableName());
- } catch (Exception e) {
- LOG.error("Error instantiating a new HTable instance", e);
- }
- return null;
- }
-
- @Override
- public void open(TableInputSplit split) throws IOException {
- if (table == null) {
- throw new IOException("The HBase table has not been opened!");
- }
- if (scan == null) {
- throw new IOException("getScanner returned null");
- }
- if (split == null) {
- throw new IOException("Input split is null!");
- }
-
- logSplitInfo("opening", split);
- scan.setStartRow(split.getStartRow());
- lastRow = split.getEndRow();
- scan.setStopRow(lastRow);
-
- resultScanner = table.getScanner(scan);
- endReached = false;
- scannedRows = 0;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- return endReached;
- }
-
- @Override
- public T nextRecord(T reuse) throws IOException {
- if (resultScanner == null) {
- throw new IOException("No table result scanner provided!");
- }
- try {
- Result res = resultScanner.next();
- if (res != null) {
- scannedRows++;
- lastRow = res.getRow();
- return mapResultToTuple(res);
- }
- } catch (Exception e) {
- resultScanner.close();
- //workaround for timeout on scan
- LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
- scan.setStartRow(lastRow);
- resultScanner = table.getScanner(scan);
- Result res = resultScanner.next();
- if (res != null) {
- scannedRows++;
- lastRow = res.getRow();
- return mapResultToTuple(res);
- }
- }
-
- endReached = true;
- return null;
- }
-
- @Override
- public void close() throws IOException {
- LOG.info("Closing split (scanned {} rows)", scannedRows);
- lastRow = null;
- try {
- if (resultScanner != null) {
- resultScanner.close();
- }
- } finally {
- resultScanner = null;
- }
- }
-
- @Override
- public void closeInputFormat() throws IOException {
- try {
- if (table != null) {
- table.close();
- }
- } finally {
- table = null;
- }
- }
-
- @Override
- public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
- if (table == null) {
- throw new IOException("The HBase table has not been opened!");
- }
- if (scan == null) {
- throw new IOException("getScanner returned null");
- }
-
- //Gets the starting and ending row keys for every region in the currently open table
- final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
- if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
- throw new IOException("Expecting at least one region.");
- }
- final byte[] startRow = scan.getStartRow();
- final byte[] stopRow = scan.getStopRow();
- final boolean scanWithNoLowerBound = startRow.length == 0;
- final boolean scanWithNoUpperBound = stopRow.length == 0;
-
- final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(minNumSplits);
- for (int i = 0; i < keys.getFirst().length; i++) {
- final byte[] startKey = keys.getFirst()[i];
- final byte[] endKey = keys.getSecond()[i];
- final String regionLocation = table.getRegionLocation(startKey, false).getHostnamePort();
- //Test if the given region is to be included in the InputSplit while splitting the regions of a table
- if (!includeRegionInSplit(startKey, endKey)) {
- continue;
- }
- //Finds the region on which the given row is being served
- final String[] hosts = new String[]{regionLocation};
-
- // determine if regions contains keys used by the scan
- boolean isLastRegion = endKey.length == 0;
- if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
- (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
-
- final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
- final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
- && !isLastRegion ? endKey : stopRow;
- int id = splits.size();
- final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop);
- splits.add(split);
- }
- }
- LOG.info("Created " + splits.size() + " splits");
- for (TableInputSplit split : splits) {
- logSplitInfo("created", split);
- }
- return splits.toArray(new TableInputSplit[0]);
- }
-
- private void logSplitInfo(String action, TableInputSplit split) {
- int splitId = split.getSplitNumber();
- String splitStart = Bytes.toString(split.getStartRow());
- String splitEnd = Bytes.toString(split.getEndRow());
- String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
- String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
- String[] hostnames = split.getHostnames();
- LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey);
- }
-
- /**
- * Tests whether the given region is to be included in the InputSplits while splitting the regions of a table.
- * <p>
- * This optimization is effective when there is a specific reason to exclude an entire region from the M-R job
- * (and hence not contribute an InputSplit), given the start and end keys of that region. <br>
- * It is useful when we need to remember the last-processed top record and continuously revisit the
- * [last, current) interval for M-R processing. Besides reducing the number of InputSplits, it also reduces the
- * load on the region server, due to the ordering of the keys. <br>
- * <br>
- * Note: It is possible that <code>endKey.length() == 0</code> for the last (most recent) region. <br>
- * Override this method if you want to exclude regions from M-R in bulk. By default, no region is excluded
- * (i.e. all regions are included).
- *
- * @param startKey Start key of the region
- * @param endKey End key of the region
- * @return true, if this region needs to be included as part of the input (default).
- */
- protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
- return true;
- }
-
- @Override
- public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits) {
- return new LocatableInputSplitAssigner(inputSplits);
- }
-
- @Override
- public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
- return null;
- }
-
-}
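
For reference, a minimal sketch of how the abstract TableInputFormat above is typically subclassed and used with the DataSet API. It is not part of this commit; the table, column family, and qualifier names below are hypothetical, and the usage pattern follows the HBaseReadExample further down in this diff.

import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical subclass; only the three abstract methods of TableInputFormat are implemented.
public class MyTableInputFormat extends TableInputFormat<Tuple2<String, String>> {

    @Override
    protected Scan getScanner() {
        // Restrict the scan to the single column we want to read.
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("someCf"), Bytes.toBytes("someQual"));
        return scan;
    }

    @Override
    protected String getTableName() {
        // Hypothetical table name; one table per InputFormat instance.
        return "my-table";
    }

    @Override
    protected Tuple2<String, String> mapResultToTuple(Result r) {
        // Copy the row key and cell value from the HBase Result into a Flink tuple.
        String key = Bytes.toString(r.getRow());
        String value = Bytes.toString(r.getValue(Bytes.toBytes("someCf"), Bytes.toBytes("someQual")));
        return new Tuple2<>(key, value);
    }
}

// Usage with the DataSet API:
//   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//   DataSet<Tuple2<String, String>> rows = env.createInput(new MyTableInputFormat());
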
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
deleted file mode 100644
index 75f0b9b..0000000
--- a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.core.io.LocatableInputSplit;
-
-/**
- * This class implements an input split for HBase. Each table input split corresponds to a key range (low, high).
- * All references to row below refer to the key of the row.
- */
-public class TableInputSplit extends LocatableInputSplit {
-
- private static final long serialVersionUID = 1L;
-
- /** The name of the table to retrieve data from */
- private final byte[] tableName;
-
- /** The start row of the split. */
- private final byte[] startRow;
-
- /** The end row of the split. */
- private final byte[] endRow;
-
- /**
- * Creates a new table input split
- *
- * @param splitNumber
- * the number of the input split
- * @param hostnames
- * the names of the hosts storing the data the input split refers to
- * @param tableName
- * the name of the table to retrieve data from
- * @param startRow
- * the start row of the split
- * @param endRow
- * the end row of the split
- */
- TableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName, final byte[] startRow,
- final byte[] endRow) {
- super(splitNumber, hostnames);
-
- this.tableName = tableName;
- this.startRow = startRow;
- this.endRow = endRow;
- }
-
- /**
- * Returns the table name.
- *
- * @return The table name.
- */
- public byte[] getTableName() {
- return this.tableName;
- }
-
- /**
- * Returns the start row.
- *
- * @return The start row.
- */
- public byte[] getStartRow() {
- return this.startRow;
- }
-
- /**
- * Returns the end row.
- *
- * @return The end row.
- */
- public byte[] getEndRow() {
- return this.endRow;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
deleted file mode 100644
index 3d9f672..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.log4j.Level;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * By using this class as the superclass of a set of tests you will have an HBase testing
- * cluster available that is well suited for writing tests for scanning and filtering against.
- * It is usable by any downstream application because the HBase cluster is 'injected' via
- * a dynamically generated hbase-site.xml that is added to the classpath.
- * Because of this classpath manipulation it is not possible to start a second testing cluster in the same JVM.
- * Therefore, you should either put all HBase-related tests in a single class or force Surefire to
- * set up a new JVM for each test class.
- * See: http://maven.apache.org/surefire/maven-surefire-plugin/examples/fork-options-and-parallel-execution.html
- */
-//
-// NOTE: The code in this file is based on code from the
-// Apache HBase project, licensed under the Apache License v 2.0
-//
-// https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java
-//
-public class HBaseTestingClusterAutostarter implements Serializable {
-
- private static final Log LOG = LogFactory.getLog(HBaseTestingClusterAutostarter.class);
-
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static HBaseAdmin admin = null;
- private static List<TableName> createdTables = new ArrayList<>();
-
- private static boolean alreadyRegisteredTestCluster = false;
-
- protected static void createTable(TableName tableName, byte[] columnFamilyName, byte[][] splitKeys) {
- LOG.info("HBase minicluster: Creating table " + tableName.getNameAsString());
-
- assertNotNull("HBaseAdmin is not initialized successfully.", admin);
- HTableDescriptor desc = new HTableDescriptor(tableName);
- HColumnDescriptor colDef = new HColumnDescriptor(columnFamilyName);
- desc.addFamily(colDef);
-
- try {
- admin.createTable(desc, splitKeys);
- createdTables.add(tableName);
- assertTrue("Fail to create the table", admin.tableExists(tableName));
- } catch (IOException e) {
- assertNull("Exception found while creating table", e);
- }
- }
-
- protected static HTable openTable(TableName tableName) throws IOException {
- HTable table = (HTable) admin.getConnection().getTable(tableName);
- assertTrue("Fail to create the table", admin.tableExists(tableName));
- return table;
- }
-
- private static void deleteTables() {
- if (admin != null) {
- for (TableName tableName : createdTables) {
- try {
- if (admin.tableExists(tableName)) {
- admin.disableTable(tableName);
- admin.deleteTable(tableName);
- }
- } catch (IOException e) {
- assertNull("Exception found deleting the table", e);
- }
- }
- }
- }
-
- private static void initialize(Configuration conf) {
- conf = HBaseConfiguration.create(conf);
- conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
- try {
- admin = TEST_UTIL.getHBaseAdmin();
- } catch (MasterNotRunningException e) {
- assertNull("Master is not running", e);
- } catch (ZooKeeperConnectionException e) {
- assertNull("Cannot connect to ZooKeeper", e);
- } catch (IOException e) {
- assertNull("IOException", e);
- }
- }
-
- @BeforeClass
- public static void setUp() throws Exception {
- LOG.info("HBase minicluster: Starting");
- ((Log4JLogger) RpcServer.LOG).getLogger().setLevel(Level.ALL);
- ((Log4JLogger) AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL);
- ((Log4JLogger) ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
-
- TEST_UTIL.startMiniCluster(1);
-
- // https://issues.apache.org/jira/browse/HBASE-11711
- TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", -1);
-
- // Make sure the zookeeper quorum value contains the right port number (varies per run).
- TEST_UTIL.getConfiguration().set("hbase.zookeeper.quorum", "localhost:" + TEST_UTIL.getZkCluster().getClientPort());
-
- initialize(TEST_UTIL.getConfiguration());
- LOG.info("HBase minicluster: Running");
- }
-
- private static File hbaseSiteXmlDirectory;
- private static File hbaseSiteXmlFile;
-
- /**
- * This dynamically generates a hbase-site.xml file that is added to the classpath.
- * This way the HBase minicluster can be used by an unmodified application.
- * The downside is that it cannot be 'unloaded', so you can have only one per JVM.
- */
- public static void registerHBaseMiniClusterInClasspath() {
- if (alreadyRegisteredTestCluster) {
- fail("You CANNOT register a second HBase Testing cluster in the classpath of the SAME JVM");
- }
- File baseDir = new File(System.getProperty("java.io.tmpdir", "/tmp/"));
- hbaseSiteXmlDirectory = new File(baseDir, "unittest-hbase-minicluster-" + Math.abs(new Random().nextLong()) + "/");
-
- if (!hbaseSiteXmlDirectory.mkdirs()) {
- fail("Unable to create output directory " + hbaseSiteXmlDirectory + " for the HBase minicluster");
- }
-
- assertNotNull("The ZooKeeper for the HBase minicluster is missing", TEST_UTIL.getZkCluster());
-
- createHBaseSiteXml(hbaseSiteXmlDirectory, TEST_UTIL.getConfiguration().get("hbase.zookeeper.quorum"));
- addDirectoryToClassPath(hbaseSiteXmlDirectory);
-
- // Avoid starting it again.
- alreadyRegisteredTestCluster = true;
- }
-
- private static void createHBaseSiteXml(File hbaseSiteXmlDirectory, String zookeeperQuorum) {
- hbaseSiteXmlFile = new File(hbaseSiteXmlDirectory, "hbase-site.xml");
- // Create the hbase-site.xml file for this run.
- try {
- String hbaseSiteXml = "<?xml version=\"1.0\"?>\n" +
- "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n" +
- "<configuration>\n" +
- " <property>\n" +
- " <name>hbase.zookeeper.quorum</name>\n" +
- " <value>" + zookeeperQuorum + "</value>\n" +
- " </property>\n" +
- "</configuration>";
- OutputStream fos = new FileOutputStream(hbaseSiteXmlFile);
- fos.write(hbaseSiteXml.getBytes(StandardCharsets.UTF_8));
- fos.close();
- } catch (IOException e) {
- fail("Unable to create " + hbaseSiteXmlFile);
- }
- }
-
- private static void addDirectoryToClassPath(File directory) {
- try {
- // Get the classloader actually used by HBaseConfiguration
- ClassLoader classLoader = HBaseConfiguration.create().getClassLoader();
- if (!(classLoader instanceof URLClassLoader)) {
- fail("We should get a URLClassLoader");
- }
-
- // Make the addURL method accessible
- Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
- method.setAccessible(true);
-
- // Add the directory where we put the hbase-site.xml to the classpath
- method.invoke(classLoader, directory.toURI().toURL());
- } catch (MalformedURLException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
- fail("Unable to add " + directory + " to classpath because of this exception: " + e.getMessage());
- }
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- LOG.info("HBase minicluster: Shutting down");
- deleteTables();
- hbaseSiteXmlFile.delete();
- hbaseSiteXmlDirectory.delete();
- TEST_UTIL.shutdownMiniCluster();
- LOG.info("HBase minicluster: Down");
- }
-
-}
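
For reference, a minimal sketch of how a downstream test can use the HBaseTestingClusterAutostarter above. It is not part of this commit; the class, table, and column family names are hypothetical, and the pattern mirrors the TableInputFormatITCase further down in this diff.

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.junit.BeforeClass;
import org.junit.Test;

// Hypothetical test class; the mini-cluster lifecycle (setUp/tearDown) is inherited from the superclass.
public class MyHBaseScanTest extends HBaseTestingClusterAutostarter {

    @BeforeClass
    public static void activateHBaseCluster() {
        // Adds a dynamically generated hbase-site.xml to the classpath; allowed only once per JVM.
        registerHBaseMiniClusterInClasspath();
    }

    @Test
    public void testScan() throws Exception {
        // Create a pre-split test table on the mini cluster.
        TableName tableName = TableName.valueOf("my-test-table");
        createTable(tableName, "F".getBytes(), new byte[][]{"3".getBytes(), "6".getBytes()});

        // Put a sample row so there is something to scan.
        HTable table = openTable(tableName);
        Put put = new Put("1".getBytes());
        put.add("F".getBytes(), "Col".getBytes(), "value".getBytes());
        table.put(put);
        table.close();

        // ... run the scan or InputFormat under test against "my-test-table" ...
    }
}
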
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
deleted file mode 100644
index 3dddd88..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TableInputFormatITCase extends HBaseTestingClusterAutostarter {
- private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
- private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
- private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
-
- // These are the row ids AND also the values we will put in the test table
- private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
-
- @BeforeClass
- public static void activateHBaseCluster(){
- registerHBaseMiniClusterInClasspath();
- }
-
- @Before
- public void createTestTable() throws IOException {
- TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
- byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
- createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
- HTable table = openTable(tableName);
-
- for (String rowId : ROW_IDS) {
- byte[] rowIdBytes = rowId.getBytes();
- Put p = new Put(rowIdBytes);
- // Use the rowId as the value to make testing easier
- p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
- table.put(p);
- }
-
- table.close();
- }
-
- class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
- @Override
- protected Scan getScanner() {
- return new Scan();
- }
-
- @Override
- protected String getTableName() {
- return TEST_TABLE_NAME;
- }
-
- @Override
- protected Tuple1<String> mapResultToTuple(Result r) {
- return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
- }
- }
-
- @Test
- public void testTableInputFormat() {
- ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
- environment.setParallelism(1);
-
- DataSet<String> resultDataSet =
- environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
- @Override
- public String map(Tuple1<String> value) throws Exception {
- return value.f0;
- }
- });
-
- List<String> resultSet = new ArrayList<>();
- resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet));
-
- try {
- environment.execute("HBase InputFormat Test");
- } catch (Exception e) {
- Assert.fail("HBase InputFormat test failed. " + e.getMessage());
- }
-
- for (String rowId : ROW_IDS) {
- assertTrue("Missing rowId from table: " + rowId, resultSet.contains(rowId));
- }
-
- assertEquals("The number of records is wrong.", ROW_IDS.length, resultSet.size());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
deleted file mode 100644
index 8579dee..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-public class HBaseFlinkTestConstants {
-
- public static final byte[] CF_SOME = "someCf".getBytes();
- public static final byte[] Q_SOME = "someQual".getBytes();
- public static final String TEST_TABLE_NAME = "test-table";
- public static final String TMP_DIR = "/tmp/test";
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
deleted file mode 100644
index dccf876..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-import org.apache.flink.addons.hbase.TableInputFormat;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Simple stub for an HBase DataSet read.
- *
- * To run the test, first create the test table with the hbase shell.
- *
- * Use the following commands:
- * <ul>
- * <li>create 'test-table', 'someCf'</li>
- * <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li>
- * <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li>
- * </ul>
- *
- * The test should return just the first entry.
- *
- */
-public class HBaseReadExample {
- public static void main(String[] args) throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- @SuppressWarnings("serial")
- DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() {
-
- @Override
- public String getTableName() {
- return HBaseFlinkTestConstants.TEST_TABLE_NAME;
- }
-
- @Override
- protected Scan getScanner() {
- Scan scan = new Scan();
- scan.addColumn(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME);
- return scan;
- }
-
- private Tuple2<String, String> reuse = new Tuple2<String, String>();
-
- @Override
- protected Tuple2<String, String> mapResultToTuple(Result r) {
- String key = Bytes.toString(r.getRow());
- String val = Bytes.toString(r.getValue(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME));
- reuse.setField(key, 0);
- reuse.setField(val, 1);
- return reuse;
- }
- })
- .filter(new FilterFunction<Tuple2<String,String>>() {
-
- @Override
- public boolean filter(Tuple2<String, String> t) throws Exception {
- String val = t.getField(1);
- if(val.startsWith("someStr"))
- return true;
- return false;
- }
- });
-
- hbaseDs.print();
-
- // kick off execution.
- env.execute();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
deleted file mode 100644
index 483bdff..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Simple stub for an HBase DataSet write.
- *
- * To run the test, first create the test table with the hbase shell.
- *
- * Use the following commands:
- * <ul>
- * <li>create 'test-table', 'someCf'</li>
- * </ul>
- *
- */
-@SuppressWarnings("serial")
-public class HBaseWriteExample {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(String[] args) throws Exception {
-
- if(!parseParameters(args)) {
- return;
- }
-
- // set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // get input data
- DataSet<String> text = getTextDataSet(env);
-
- DataSet<Tuple2<String, Integer>> counts =
- // split up the lines in pairs (2-tuples) containing: (word,1)
- text.flatMap(new Tokenizer())
- // group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0)
- .sum(1);
-
- // emit result
- Job job = Job.getInstance();
- job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
- // TODO is "mapred.output.dir" really useful?
- job.getConfiguration().set("mapred.output.dir",HBaseFlinkTestConstants.TMP_DIR);
- counts.map(new RichMapFunction <Tuple2<String,Integer>, Tuple2<Text,Mutation>>() {
- private transient Tuple2<Text, Mutation> reuse;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- reuse = new Tuple2<Text, Mutation>();
- }
-
- @Override
- public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
- reuse.f0 = new Text(t.f0);
- Put put = new Put(t.f0.getBytes());
- put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
- reuse.f1 = put;
- return reuse;
- }
- }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
-
- // execute program
- env.execute("WordCount (HBase sink) Example");
- }
-
- // *************************************************************************
- // USER FUNCTIONS
- // *************************************************************************
-
- /**
- * Implements the string tokenizer that splits sentences into words as a user-defined
- * FlatMapFunction. The function takes a line (String) and splits it into
- * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
- */
- public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- // normalize and split the line
- String[] tokens = value.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new Tuple2<String, Integer>(token, 1));
- }
- }
- }
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
- private static boolean fileOutput = false;
- private static String textPath;
- private static String outputTableName = HBaseFlinkTestConstants.TEST_TABLE_NAME;
-
- private static boolean parseParameters(String[] args) {
-
- if(args.length > 0) {
- // parse input arguments
- fileOutput = true;
- if(args.length == 2) {
- textPath = args[0];
- outputTableName = args[1];
- } else {
- System.err.println("Usage: HBaseWriteExample <text path> <output table>");
- return false;
- }
- } else {
- System.out.println("Executing HBaseWriteExample example with built-in default data.");
- System.out.println(" Provide parameters to read input data from a file.");
- System.out.println(" Usage: HBaseWriteExample <text path> <output table>");
- }
- return true;
- }
-
- private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
- if(fileOutput) {
- // read the text file from given input path
- return env.readTextFile(textPath);
- } else {
- // get default test text data
- return getDefaultTextLineDataSet(env);
- }
- }
- private static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
- return env.fromElements(WORDS);
- }
- private static final String[] WORDS = new String[] {
- "To be, or not to be,--that is the question:--",
- "Whether 'tis nobler in the mind to suffer",
- "The slings and arrows of outrageous fortune",
- "Or to take arms against a sea of troubles,",
- "And by opposing end them?--To die,--to sleep,--",
- "No more; and by a sleep to say we end",
- "The heartache, and the thousand natural shocks",
- "That flesh is heir to,--'tis a consummation",
- "Devoutly to be wish'd. To die,--to sleep;--",
- "To sleep! perchance to dream:--ay, there's the rub;",
- "For in that sleep of death what dreams may come,",
- "When we have shuffled off this mortal coil,",
- "Must give us pause: there's the respect",
- "That makes calamity of so long life;",
- "For who would bear the whips and scorns of time,",
- "The oppressor's wrong, the proud man's contumely,",
- "The pangs of despis'd love, the law's delay,",
- "The insolence of office, and the spurns",
- "That patient merit of the unworthy takes,",
- "When he himself might his quietus make",
- "With a bare bodkin? who would these fardels bear,",
- "To grunt and sweat under a weary life,",
- "But that the dread of something after death,--",
- "The undiscover'd country, from whose bourn",
- "No traveller returns,--puzzles the will,",
- "And makes us rather bear those ills we have",
- "Than fly to others that we know not of?",
- "Thus conscience does make cowards of us all;",
- "And thus the native hue of resolution",
- "Is sicklied o'er with the pale cast of thought;",
- "And enterprises of great pith and moment,",
- "With this regard, their currents turn awry,",
- "And lose the name of action.--Soft you now!",
- "The fair Ophelia!--Nymph, in thy orisons",
- "Be all my sins remember'd."
- };
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
deleted file mode 100644
index 05398db..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- *
- * This is an example of how to write streams into HBase. In this example the
- * stream will be written into a local HBase, but it is possible to adapt this
- * example for an HBase running in a cloud. You need a running local HBase with a
- * table "flinkExample" and a column "entry". If your HBase configuration does
- * not match the hbase-site.xml in the resource folder, then you have to temporarily delete that
- * hbase-site.xml to execute the example properly.
- *
- */
-public class HBaseWriteStreamExample {
-
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment
- .getExecutionEnvironment();
-
- // data stream with random numbers
- DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
- private static final long serialVersionUID = 1L;
-
- private volatile boolean isRunning = true;
-
- @Override
- public void run(SourceContext<String> out) throws Exception {
- while (isRunning) {
- out.collect(String.valueOf(Math.floor(Math.random() * 100)));
- }
-
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
- });
- dataStream.writeUsingOutputFormat(new HBaseOutputFormat());
-
- env.execute();
- }
-
- /**
- *
- * This class implements an OutputFormat for HBase
- *
- */
- private static class HBaseOutputFormat implements OutputFormat<String> {
-
- private org.apache.hadoop.conf.Configuration conf = null;
- private HTable table = null;
- private String taskNumber = null;
- private int rowNumber = 0;
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void configure(Configuration parameters) {
- conf = HBaseConfiguration.create();
- }
-
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- table = new HTable(conf, "flinkExample");
- this.taskNumber = String.valueOf(taskNumber);
- }
-
- @Override
- public void writeRecord(String record) throws IOException {
- Put put = new Put(Bytes.toBytes(taskNumber + rowNumber));
- put.add(Bytes.toBytes("entry"), Bytes.toBytes("entry"),
- Bytes.toBytes(rowNumber));
- rowNumber++;
- table.put(put);
- }
-
- @Override
- public void close() throws IOException {
- table.flushCommits();
- table.close();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties
deleted file mode 100644
index 804ff45..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-log4j.rootLogger=DEBUG, stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.out
-log4j.appender.stdout.threshold=INFO
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %-5p %30c{1}:%4L - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/pom.xml b/flink-batch-connectors/flink-hcatalog/pom.xml
deleted file mode 100644
index 6889e5a..0000000
--- a/flink-batch-connectors/flink-hcatalog/pom.xml
+++ /dev/null
@@ -1,182 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-batch-connectors</artifactId>
- <version>1.2-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-hcatalog</artifactId>
- <name>flink-hcatalog</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-hadoop-compatibility_2.10</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hive.hcatalog</groupId>
- <artifactId>hcatalog-core</artifactId>
- <version>0.12.0</version>
- <exclusions>
- <exclusion>
- <groupId>org.json</groupId>
- <artifactId>json</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <scope>provided</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <!-- Scala Compiler -->
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <executions>
- <!-- Run scala compiler in the process-resources phase, so that dependencies on
- scala classes can be resolved later in the (Java) compile phase -->
- <execution>
- <id>scala-compile-first</id>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
-
- <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
- scala classes can be resolved later in the (Java) test-compile phase -->
- <execution>
- <id>scala-test-compile</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <jvmArgs>
- <jvmArg>-Xms128m</jvmArg>
- <jvmArg>-Xmx512m</jvmArg>
- </jvmArgs>
- </configuration>
- </plugin>
-
- <!-- Eclipse Integration -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.8</version>
- <configuration>
- <downloadSources>true</downloadSources>
- <projectnatures>
- <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
- <projectnature>org.eclipse.jdt.core.javanature</projectnature>
- </projectnatures>
- <buildcommands>
- <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
- </buildcommands>
- <classpathContainers>
- <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
- <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
- </classpathContainers>
- <excludes>
- <exclude>org.scala-lang:scala-library</exclude>
- <exclude>org.scala-lang:scala-compiler</exclude>
- </excludes>
- <sourceIncludes>
- <sourceInclude>**/*.scala</sourceInclude>
- <sourceInclude>**/*.java</sourceInclude>
- </sourceIncludes>
- </configuration>
- </plugin>
-
- <!-- Adding scala source directories to build path -->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <!-- Add src/main/scala to eclipse build path -->
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/scala</source>
- </sources>
- </configuration>
- </execution>
- <!-- Add src/test/scala to eclipse build path -->
- <execution>
- <id>add-test-source</id>
- <phase>generate-test-sources</phase>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/test/scala</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- Scala Code Style, most of the configuration done via plugin management -->
- <plugin>
- <groupId>org.scalastyle</groupId>
- <artifactId>scalastyle-maven-plugin</artifactId>
- <configuration>
- <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
- </configuration>
- </plugin>
-
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
deleted file mode 100644
index 859b706..0000000
--- a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hcatalog;
-
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
-import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.common.HCatUtil;
-import org.apache.hive.hcatalog.data.DefaultHCatRecord;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * An InputFormat to read from HCatalog tables.
- * The InputFormat supports projection (selection and order of fields) and partition filters.
- *
- * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink-native tuple.
- *
- * Note: Flink tuples might only support a limited number of fields (depending on the API).
- *
- * @param <T>
- */
-public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit> implements ResultTypeQueryable<T> {
-
- private static final long serialVersionUID = 1L;
-
- private Configuration configuration;
-
- private org.apache.hive.hcatalog.mapreduce.HCatInputFormat hCatInputFormat;
- private RecordReader<WritableComparable, HCatRecord> recordReader;
- private boolean fetched = false;
- private boolean hasNext;
-
- protected String[] fieldNames = new String[0];
- protected HCatSchema outputSchema;
-
- private TypeInformation<T> resultType;
-
- public HCatInputFormatBase() { }
-
- /**
- * Creates a HCatInputFormat for the given database and table.
- * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
- * The return type of the InputFormat can be changed to Flink-native tuples by calling
- * {@link HCatInputFormatBase#asFlinkTuples()}.
- *
- * @param database The name of the database to read from.
- * @param table The name of the table to read.
- * @throws java.io.IOException
- */
- public HCatInputFormatBase(String database, String table) throws IOException {
- this(database, table, new Configuration());
- }
-
- /**
- * Creates a HCatInputFormat for the given database, table, and
- * {@link org.apache.hadoop.conf.Configuration}.
- * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
- * The return type of the InputFormat can be changed to Flink-native tuples by calling
- * {@link HCatInputFormatBase#asFlinkTuples()}.
- *
- * @param database The name of the database to read from.
- * @param table The name of the table to read.
- * @param config The Configuration for the InputFormat.
- * @throws java.io.IOException
- */
- public HCatInputFormatBase(String database, String table, Configuration config) throws IOException {
- super();
- this.configuration = config;
- HadoopUtils.mergeHadoopConf(this.configuration);
-
- this.hCatInputFormat = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, database, table);
- this.outputSchema = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration);
-
- // configure output schema of HCatFormat
- configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
- // set type information
- this.resultType = new WritableTypeInfo(DefaultHCatRecord.class);
- }
-
- /**
- * Specifies the fields which are returned by the InputFormat and their order.
- *
- * @param fields The fields and their order which are returned by the InputFormat.
- * @return This InputFormat with specified return fields.
- * @throws java.io.IOException
- */
- public HCatInputFormatBase<T> getFields(String... fields) throws IOException {
-
- // build output schema
- ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length);
- for(String field : fields) {
- fieldSchemas.add(this.outputSchema.get(field));
- }
- this.outputSchema = new HCatSchema(fieldSchemas);
-
- // update output schema configuration
- configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
-
- return this;
- }
-
- /**
- * Specifies a SQL-like filter condition on the table's partition columns.
- * Filter conditions on non-partition columns are invalid.
- * A partition filter can significantly reduce the amount of data to be read.
- *
- * @param filter A SQL-like filter condition on the table's partition columns.
- * @return This InputFormat with specified partition filter.
- * @throws java.io.IOException
- */
- public HCatInputFormatBase<T> withFilter(String filter) throws IOException {
-
- // set filter
- this.hCatInputFormat.setFilter(filter);
-
- return this;
- }
-
- /**
- * Specifies that the InputFormat returns Flink tuples instead of
- * {@link org.apache.hive.hcatalog.data.HCatRecord}.
- *
- * Note: Flink tuples might only support a limited number of fields (depending on the API).
- *
- * @return This InputFormat.
- * @throws org.apache.hive.hcatalog.common.HCatException
- */
- public HCatInputFormatBase<T> asFlinkTuples() throws HCatException {
-
- // build type information
- int numFields = outputSchema.getFields().size();
- if(numFields > this.getMaxFlinkTupleSize()) {
- throw new IllegalArgumentException("Only up to "+this.getMaxFlinkTupleSize()+
- " fields can be returned as Flink tuples.");
- }
-
- TypeInformation[] fieldTypes = new TypeInformation[numFields];
- fieldNames = new String[numFields];
- for (String fieldName : outputSchema.getFieldNames()) {
- HCatFieldSchema field = outputSchema.get(fieldName);
-
- int fieldPos = outputSchema.getPosition(fieldName);
- TypeInformation fieldType = getFieldType(field);
-
- fieldTypes[fieldPos] = fieldType;
- fieldNames[fieldPos] = fieldName;
-
- }
- this.resultType = new TupleTypeInfo(fieldTypes);
-
- return this;
- }
-
- protected abstract int getMaxFlinkTupleSize();
-
- private TypeInformation getFieldType(HCatFieldSchema fieldSchema) {
-
- switch(fieldSchema.getType()) {
- case INT:
- return BasicTypeInfo.INT_TYPE_INFO;
- case TINYINT:
- return BasicTypeInfo.BYTE_TYPE_INFO;
- case SMALLINT:
- return BasicTypeInfo.SHORT_TYPE_INFO;
- case BIGINT:
- return BasicTypeInfo.LONG_TYPE_INFO;
- case BOOLEAN:
- return BasicTypeInfo.BOOLEAN_TYPE_INFO;
- case FLOAT:
- return BasicTypeInfo.FLOAT_TYPE_INFO;
- case DOUBLE:
- return BasicTypeInfo.DOUBLE_TYPE_INFO;
- case STRING:
- return BasicTypeInfo.STRING_TYPE_INFO;
- case BINARY:
- return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
- case ARRAY:
- return new GenericTypeInfo(List.class);
- case MAP:
- return new GenericTypeInfo(Map.class);
- case STRUCT:
- return new GenericTypeInfo(List.class);
- default:
- throw new IllegalArgumentException("Unknown data type \""+fieldSchema.getType()+"\" encountered.");
- }
- }
-
- /**
- * Returns the {@link org.apache.hadoop.conf.Configuration} of the HCatInputFormat.
- *
- * @return The Configuration of the HCatInputFormat.
- */
- public Configuration getConfiguration() {
- return this.configuration;
- }
-
- /**
- * Returns the {@link org.apache.hive.hcatalog.data.schema.HCatSchema} of the {@link org.apache.hive.hcatalog.data.HCatRecord}
- * returned by this InputFormat.
- *
- * @return The HCatSchema of the HCatRecords returned by this InputFormat.
- */
- public HCatSchema getOutputSchema() {
- return this.outputSchema;
- }
-
- // --------------------------------------------------------------------------------------------
- // InputFormat
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void configure(org.apache.flink.configuration.Configuration parameters) {
- // nothing to do
- }
-
- @Override
- public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
- // no statistics provided at the moment
- return null;
- }
-
- @Override
- public HadoopInputSplit[] createInputSplits(int minNumSplits)
- throws IOException {
- configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
-
- JobContext jobContext = null;
- try {
- jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- List<InputSplit> splits;
- try {
- splits = this.hCatInputFormat.getSplits(jobContext);
- } catch (InterruptedException e) {
- throw new IOException("Could not get Splits.", e);
- }
- HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
-
- for(int i = 0; i < hadoopInputSplits.length; i++){
- hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
- }
- return hadoopInputSplits;
- }
-
- @Override
- public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
- return new LocatableInputSplitAssigner(inputSplits);
- }
-
- @Override
- public void open(HadoopInputSplit split) throws IOException {
- TaskAttemptContext context = null;
- try {
- context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
-
- try {
- this.recordReader = this.hCatInputFormat
- .createRecordReader(split.getHadoopInputSplit(), context);
- this.recordReader.initialize(split.getHadoopInputSplit(), context);
- } catch (InterruptedException e) {
- throw new IOException("Could not create RecordReader.", e);
- } finally {
- this.fetched = false;
- }
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- if(!this.fetched) {
- fetchNext();
- }
- return !this.hasNext;
- }
-
- private void fetchNext() throws IOException {
- try {
- this.hasNext = this.recordReader.nextKeyValue();
- } catch (InterruptedException e) {
- throw new IOException("Could not fetch next KeyValue pair.", e);
- } finally {
- this.fetched = true;
- }
- }
-
- @Override
- public T nextRecord(T record) throws IOException {
- if(!this.fetched) {
- // first record
- fetchNext();
- }
- if(!this.hasNext) {
- return null;
- }
- try {
-
- // get next HCatRecord
- HCatRecord v = this.recordReader.getCurrentValue();
- this.fetched = false;
-
- if(this.fieldNames.length > 0) {
- // return as Flink tuple
- return this.buildFlinkTuple(record, v);
-
- } else {
- // return as HCatRecord
- return (T)v;
- }
-
- } catch (InterruptedException e) {
- throw new IOException("Could not get next record.", e);
- }
- }
-
- protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException;
-
- @Override
- public void close() throws IOException {
- this.recordReader.close();
- }
-
- // --------------------------------------------------------------------------------------------
- // Custom de/serialization methods
- // --------------------------------------------------------------------------------------------
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.writeInt(this.fieldNames.length);
- for(String fieldName : this.fieldNames) {
- out.writeUTF(fieldName);
- }
- this.configuration.write(out);
- }
-
- @SuppressWarnings("unchecked")
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- this.fieldNames = new String[in.readInt()];
- for(int i=0; i<this.fieldNames.length; i++) {
- this.fieldNames[i] = in.readUTF();
- }
-
- Configuration configuration = new Configuration();
- configuration.readFields(in);
-
- if(this.configuration == null) {
- this.configuration = configuration;
- }
-
- this.hCatInputFormat = new org.apache.hive.hcatalog.mapreduce.HCatInputFormat();
- this.outputSchema = (HCatSchema)HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema"));
- }
-
- // --------------------------------------------------------------------------------------------
- // Result type business
- // --------------------------------------------------------------------------------------------
-
- @Override
- public TypeInformation<T> getProducedType() {
- return this.resultType;
- }
-
-}
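For reference, a minimal usage sketch of the format deleted above (not part of the diff). It assumes a reachable Hive metastore and a hypothetical table mydb.mytable with columns "id" and "name" plus a string partition column "region"; it uses only the getFields(), withFilter() and asFlinkTuples() methods defined in HCatInputFormatBase and the Java subclass from the next file.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hcatalog.HCatInputFormatBase;
    import org.apache.flink.hcatalog.java.HCatInputFormat;

    public class HCatReadSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Project two columns, prune partitions on "region", and return Flink tuples.
            // "mydb", "mytable", "id", "name" and "region" are placeholder names.
            HCatInputFormatBase<Tuple2<Integer, String>> hcatFormat =
                new HCatInputFormat<Tuple2<Integer, String>>("mydb", "mytable")
                    .getFields("id", "name")
                    .withFilter("region = \"EU\"")
                    .asFlinkTuples();

            // The format implements ResultTypeQueryable, so no extra type information is needed.
            DataSet<Tuple2<Integer, String>> rows = env.createInput(hcatFormat);
            rows.print();
        }
    }
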
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
deleted file mode 100644
index 46f3cd5..0000000
--- a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hcatalog.java;
-
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.hcatalog.HCatInputFormatBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.data.HCatRecord;
-
-/**
- * An InputFormat to read from HCatalog tables.
- * The InputFormat supports projection (selection and order of fields) and partition filters.
- *
- * Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
- * Flink tuples support only up to 25 fields.
- *
- * @param <T>
- */
-public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
- private static final long serialVersionUID = 1L;
-
- public HCatInputFormat() {}
-
- public HCatInputFormat(String database, String table) throws Exception {
- super(database, table);
- }
-
- public HCatInputFormat(String database, String table, Configuration config) throws Exception {
- super(database, table, config);
- }
-
-
- @Override
- protected int getMaxFlinkTupleSize() {
- return 25;
- }
-
- @Override
- protected T buildFlinkTuple(T t, HCatRecord record) throws HCatException {
-
- Tuple tuple = (Tuple)t;
-
- // Extract all fields from HCatRecord
- for(int i=0; i < this.fieldNames.length; i++) {
-
- // get field value
- Object o = record.get(this.fieldNames[i], this.outputSchema);
-
- // Set field value in Flink tuple.
- // Partition columns are returned as String and
- // need to be converted to original type.
- switch(this.outputSchema.get(i).getType()) {
- case INT:
- if(o instanceof String) {
- tuple.setField(Integer.parseInt((String) o), i);
- } else {
- tuple.setField(o, i);
- }
- break;
- case TINYINT:
- if(o instanceof String) {
- tuple.setField(Byte.parseByte((String) o), i);
- } else {
- tuple.setField(o, i);
- }
- break;
- case SMALLINT:
- if(o instanceof String) {
- tuple.setField(Short.parseShort((String) o), i);
- } else {
- tuple.setField(o, i);
- }
- break;
- case BIGINT:
- if(o instanceof String) {
- tuple.setField(Long.parseLong((String) o), i);
- } else {
- tuple.setField(o, i);
- }
- break;
- case BOOLEAN:
- if(o instanceof String) {
- tuple.setField(Boolean.parseBoolean((String) o), i);
- } else {
- tuple.setField(o, i);
- }
- break;
- case FLOAT:
- if(o instanceof String) {
- tuple.setField(Float.parseFloat((String) o), i);
- } else {
- tuple.setField(o, i);
- }
- break;
- case DOUBLE:
- if(o instanceof String) {
- tuple.setField(Double.parseDouble((String) o), i);
- } else {
- tuple.setField(o, i);
- }
- break;
- case STRING:
- tuple.setField(o, i);
- break;
- case BINARY:
- if(o instanceof String) {
- throw new RuntimeException("Cannot handle partition keys of type BINARY.");
- } else {
- tuple.setField(o, i);
- }
- break;
- case ARRAY:
- if(o instanceof String) {
- throw new RuntimeException("Cannot handle partition keys of type ARRAY.");
- } else {
- tuple.setField(o, i);
- }
- break;
- case MAP:
- if(o instanceof String) {
- throw new RuntimeException("Cannot handle partition keys of type MAP.");
- } else {
- tuple.setField(o, i);
- }
- break;
- case STRUCT:
- if(o instanceof String) {
- throw new RuntimeException("Cannot handle partition keys of type STRUCT.");
- } else {
- tuple.setField(o, i);
- }
- break;
- default:
- throw new RuntimeException("Invalid Type");
- }
- }
-
- return (T)tuple;
-
- }
-
-}
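The switch in buildFlinkTuple above is needed because HCatalog hands back partition-column values as strings, even when the column is declared with a numeric or boolean type. A hypothetical stand-alone helper (illustrative only, not part of the connector) showing the same conversion idea for a single value:

    import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;

    class PartitionValueSketch {
        // Converts a partition value that arrived as a String back to its declared type.
        // Only a few primitive types are shown; the connector above rejects complex-typed
        // partition keys outright.
        static Object normalize(Object value, HCatFieldSchema.Type type) {
            if (!(value instanceof String)) {
                return value; // non-partition columns already carry the right type
            }
            String s = (String) value;
            switch (type) {
                case INT:     return Integer.parseInt(s);
                case BIGINT:  return Long.parseLong(s);
                case DOUBLE:  return Double.parseDouble(s);
                case BOOLEAN: return Boolean.parseBoolean(s);
                case STRING:  return s;
                default:
                    throw new IllegalArgumentException("Unexpected partition key type: " + type);
            }
        }
    }
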
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
deleted file mode 100644
index 0299ee1..0000000
--- a/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hcatalog.scala
-
-import org.apache.flink.configuration
-import org.apache.flink.hcatalog.HCatInputFormatBase
-import org.apache.hadoop.conf.Configuration
-import org.apache.hive.hcatalog.data.HCatRecord
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema
-
-/**
- * An InputFormat to read from HCatalog tables.
- * The InputFormat supports projection (selection and order of fields) and partition filters.
- *
- * Data can be returned as [[HCatRecord]] or Scala tuples.
- * Scala tuples support only up to 22 fields.
- *
- */
-class HCatInputFormat[T](
- database: String,
- table: String,
- config: Configuration
- ) extends HCatInputFormatBase[T](database, table, config) {
-
- def this(database: String, table: String) {
- this(database, table, new Configuration)
- }
-
- var vals: Array[Any] = Array[Any]()
-
- override def configure(parameters: configuration.Configuration): Unit = {
- super.configure(parameters)
- vals = new Array[Any](fieldNames.length)
- }
-
- override protected def getMaxFlinkTupleSize: Int = 22
-
- override protected def buildFlinkTuple(t: T, record: HCatRecord): T = {
-
- // Extract all fields from HCatRecord
- var i: Int = 0
- while (i < this.fieldNames.length) {
-
- val o: AnyRef = record.get(this.fieldNames(i), this.outputSchema)
-
- // partition columns are returned as String
- // Check and convert to actual type.
- this.outputSchema.get(i).getType match {
- case HCatFieldSchema.Type.INT =>
- if (o.isInstanceOf[String]) {
- vals(i) = o.asInstanceOf[String].toInt
- }
- else {
- vals(i) = o.asInstanceOf[Int]
- }
- case HCatFieldSchema.Type.TINYINT =>
- if (o.isInstanceOf[String]) {
- vals(i) = o.asInstanceOf[String].toInt.toByte
- }
- else {
- vals(i) = o.asInstanceOf[Byte]
- }
- case HCatFieldSchema.Type.SMALLINT =>
- if (o.isInstanceOf[String]) {
- vals(i) = o.asInstanceOf[String].toInt.toShort
- }
- else {
- vals(i) = o.asInstanceOf[Short]
- }
- case HCatFieldSchema.Type.BIGINT =>
- if (o.isInstanceOf[String]) {
- vals(i) = o.asInstanceOf[String].toLong
- }
- else {
- vals(i) = o.asInstanceOf[Long]
- }
- case HCatFieldSchema.Type.BOOLEAN =>
- if (o.isInstanceOf[String]) {
- vals(i) = o.asInstanceOf[String].toBoolean
- }
- else {
- vals(i) = o.asInstanceOf[Boolean]
- }
- case HCatFieldSchema.Type.FLOAT =>
- if (o.isInstanceOf[String]) {
- vals(i) = o.asInstanceOf[String].toFloat
- }
- else {
- vals(i) = o.asInstanceOf[Float]
- }
- case HCatFieldSchema.Type.DOUBLE =>
- if (o.isInstanceOf[String]) {
- vals(i) = o.asInstanceOf[String].toDouble
- }
- else {
- vals(i) = o.asInstanceOf[Double]
- }
- case HCatFieldSchema.Type.STRING =>
- vals(i) = o
- case HCatFieldSchema.Type.BINARY =>
- if (o.isInstanceOf[String]) {
- throw new RuntimeException("Cannot handle partition keys of type BINARY.")
- }
- else {
- vals(i) = o.asInstanceOf[Array[Byte]]
- }
- case HCatFieldSchema.Type.ARRAY =>
- if (o.isInstanceOf[String]) {
- throw new RuntimeException("Cannot handle partition keys of type ARRAY.")
- }
- else {
- vals(i) = o.asInstanceOf[List[Object]]
- }
- case HCatFieldSchema.Type.MAP =>
- if (o.isInstanceOf[String]) {
- throw new RuntimeException("Cannot handle partition keys of type MAP.")
- }
- else {
- vals(i) = o.asInstanceOf[Map[Object, Object]]
- }
- case HCatFieldSchema.Type.STRUCT =>
- if (o.isInstanceOf[String]) {
- throw new RuntimeException("Cannot handle partition keys of type STRUCT.")
- }
- else {
- vals(i) = o.asInstanceOf[List[Object]]
- }
- case _ =>
- throw new RuntimeException("Invalid type " + this.outputSchema.get(i).getType +
- " encountered.")
- }
-
- i += 1
- }
- createScalaTuple(vals)
- }
-
- private def createScalaTuple(vals: Array[Any]): T = {
-
- this.fieldNames.length match {
- case 1 =>
- new Tuple1(vals(0)).asInstanceOf[T]
- case 2 =>
- new Tuple2(vals(0), vals(1)).asInstanceOf[T]
- case 3 =>
- new Tuple3(vals(0), vals(1), vals(2)).asInstanceOf[T]
- case 4 =>
- new Tuple4(vals(0), vals(1), vals(2), vals(3)).asInstanceOf[T]
- case 5 =>
- new Tuple5(vals(0), vals(1), vals(2), vals(3), vals(4)).asInstanceOf[T]
- case 6 =>
- new Tuple6(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5)).asInstanceOf[T]
- case 7 =>
- new Tuple7(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6)).asInstanceOf[T]
- case 8 =>
- new Tuple8(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7))
- .asInstanceOf[T]
- case 9 =>
- new Tuple9(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
- vals(8)).asInstanceOf[T]
- case 10 =>
- new Tuple10(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
- vals(8), vals(9)).asInstanceOf[T]
- case 11 =>
- new Tuple11(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
- vals(8), vals(9), vals(10)).asInstanceOf[T]
- case 12 =>
- new Tuple12(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
- vals(8), vals(9), vals(10), vals(11)).asInstanceOf[T]
- case 13 =>
- new Tuple13(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
- vals(8), vals(9), vals(10), vals(11), vals(12)).asInstanceOf[T]
- case 14 =>
- new Tuple14(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
- vals(8), vals(9), vals(10), vals(11), vals(12), vals(13)).asInstanceOf[T]
- case 15 =>
- new Tuple15(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
- vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14)).asInstanceOf[T]
- case 16 =>
- new Tuple16(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
- vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15))
- .asInstanceOf[T]
- case 17 =>
- new Tuple17(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
- vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
- vals(16)).asInstanceOf[T]
- case 18 =>
- new Tuple18(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
- vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
- vals(16), vals(17)).asInstanceOf[T]
- case 19 =>
- new Tuple19(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
- vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
- vals(16), vals(17), vals(18)).asInstanceOf[T]
- case 20 =>
- new Tuple20(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
- vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
- vals(16), vals(17), vals(18), vals(19)).asInstanceOf[T]
- case 21 =>
- new Tuple21(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
- vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
- vals(16), vals(17), vals(18), vals(19), vals(20)).asInstanceOf[T]
- case 22 =>
- new Tuple22(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
- vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
- vals(16), vals(17), vals(18), vals(19), vals(20), vals(21)).asInstanceOf[T]
- case _ =>
- throw new RuntimeException("Only up to 22 fields supported for Scala Tuples.")
-
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/pom.xml b/flink-batch-connectors/flink-jdbc/pom.xml
deleted file mode 100644
index 40779ba..0000000
--- a/flink-batch-connectors/flink-jdbc/pom.xml
+++ /dev/null
@@ -1,66 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-batch-connectors</artifactId>
- <version>1.2-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-jdbc</artifactId>
- <name>flink-jdbc</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table_2.10</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derby</artifactId>
- <version>10.10.1.1</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
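
The test-scoped Derby dependency above suggests how this module's tests can exercise the JDBC formats: against an embedded, in-memory database. A minimal, stand-alone sketch of such a setup using plain JDBC (the table and column names are made up; this is not the module's actual test code):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class EmbeddedDerbySketch {
        public static void main(String[] args) throws Exception {
            // Creates an in-memory Derby database that disappears when the JVM exits.
            try (Connection conn = DriverManager.getConnection("jdbc:derby:memory:flinktest;create=true");
                 Statement stmt = conn.createStatement()) {
                stmt.executeUpdate("CREATE TABLE books (id INT, title VARCHAR(64))");
                stmt.executeUpdate("INSERT INTO books VALUES (1, 'Stream Processing')");
                try (ResultSet rs = stmt.executeQuery("SELECT id, title FROM books")) {
                    while (rs.next()) {
                        System.out.println(rs.getInt("id") + " -> " + rs.getString("title"));
                    }
                }
            }
        }
    }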