You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/07/22 12:41:02 UTC
[62/92] [abbrv] prefix all projects in addons and quickstarts with
flink-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
deleted file mode 100644
index 9ff5af7..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.addons.hbase.common.HBaseKey;
-import org.apache.flink.addons.hbase.common.HBaseResult;
-import org.apache.flink.addons.hbase.common.HBaseUtil;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * {@link InputFormat} subclass that wraps the access for HTables.
- */
-public class TableInputFormat implements InputFormat<Record, TableInputSplit> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
-
- /** A handle on an HBase table */
- private HTable table;
-
- /** The scanner that performs the actual access on the table. HBase object */
- private Scan scan;
-
- /** Hbase' iterator wrapper */
- private TableRecordReader tableRecordReader;
-
- /** helper variable to decide whether the input is exhausted or not */
- private boolean endReached = false;
-
- /** Job parameter that specifies the input table. */
- public static final String INPUT_TABLE = "hbase.inputtable";
-
- /** Location of the hbase-site.xml. If set, the HBaseAdmin will build inside */
- public static final String CONFIG_LOCATION = "hbase.config.location";
-
- /**
- * Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
- * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
- */
- public static final String SCAN = "hbase.scan";
-
- /** Column Family to Scan */
- public static final String SCAN_COLUMN_FAMILY = "hbase.scan.column.family";
-
- /** Space delimited list of columns to scan. */
- public static final String SCAN_COLUMNS = "hbase.scan.columns";
-
- /** The timestamp used to filter columns with a specific timestamp. */
- public static final String SCAN_TIMESTAMP = "hbase.scan.timestamp";
-
- /** The starting timestamp used to filter columns with a specific range of versions. */
- public static final String SCAN_TIMERANGE_START = "hbase.scan.timerange.start";
-
- /** The ending timestamp used to filter columns with a specific range of versions. */
- public static final String SCAN_TIMERANGE_END = "hbase.scan.timerange.end";
-
- /** The maximum number of version to return. */
- public static final String SCAN_MAXVERSIONS = "hbase.scan.maxversions";
-
- /** Set to false to disable server-side caching of blocks for this scan. */
- public static final String SCAN_CACHEBLOCKS = "hbase.scan.cacheblocks";
-
- /** The number of rows for caching that will be passed to scanners. */
- public static final String SCAN_CACHEDROWS = "hbase.scan.cachedrows";
-
- /** mutable objects that are used to avoid recreation of wrapper objects */
- protected HBaseKey hbaseKey;
-
- protected HBaseResult hbaseResult;
-
- private org.apache.hadoop.conf.Configuration hConf;
-
- /**
-  * Configures this format: creates the table handle and the scan from the given
-  * parameters and stores both through their setters.
-  *
-  * @param parameters
-  *        the configuration handed to {@link #createTable(Configuration)} and
-  *        {@link #createScanner(Configuration)}
-  */
- @Override
- public void configure(Configuration parameters) {
-     // The table is created before the scan, exactly as before.
-     setTable(createTable(parameters));
-     setScan(createScanner(parameters));
- }
-
- /**
- * Read the configuration and creates a {@link Scan} object.
- *
- * @param parameters
- * @return
- */
- protected Scan createScanner(Configuration parameters) {
- Scan scan = null;
- if (parameters.getString(SCAN, null) != null) {
- try {
- scan = HBaseUtil.convertStringToScan(parameters.getString(SCAN, null));
- } catch (IOException e) {
- LOG.error("An error occurred.", e);
- }
- } else {
- try {
- scan = new Scan();
-
- // if (parameters.getString(SCAN_COLUMNS, null) != null) {
- // scan.addColumns(parameters.getString(SCAN_COLUMNS, null));
- // }
-
- if (parameters.getString(SCAN_COLUMN_FAMILY, null) != null) {
- scan.addFamily(Bytes.toBytes(parameters.getString(SCAN_COLUMN_FAMILY, null)));
- }
-
- if (parameters.getString(SCAN_TIMESTAMP, null) != null) {
- scan.setTimeStamp(Long.parseLong(parameters.getString(SCAN_TIMESTAMP, null)));
- }
-
- if (parameters.getString(SCAN_TIMERANGE_START, null) != null
- && parameters.getString(SCAN_TIMERANGE_END, null) != null) {
- scan.setTimeRange(
- Long.parseLong(parameters.getString(SCAN_TIMERANGE_START, null)),
- Long.parseLong(parameters.getString(SCAN_TIMERANGE_END, null)));
- }
-
- if (parameters.getString(SCAN_MAXVERSIONS, null) != null) {
- scan.setMaxVersions(Integer.parseInt(parameters.getString(SCAN_MAXVERSIONS, null)));
- }
-
- if (parameters.getString(SCAN_CACHEDROWS, null) != null) {
- scan.setCaching(Integer.parseInt(parameters.getString(SCAN_CACHEDROWS, null)));
- }
-
- // false by default, full table scans generate too much BC churn
- scan.setCacheBlocks((parameters.getBoolean(SCAN_CACHEBLOCKS, false)));
- } catch (Exception e) {
- LOG.error(StringUtils.stringifyException(e));
- }
- }
- return scan;
- }
-
- /**
-  * Creates an {@link HTable} instance for the table named by {@link #INPUT_TABLE}.
-  * <p>
-  * If {@link #CONFIG_LOCATION} is set, the HBase configuration is built from that
-  * file. Otherwise, if no configuration has been built yet, the default HBase
-  * configuration is used; previously the configuration stayed {@code null} in that
-  * case and table creation always failed.
-  *
-  * @param parameters
-  *        a {@link Configuration} that holds at least the table name.
-  * @return the table handle, or {@code null} if the table could not be created
-  *         (the failure is logged)
-  */
- protected HTable createTable(Configuration parameters) {
-     String configLocation = parameters.getString(TableInputFormat.CONFIG_LOCATION, null);
-     LOG.info("Got config location: " + configLocation);
-     if (configLocation != null) {
-         org.apache.hadoop.conf.Configuration dummyConf = new org.apache.hadoop.conf.Configuration();
-         // The file URL prefix differs on Windows.
-         if (OperatingSystem.isWindows()) {
-             dummyConf.addResource(new Path("file:/" + configLocation));
-         } else {
-             dummyConf.addResource(new Path("file://" + configLocation));
-         }
-         hConf = HBaseConfiguration.create(dummyConf);
-         LOG.info("hbase master: " + hConf.get("hbase.master"));
-         LOG.info("zookeeper quorum: " + hConf.get("hbase.zookeeper.quorum"));
-     } else if (hConf == null) {
-         // No explicit location given: fall back to the default HBase configuration
-         // instead of handing a null configuration to HTable.
-         hConf = HBaseConfiguration.create();
-     }
-
-     String tableName = parameters.getString(INPUT_TABLE, "");
-     try {
-         return new HTable(this.hConf, tableName);
-     } catch (Exception e) {
-         LOG.error(StringUtils.stringifyException(e));
-     }
-     return null;
- }
-
- /**
-  * Not implemented: no statistics are computed for this format, so the method
-  * always returns {@code null} and ignores the cached statistics.
-  */
- @Override
- public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
- // TODO: not implemented, no statistics are available.
- return null;
- }
-
- /**
-  * Tells whether the reader has been exhausted.
-  *
-  * @return {@code true} once {@link #nextResult()} has found no further row
-  */
- @Override
- public boolean reachedEnd() throws IOException {
-     return endReached;
- }
-
- protected boolean nextResult() throws IOException {
- if (this.tableRecordReader == null)
- {
- throw new IOException("No table record reader provided!");
- }
-
- try {
- if (this.tableRecordReader.nextKeyValue())
- {
- ImmutableBytesWritable currentKey = this.tableRecordReader.getCurrentKey();
- Result currentValue = this.tableRecordReader.getCurrentValue();
-
- hbaseKey.setWritable(currentKey);
- hbaseResult.setResult(currentValue);
- } else
- {
- this.endReached = true;
- return false;
- }
- } catch (InterruptedException e) {
- LOG.error("Table reader has been interrupted", e);
- throw new IOException(e);
- }
-
- return true;
- }
-
- /**
-  * Reads the next row and maps it into the given record.
-  *
-  * @param record
-  *        the record to fill
-  * @return the filled record, or {@code null} if no further row is available
-  */
- @Override
- public Record nextRecord(Record record) throws IOException {
-     if (!nextResult()) {
-         return null;
-     }
-     mapResultToRecord(record, hbaseKey, hbaseResult);
-     return record;
- }
-
- /**
-  * Maps the current HBase Result into a Record.
-  * This implementation simply stores the HBaseKey at position 0, and the HBase Result object at position 1.
-  *
-  * @param record
-  *        the record to fill
-  * @param key
-  *        the key wrapper, stored in field 0
-  * @param result
-  *        the result wrapper, stored in field 1
-  */
- public void mapResultToRecord(Record record, HBaseKey key, HBaseResult result) {
- record.setField(0, key);
- record.setField(1, result);
- }
-
- /**
-  * Closes the record reader, if one was created by {@link #open(TableInputSplit)}.
-  * Calling this method before the format has been opened is a no-op.
-  */
- @Override
- public void close() throws IOException {
-     // The reader only exists after open(); guard against close() without open().
-     if (this.tableRecordReader != null) {
-         this.tableRecordReader.close();
-     }
- }
-
- /**
-  * Opens the format for the given split: creates a record reader on the table,
-  * restricts a copy of the configured scan to the split's row range, and resets the
-  * reusable key/result wrappers and the end-reached flag.
-  *
-  * @param split
-  *        the split describing the row range to read
-  * @throws IOException
-  *         if the split, the table, or the scan is missing
-  */
- @Override
- public void open(TableInputSplit split) throws IOException {
-     if (split == null) {
-         throw new IOException("Input split is null!");
-     }
-
-     if (this.table == null) {
-         throw new IOException("No HTable provided!");
-     }
-
-     if (this.scan == null) {
-         throw new IOException("No Scan instance provided");
-     }
-
-     this.tableRecordReader = new TableRecordReader();
-     this.tableRecordReader.setHTable(this.table);
-
-     // Work on a copy so the configured scan is not modified per split.
-     Scan sc = new Scan(this.scan);
-     sc.setStartRow(split.getStartRow());
-     // Row keys are arbitrary bytes; print them in escaped form rather than
-     // decoding them with the platform charset.
-     LOG.info("split start row: " + Bytes.toStringBinary(split.getStartRow()));
-     sc.setStopRow(split.getEndRow());
-     LOG.info("split end row: " + Bytes.toStringBinary(split.getEndRow()));
-
-     this.tableRecordReader.setScan(sc);
-     this.tableRecordReader.restart(split.getStartRow());
-
-     this.hbaseKey = new HBaseKey();
-     this.hbaseResult = new HBaseResult();
-
-     endReached = false;
- }
-
-
- /**
-  * Creates one input split per table region that overlaps the row range of the
-  * configured scan. Each split is clipped to the intersection of the region's key
-  * range and the scan's start/stop rows.
-  *
-  * @param minNumSplits
-  *        not used by this implementation; the number of splits follows the regions
-  * @return the splits, one per included region
-  * @throws IOException
-  *         if no table or scan has been set, or the table reports no regions
-  */
- @Override
- public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
-
-     if (this.table == null) {
-         throw new IOException("No table was provided.");
-     }
-
-     // createScanner() may have failed and left the scan unset.
-     if (this.scan == null) {
-         throw new IOException("No Scan instance provided");
-     }
-
-     final Pair<byte[][], byte[][]> keys = this.table.getStartEndKeys();
-
-     if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
-         throw new IOException("Expecting at least one region.");
-     }
-
-     final byte[][] regionStarts = keys.getFirst();
-     final byte[][] regionEnds = keys.getSecond();
-
-     // The scan bounds do not change while splitting; read them once.
-     final byte[] startRow = this.scan.getStartRow();
-     final byte[] stopRow = this.scan.getStopRow();
-
-     final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(regionStarts.length);
-     for (int i = 0; i < regionStarts.length; i++) {
-         final byte[] regionStart = regionStarts[i];
-         final byte[] regionEnd = regionEnds[i];
-
-         if (!includeRegionInSplit(regionStart, regionEnd)) {
-             continue;
-         }
-
-         // determine if the given start and stop key fall into the region
-         // (an empty key means "unbounded" on that side)
-         final boolean scanStartsBeforeRegionEnd = startRow.length == 0 || regionEnd.length == 0
-                 || Bytes.compareTo(startRow, regionEnd) < 0;
-         final boolean scanStopsAfterRegionStart = stopRow.length == 0
-                 || Bytes.compareTo(stopRow, regionStart) > 0;
-         if (!scanStartsBeforeRegionEnd || !scanStopsAfterRegionStart) {
-             continue;
-         }
-
-         final byte[] splitStart =
-                 (startRow.length == 0 || Bytes.compareTo(regionStart, startRow) >= 0) ? regionStart : startRow;
-         final byte[] splitStop =
-                 ((stopRow.length == 0 || Bytes.compareTo(regionEnd, stopRow) <= 0) && regionEnd.length > 0)
-                         ? regionEnd : stopRow;
-
-         // Look up the region location only for regions that actually yield a split.
-         final String regionLocation = this.table.getRegionLocation(regionStart, false).getHostnamePort();
-
-         final TableInputSplit split = new TableInputSplit(splits.size(), new String[] { regionLocation },
-                 this.table.getTableName(), splitStart, splitStop);
-         splits.add(split);
-         if (LOG.isDebugEnabled()) {
-             LOG.debug("getSplits: split -> " + (splits.size() - 1) + " -> " + split);
-         }
-     }
-
-     return splits.toArray(new TableInputSplit[0]);
- }
-
- /**
-  * Test if the given region is to be included in the InputSplit while splitting
-  * the regions of a table.
-  * <p>
-  * This hook allows excluding an entire region from the input (so that it does not
-  * contribute an InputSplit), given the start and end keys of the region. <br>
-  * Useful when we need to remember the last-processed top record and revisit the [last, current) interval
-  * continuously. In addition to reducing InputSplits, this reduces the load on the region server as well, due
-  * to the ordering of the keys. <br>
-  * <br>
-  * Note: It is possible that <code>endKey.length == 0</code>, for the last (recent) region. <br>
-  * Override this method if you want to bulk exclude regions altogether. By default, no region is excluded
-  * (i.e. all regions are included). The method is {@code protected} and non-static so that subclasses can
-  * actually override it, as this documentation has always advised.
-  *
-  * @param startKey
-  *        Start key of the region
-  * @param endKey
-  *        End key of the region
-  * @return true, if this region needs to be included as part of the input (default).
-  */
- protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
-     return true;
- }
-
-
- /**
-  * Returns the split class produced by this format.
-  *
-  * @return {@code TableInputSplit.class}
-  */
- @Override
- public Class<TableInputSplit> getInputSplitType() {
-
- return TableInputSplit.class;
- }
-
- /**
-  * Sets the table handle used to create splits and to read rows.
-  *
-  * @param table
-  *        the table handle
-  */
- public void setTable(HTable table) {
-     this.table = table;
- }
-
- /**
-  * Returns the table handle currently set on this format.
-  *
-  * @return the table handle; {@code null} if none has been set
-  */
- public HTable getTable() {
-     return this.table;
- }
-
- /**
-  * Sets the scan that is used as the template for every split.
-  *
-  * @param scan
-  *        the scan
-  */
- public void setScan(Scan scan) {
-     this.scan = scan;
- }
-
- /**
-  * Returns the scan currently set on this format.
-  *
-  * @return the scan; {@code null} if none has been set
-  */
- public Scan getScan() {
-     return this.scan;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
deleted file mode 100644
index a77402d..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * This class implements an input split for HBase. Each table input split corresponds to a key range (low, high).
- * All references to "row" below refer to the key of the row.
- */
-public class TableInputSplit extends LocatableInputSplit {
-
-    /** The name of the table to retrieve data from. */
-    private byte[] tableName;
-
-    /** The start row of the split. */
-    private byte[] startRow;
-
-    /** The end row of the split. */
-    private byte[] endRow;
-
-    /**
-     * Creates a new table input split.
-     *
-     * @param splitNumber
-     *        the number of the input split
-     * @param hostnames
-     *        the names of the hosts storing the data the input split refers to
-     * @param tableName
-     *        the name of the table to retrieve data from
-     * @param startRow
-     *        the start row of the split
-     * @param endRow
-     *        the end row of the split
-     */
-    TableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName, final byte[] startRow,
-            final byte[] endRow) {
-        super(splitNumber, hostnames);
-
-        this.tableName = tableName;
-        this.startRow = startRow;
-        this.endRow = endRow;
-    }
-
-    /**
-     * Default constructor for serialization/deserialization. All fields start out as {@code null}.
-     */
-    public TableInputSplit() {
-        super();
-    }
-
-    /**
-     * Returns the table name.
-     *
-     * @return The table name.
-     */
-    public byte[] getTableName() {
-        return this.tableName;
-    }
-
-    /**
-     * Returns the start row.
-     *
-     * @return The start row.
-     */
-    public byte[] getStartRow() {
-        return this.startRow;
-    }
-
-    /**
-     * Returns the end row.
-     *
-     * @return The end row.
-     */
-    public byte[] getEndRow() {
-        return this.endRow;
-    }
-
-    /**
-     * Writes the parent's state followed by the table name, start row and end row.
-     * Each array is written as its length followed by its bytes; {@code null} is
-     * encoded as length -1.
-     */
-    @Override
-    public void write(final DataOutputView out) throws IOException {
-        super.write(out);
-
-        writeBytes(out, this.tableName);
-        writeBytes(out, this.startRow);
-        writeBytes(out, this.endRow);
-    }
-
-    /**
-     * Reads the state written by {@link #write(DataOutputView)}. A field encoded as
-     * {@code null} is reset to {@code null}, so reusing an instance never keeps a
-     * value from an earlier read.
-     */
-    @Override
-    public void read(final DataInputView in) throws IOException {
-        super.read(in);
-
-        this.tableName = readBytes(in);
-        this.startRow = readBytes(in);
-        this.endRow = readBytes(in);
-    }
-
-    /** Writes a length-prefixed byte array, using length -1 for {@code null}. */
-    private static void writeBytes(final DataOutputView out, final byte[] bytes) throws IOException {
-        if (bytes == null) {
-            out.writeInt(-1);
-        } else {
-            out.writeInt(bytes.length);
-            out.write(bytes);
-        }
-    }
-
-    /** Reads a length-prefixed byte array; a negative length yields {@code null}. */
-    private static byte[] readBytes(final DataInputView in) throws IOException {
-        final int len = in.readInt();
-        if (len < 0) {
-            return null;
-        }
-        final byte[] bytes = new byte[len];
-        in.readFully(bytes);
-        return bytes;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
deleted file mode 100644
index 44d64de..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.common;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-/**
- * Simple wrapper to encapsulate an HBase {@link ImmutableBytesWritable} as a Key.
- * Serialization, hashing, equality and ordering are all delegated to the wrapped writable.
- */
-public class HBaseKey implements Key<HBaseKey> {
-
-    private static final long serialVersionUID = 1L;
-
-    /** The wrapped HBase key. */
-    private ImmutableBytesWritable writable;
-
-    /** Creates a key wrapping a new, empty {@link ImmutableBytesWritable}. */
-    public HBaseKey() {
-        this.writable = new ImmutableBytesWritable();
-    }
-
-    /**
-     * Creates a key wrapping the given writable.
-     *
-     * @param writable
-     *        the writable to wrap
-     */
-    public HBaseKey(ImmutableBytesWritable writable) {
-        this.writable = writable;
-    }
-
-    /**
-     * Returns the wrapped writable.
-     *
-     * @return the wrapped writable
-     */
-    public ImmutableBytesWritable getWritable() {
-        return writable;
-    }
-
-    /**
-     * Replaces the wrapped writable.
-     *
-     * @param writable
-     *        the writable to wrap
-     */
-    public void setWritable(ImmutableBytesWritable writable) {
-        this.writable = writable;
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    @Override
-    public void write(DataOutputView out) throws IOException {
-        this.writable.write(out);
-    }
-
-    @Override
-    public void read(DataInputView in) throws IOException {
-        this.writable.readFields(in);
-    }
-
-    @Override
-    public int hashCode() {
-        return this.writable.hashCode();
-    }
-
-    /**
-     * Two keys are equal if the other object is exactly an {@code HBaseKey} and the
-     * wrapped writables are equal. {@code null} is never equal.
-     */
-    @Override
-    public boolean equals(Object obj) {
-        // Guard against null: obj.getClass() would otherwise throw.
-        if (obj == null || obj.getClass() != HBaseKey.class) {
-            return false;
-        }
-        return this.writable.equals(((HBaseKey) obj).writable);
-    }
-
-    @Override
-    public int compareTo(HBaseKey other) {
-        return this.writable.compareTo(other.writable);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
deleted file mode 100644
index d66f59f..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.common;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-import org.apache.hadoop.hbase.client.Result;
-
-/**
- * Simple wrapper that exposes an HBase {@link Result} as a {@link Value}.
- * Serialization is delegated to the wrapped result.
- */
-public class HBaseResult implements Value {
-
-    private static final long serialVersionUID = 1L;
-
-    /** The wrapped HBase result. */
-    private Result result;
-
-    /** Creates a wrapper around a new, empty {@link Result}. */
-    public HBaseResult() {
-        this.result = new Result();
-    }
-
-    /**
-     * Creates a wrapper around the given result.
-     *
-     * @param result
-     *        the result to wrap
-     */
-    public HBaseResult(Result result) {
-        this.result = result;
-    }
-
-    /**
-     * Returns the wrapped result.
-     *
-     * @return the wrapped result
-     */
-    public Result getResult() {
-        return result;
-    }
-
-    /**
-     * Replaces the wrapped result.
-     *
-     * @param result
-     *        the result to wrap
-     */
-    public void setResult(Result result) {
-        this.result = result;
-    }
-
-    /**
-     * Returns the string form of the wrapped result.
-     *
-     * @return {@code result.toString()}, or {@code null} if no result is set
-     */
-    public String getStringData() {
-        return this.result == null ? null : this.result.toString();
-    }
-
-    @Override
-    public void read(DataInputView in) throws IOException {
-        result.readFields(in);
-    }
-
-    @Override
-    public void write(DataOutputView out) throws IOException {
-        result.write(out);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
deleted file mode 100644
index c1911c5..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.common;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Base64;
-
-/**
- * Utility for {@link org.apache.flink.addons.hbase.TableInputFormat}: converts
- * {@link Scan} objects to and from Base64 strings so they can be passed as a
- * configuration parameter.
- */
-public class HBaseUtil {
-
-    /**
-     * Writes the given scan into a Base64 encoded string.
-     * <p>
-     * The method is public, like its counterpart {@link #convertStringToScan(String)},
-     * so that code outside this package can produce the encoded value.
-     *
-     * @param scan
-     *        The scan to write out.
-     * @return The scan saved in a Base64 encoded string.
-     * @throws IOException
-     *         When writing the scan fails.
-     */
-    public static String convertScanToString(Scan scan) throws IOException {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(out);
-        scan.write(dos);
-        return Base64.encodeBytes(out.toByteArray());
-    }
-
-    /**
-     * Converts the given Base64 string back into a Scan instance.
-     *
-     * @param base64
-     *        The scan details.
-     * @return The newly created Scan instance.
-     * @throws IOException
-     *         When reading the scan instance fails.
-     */
-    public static Scan convertStringToScan(String base64) throws IOException {
-        ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64));
-        DataInputStream dis = new DataInputStream(bis);
-        Scan scan = new Scan();
-        scan.readFields(dis);
-        return scan;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
deleted file mode 100644
index a7bc2b3..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.example;
-
-import org.apache.flink.addons.hbase.TableInputFormat;
-import org.apache.flink.addons.hbase.common.HBaseKey;
-import org.apache.flink.addons.hbase.common.HBaseResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-
-/**
- * Example program that reads the HBase table "twitter" through a customized
- * {@link TableInputFormat} and dumps the row key, user, timestamp and tweet text
- * of every row to a CSV sink.
- */
-public class HBaseReadExample implements Program, ProgramDescription {
-
-    /**
-     * Input format that scans three columns of the table and maps each row to a
-     * record of four string fields.
-     */
-    public static class MyTableInputFormat extends TableInputFormat {
-
-        private static final long serialVersionUID = 1L;
-
-        /** Substituted for columns that are absent in a row. */
-        private final byte[] NO_BYTES = new byte[0];
-
-        private final byte[] META_FAMILY = "meta".getBytes();
-
-        private final byte[] USER_COLUMN = "user".getBytes();
-
-        private final byte[] TIMESTAMP_COLUMN = "timestamp".getBytes();
-
-        private final byte[] TEXT_FAMILY = "text".getBytes();
-
-        private final byte[] TWEET_COLUMN = "tweet".getBytes();
-
-        public MyTableInputFormat() {
-            super();
-        }
-
-        @Override
-        protected HTable createTable(Configuration parameters) {
-            return super.createTable(parameters);
-        }
-
-        /**
-         * Ignores the parameters and always scans meta:user, meta:timestamp and text:tweet.
-         */
-        @Override
-        protected Scan createScanner(Configuration parameters) {
-            Scan scan = new Scan();
-            scan.addColumn(META_FAMILY, USER_COLUMN);
-            scan.addColumn(META_FAMILY, TIMESTAMP_COLUMN);
-            scan.addColumn(TEXT_FAMILY, TWEET_COLUMN);
-            return scan;
-        }
-
-        // Reusable field objects, to avoid allocating new values for every row.
-        StringValue row_string = new StringValue();
-        StringValue user_string = new StringValue();
-        StringValue timestamp_string = new StringValue();
-        StringValue tweet_string = new StringValue();
-
-        /**
-         * Stores the row key in field 0 and the user, timestamp and tweet columns in
-         * fields 1 to 3. A column that is missing in the row yields an empty string.
-         */
-        @Override
-        public void mapResultToRecord(Record record, HBaseKey key,
-                HBaseResult result) {
-            Result res = result.getResult();
-            record.setField(0, toString(row_string, res.getRow()));
-            record.setField(1, toString(user_string, res.getValue(META_FAMILY, USER_COLUMN)));
-            record.setField(2, toString(timestamp_string, res.getValue(META_FAMILY, TIMESTAMP_COLUMN)));
-            record.setField(3, toString(tweet_string, res.getValue(TEXT_FAMILY, TWEET_COLUMN)));
-        }
-
-        /**
-         * Copies the bytes into the given reusable string value and returns it.
-         * {@code null} bytes (an absent column) are treated as an empty array
-         * instead of causing a NullPointerException.
-         */
-        private final StringValue toString(StringValue string, byte[] bytes) {
-            final byte[] source = (bytes == null) ? NO_BYTES : bytes;
-            string.setValueAscii(source, 0, source.length);
-            return string;
-        }
-
-    }
-
-    /**
-     * Builds the plan: an HBase source on table "twitter" feeding a CSV sink.
-     *
-     * @param args
-     *        {@code args[0]}: degree of parallelism (default 1);
-     *        {@code args[1]}: output path (default empty)
-     */
-    @Override
-    public Plan getPlan(String... args) {
-        // parse job parameters
-        int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-        String output = (args.length > 1 ? args[1] : "");
-
-        GenericDataSource<TableInputFormat> source = new GenericDataSource<TableInputFormat>(new MyTableInputFormat(), "HBase Input");
-        source.setParameter(TableInputFormat.INPUT_TABLE, "twitter");
-        source.setParameter(TableInputFormat.CONFIG_LOCATION, "/etc/hbase/conf/hbase-site.xml");
-        FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, source, "HBase String dump");
-        CsvOutputFormat.configureRecordFormat(out)
-            .recordDelimiter('\n')
-            .fieldDelimiter(' ')
-            .field(StringValue.class, 0)
-            .field(StringValue.class, 1)
-            .field(StringValue.class, 2)
-            .field(StringValue.class, 3);
-
-        Plan plan = new Plan(out, "HBase access Example");
-        plan.setDefaultParallelism(numSubTasks);
-        return plan;
-    }
-
-    /**
-     * Describes the arguments that {@link #getPlan(String...)} actually reads.
-     */
-    @Override
-    public String getDescription() {
-        return "Parameters: [numSubTasks] [output]";
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/pom.xml b/flink-addons/jdbc/pom.xml
deleted file mode 100644
index a29d997..0000000
--- a/flink-addons/jdbc/pom.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>flink-addons</artifactId>
- <groupId>org.apache.flink</groupId>
- <version>0.6-incubating-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>jdbc</artifactId>
- <name>jdbc</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derby</artifactId>
- <version>10.10.1.1</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
deleted file mode 100644
index ac8bc07..0000000
--- a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.types.NullValue;
-
-/**
- * InputFormat that runs a SQL query over JDBC and emits every row of the result as a tuple.
- * Instances have to be configured through the builder returned by {@link #buildJDBCInputFormat()}.
- * <p>
- * The whole result is read through exactly one input split.
- *
- * @param <OUT> tuple type produced by this format; its arity must equal the number of result columns
- * @see Tuple
- * @see DriverManager
- */
-public class JDBCInputFormat<OUT extends Tuple> implements InputFormat<OUT, InputSplit> {
-    private static final long serialVersionUID = 1L;
-
-    private static final Log LOG = LogFactory.getLog(JDBCInputFormat.class);
-
-    private String username;
-    private String password;
-    private String drivername;
-    private String dbURL;
-    private String query;
-
-    private transient Connection dbConn;
-    private transient Statement statement;
-    private transient ResultSet resultSet;
-
-    /** True while the cursor of {@link #resultSet} rests on a row that has not been emitted yet. */
-    private transient boolean hasNext;
-
-    /** SQL types of the result columns, determined lazily when the first row is read. */
-    private int[] columnTypes = null;
-
-    public JDBCInputFormat() {
-    }
-
-    /**
-     * Does nothing; all settings are supplied through the builder.
-     *
-     * @param parameters ignored
-     */
-    @Override
-    public void configure(Configuration parameters) {
-    }
-
-    /**
-     * Connects to the source database, executes the query and moves the cursor to the first row.
-     *
-     * @param ignored the split to read; not used because the format produces a single split
-     * @throws IOException if cleaning up after a failed attempt fails
-     * @throws IllegalArgumentException if the driver class cannot be loaded, or if connecting or
-     *         executing the query fails
-     */
-    @Override
-    public void open(InputSplit ignored) throws IOException {
-        try {
-            establishConnection();
-            statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
-            resultSet = statement.executeQuery(query);
-            // Look one row ahead so that an empty result is recognized as "end reached" right away.
-            hasNext = resultSet.next();
-        } catch (SQLException se) {
-            close();
-            throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
-        } catch (ClassNotFoundException cnfe) {
-            throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
-        }
-    }
-
-    /** Loads the driver class and opens the connection, with credentials only if a user name was set. */
-    private void establishConnection() throws SQLException, ClassNotFoundException {
-        Class.forName(drivername);
-        if (username == null) {
-            dbConn = DriverManager.getConnection(dbURL);
-        } else {
-            dbConn = DriverManager.getConnection(dbURL, username, password);
-        }
-    }
-
-    /**
-     * Closes result set, statement and connection. Failures are logged, not thrown, and the
-     * method may be called repeatedly or before {@link #open(InputSplit)}.
-     *
-     * @throws IOException declared by the interface; not thrown by this implementation
-     */
-    @Override
-    public void close() throws IOException {
-        hasNext = false;
-        if (resultSet != null) {
-            try {
-                resultSet.close();
-            } catch (SQLException se) {
-                LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-            } finally {
-                resultSet = null;
-            }
-        }
-        if (statement != null) {
-            try {
-                statement.close();
-            } catch (SQLException se) {
-                LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-            } finally {
-                statement = null;
-            }
-        }
-        if (dbConn != null) {
-            try {
-                dbConn.close();
-            } catch (SQLException se) {
-                LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-            } finally {
-                dbConn = null;
-            }
-        }
-    }
-
-    /**
-     * Checks whether all data has been read. Once the end is reached, all database resources
-     * are released.
-     *
-     * @return {@code true} if no further row is available
-     * @throws IOException declared by the interface
-     */
-    @Override
-    public boolean reachedEnd() throws IOException {
-        if (hasNext) {
-            return false;
-        }
-        close();
-        return true;
-    }
-
-    /**
-     * Stores the current result row in the given tuple and advances the cursor.
-     *
-     * @param tuple the tuple to fill
-     * @return the filled tuple, or {@code null} if no row is available
-     * @throws IOException if the tuple is {@code null}, its arity differs from the column count,
-     *         a column has an unsupported type, or the database reports an error
-     */
-    @Override
-    public OUT nextRecord(OUT tuple) throws IOException {
-        if (!hasNext) {
-            return null;
-        }
-        try {
-            if (tuple == null) {
-                throw new IOException("Couldn't read data - no tuple supplied");
-            }
-            if (columnTypes == null) {
-                extractTypes(tuple);
-            }
-            addValue(tuple);
-            hasNext = resultSet.next();
-            return tuple;
-        } catch (SQLException se) {
-            close();
-            throw new IOException("Couldn't read data - " + se.getMessage(), se);
-        } catch (IOException ioe) {
-            close();
-            throw ioe;
-        }
-    }
-
-    /** Reads the SQL type of every result column after verifying that the tuple arity matches. */
-    private void extractTypes(OUT tuple) throws SQLException, IOException {
-        ResultSetMetaData metaData = resultSet.getMetaData();
-        int columnCount = metaData.getColumnCount();
-        if (tuple.getArity() != columnCount) {
-            throw new IOException("Tuple size does not match columncount");
-        }
-        int[] types = new int[columnCount];
-        for (int pos = 0; pos < columnCount; pos++) {
-            types[pos] = metaData.getColumnType(pos + 1);
-        }
-        // publish only a completely initialized array
-        columnTypes = types;
-    }
-
-    /**
-     * Copies all columns of the current result row into the tuple, converting each according to
-     * its SQL type. Primitive getters are used as-is, i.e. a SQL NULL in a numeric or boolean
-     * column is delivered as the JDBC default (0 / false).
-     *
-     * @param reuse the tuple to fill
-     * @throws SQLException if a column has an unsupported type or cannot be read
-     * @throws IOException if a column that requires conversion contains SQL NULL
-     */
-    private void addValue(OUT reuse) throws SQLException, IOException {
-        for (int pos = 0; pos < columnTypes.length; pos++) {
-            final int column = pos + 1; // JDBC columns are 1-based
-            switch (columnTypes[pos]) {
-                case java.sql.Types.NULL:
-                    reuse.setField(NullValue.getInstance(), pos);
-                    break;
-                case java.sql.Types.BOOLEAN:
-                case java.sql.Types.BIT:
-                    reuse.setField(resultSet.getBoolean(column), pos);
-                    break;
-                case java.sql.Types.CHAR:
-                case java.sql.Types.NCHAR:
-                case java.sql.Types.VARCHAR:
-                case java.sql.Types.LONGVARCHAR:
-                case java.sql.Types.LONGNVARCHAR:
-                    reuse.setField(resultSet.getString(column), pos);
-                    break;
-                case java.sql.Types.TINYINT:
-                case java.sql.Types.SMALLINT:
-                    reuse.setField(resultSet.getShort(column), pos);
-                    break;
-                case java.sql.Types.BIGINT:
-                    reuse.setField(resultSet.getLong(column), pos);
-                    break;
-                case java.sql.Types.INTEGER:
-                    reuse.setField(resultSet.getInt(column), pos);
-                    break;
-                case java.sql.Types.REAL:
-                    reuse.setField(resultSet.getFloat(column), pos);
-                    break;
-                case java.sql.Types.FLOAT:
-                case java.sql.Types.DOUBLE:
-                    reuse.setField(resultSet.getDouble(column), pos);
-                    break;
-                case java.sql.Types.DECIMAL:
-                case java.sql.Types.NUMERIC:
-                    reuse.setField(requireValue(resultSet.getBigDecimal(column), pos).doubleValue(), pos);
-                    break;
-                case java.sql.Types.DATE:
-                    reuse.setField(requireValue(resultSet.getDate(column), pos).toString(), pos);
-                    break;
-                case java.sql.Types.TIME:
-                    reuse.setField(requireValue(resultSet.getTime(column), pos).getTime(), pos);
-                    break;
-                case java.sql.Types.TIMESTAMP:
-                    reuse.setField(requireValue(resultSet.getTimestamp(column), pos).toString(), pos);
-                    break;
-                case java.sql.Types.SQLXML:
-                    // getString() yields the XML text; toString() is not specified to do so
-                    reuse.setField(requireValue(resultSet.getSQLXML(column), pos).getString(), pos);
-                    break;
-                default:
-                    // Not supported: BINARY, VARBINARY, LONGVARBINARY, ARRAY, JAVA_OBJECT, BLOB, CLOB,
-                    // NCLOB, DATALINK, DISTINCT, OTHER, REF, ROWID, STRUCT
-                    throw new SQLException("Unsupported sql-type [" + columnTypes[pos] + "] on column [" + pos + "]");
-            }
-        }
-    }
-
-    /** Returns the value unchanged, or fails with a descriptive message if the column held SQL NULL. */
-    private static <T> T requireValue(T value, int pos) throws IOException {
-        if (value == null) {
-            throw new IOException("Couldn't read data - NULL value on column [" + pos + "] is not supported");
-        }
-        return value;
-    }
-
-    /**
-     * No statistics are gathered; the cached statistics are handed back unchanged.
-     */
-    @Override
-    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
-        return cachedStatistics;
-    }
-
-    /**
-     * Creates exactly one split, regardless of the requested minimum.
-     */
-    @Override
-    public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
-        GenericInputSplit[] split = {
-            new GenericInputSplit(0, 1)
-        };
-        return split;
-    }
-
-    @Override
-    public Class<? extends InputSplit> getInputSplitType() {
-        return GenericInputSplit.class;
-    }
-
-    /**
-     * Creates a builder that sets the parameters of an input format in a fluent way.
-     *
-     * @return builder
-     */
-    public static JDBCInputFormatBuilder buildJDBCInputFormat() {
-        return new JDBCInputFormatBuilder();
-    }
-
-    /**
-     * Fluent builder for {@link JDBCInputFormat}. Database URL, query and driver name are
-     * mandatory; user name and password are optional.
-     */
-    public static class JDBCInputFormatBuilder {
-        private final JDBCInputFormat format;
-
-        public JDBCInputFormatBuilder() {
-            this.format = new JDBCInputFormat();
-        }
-
-        public JDBCInputFormatBuilder setUsername(String username) {
-            format.username = username;
-            return this;
-        }
-
-        public JDBCInputFormatBuilder setPassword(String password) {
-            format.password = password;
-            return this;
-        }
-
-        public JDBCInputFormatBuilder setDrivername(String drivername) {
-            format.drivername = drivername;
-            return this;
-        }
-
-        public JDBCInputFormatBuilder setDBUrl(String dbURL) {
-            format.dbURL = dbURL;
-            return this;
-        }
-
-        public JDBCInputFormatBuilder setQuery(String query) {
-            format.query = query;
-            return this;
-        }
-
-        /**
-         * Finalizes the configuration and checks validity.
-         *
-         * @return configured JDBCInputFormat
-         * @throws IllegalArgumentException if database URL, query or driver name is missing
-         */
-        public JDBCInputFormat finish() {
-            if (format.username == null) {
-                LOG.info("Username was not supplied separately.");
-            }
-            if (format.password == null) {
-                LOG.info("Password was not supplied separately.");
-            }
-            if (format.dbURL == null) {
-                throw new IllegalArgumentException("No dababase URL supplied.");
-            }
-            if (format.query == null) {
-                throw new IllegalArgumentException("No query suplied");
-            }
-            if (format.drivername == null) {
-                throw new IllegalArgumentException("No driver supplied");
-            }
-            return format;
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
deleted file mode 100644
index 3a75480..0000000
--- a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * OutputFormat that writes tuples into a database through a JDBC prepared statement.
- * Instances have to be configured through the builder returned by {@link #buildJDBCOutputFormat()}.
- * <p>
- * Records are collected in a batch that is executed whenever the configured batch interval is
- * reached and once more when the format is closed.
- *
- * @param <OUT> tuple type consumed by this format; its arity must equal the number of
- *              {@code ?} placeholders in the query
- * @see Tuple
- * @see DriverManager
- */
-public class JDBCOutputFormat<OUT extends Tuple> implements OutputFormat<OUT> {
-    private static final long serialVersionUID = 1L;
-
-    private static final Log LOG = LogFactory.getLog(JDBCOutputFormat.class);
-
-    private String username;
-    private String password;
-    private String drivername;
-    private String dbURL;
-    private String query;
-    private int batchInterval = 5000;
-
-    // JDBC handles are created in open() and must never be part of the serialized format
-    private transient Connection dbConn;
-    private transient PreparedStatement upload;
-
-    /** Field types of the tuples, determined from the first record. */
-    private SupportedTypes[] types = null;
-
-    /** Number of records added to the batch since it was executed last. */
-    private int batchCount = 0;
-
-    public JDBCOutputFormat() {
-    }
-
-    /**
-     * Does nothing; all settings are supplied through the builder.
-     *
-     * @param parameters ignored
-     */
-    @Override
-    public void configure(Configuration parameters) {
-    }
-
-    /**
-     * Connects to the target database and initializes the prepared statement.
-     *
-     * @param taskNumber The number of the parallel instance.
-     * @param numTasks The number of parallel instances.
-     * @throws IOException declared by the interface
-     * @throws IllegalArgumentException if the driver class cannot be loaded, or if connecting or
-     *         preparing the statement fails
-     */
-    @Override
-    public void open(int taskNumber, int numTasks) throws IOException {
-        try {
-            establishConnection();
-            upload = dbConn.prepareStatement(query);
-        } catch (SQLException sqe) {
-            releaseResources();
-            throw new IllegalArgumentException("open() failed:\t!", sqe);
-        } catch (ClassNotFoundException cnfe) {
-            releaseResources();
-            throw new IllegalArgumentException("JDBC-Class not found:\t", cnfe);
-        }
-    }
-
-    /** Loads the driver class and opens the connection, with credentials only if a user name was set. */
-    private void establishConnection() throws SQLException, ClassNotFoundException {
-        Class.forName(drivername);
-        if (username == null) {
-            dbConn = DriverManager.getConnection(dbURL);
-        } else {
-            dbConn = DriverManager.getConnection(dbURL, username, password);
-        }
-    }
-
-    private enum SupportedTypes {
-        BOOLEAN,
-        BYTE,
-        SHORT,
-        INTEGER,
-        LONG,
-        STRING,
-        FLOAT,
-        DOUBLE
-    }
-
-    /**
-     * Adds a record to the batch of the prepared statement and executes the batch once the
-     * batch interval is reached.
-     * <p>
-     * When this method is called, the output format is guaranteed to be opened.
-     *
-     * @param tuple The record to add to the output.
-     * @throws IOException if the tuple arity does not match the number of query parameters
-     * @throws IllegalArgumentException if a field has an unsupported type or the database
-     *         reports an error; the resources of the format are released in that case
-     */
-    @Override
-    public void writeRecord(OUT tuple) throws IOException {
-        try {
-            if (types == null) {
-                // the comparatively expensive inspection of the query is done once only
-                if (countPlaceholders(query) != tuple.getArity()) {
-                    releaseResources();
-                    throw new IOException("Tuple size does not match columncount");
-                }
-                extractTypes(tuple);
-            } else if (types.length != tuple.getArity()) {
-                releaseResources();
-                throw new IOException("Tuple size does not match columncount");
-            }
-            addValues(tuple);
-            upload.addBatch();
-            batchCount++;
-            if (batchCount >= batchInterval) {
-                upload.executeBatch();
-                batchCount = 0;
-            }
-        } catch (SQLException sqe) {
-            // no attempt to flush: that would most likely fail again and hide the cause
-            releaseResources();
-            throw new IllegalArgumentException("writeRecord() failed", sqe);
-        } catch (IllegalArgumentException iae) {
-            releaseResources();
-            throw new IllegalArgumentException("writeRecord() failed", iae);
-        }
-    }
-
-    /**
-     * Counts the {@code ?} parameter markers of a SQL statement, skipping question marks inside
-     * single-quoted literals and double-quoted identifiers.
-     */
-    private static int countPlaceholders(String sql) {
-        int count = 0;
-        boolean inLiteral = false;
-        boolean inIdentifier = false;
-        for (int i = 0; i < sql.length(); i++) {
-            char c = sql.charAt(i);
-            if (c == '\'' && !inIdentifier) {
-                inLiteral = !inLiteral;
-            } else if (c == '"' && !inLiteral) {
-                inIdentifier = !inIdentifier;
-            } else if (c == '?' && !inLiteral && !inIdentifier) {
-                count++;
-            }
-        }
-        return count;
-    }
-
-    /** Determines the type of every field from the given (first) tuple. */
-    private void extractTypes(OUT tuple) {
-        SupportedTypes[] detected = new SupportedTypes[tuple.getArity()];
-        for (int x = 0; x < detected.length; x++) {
-            detected[x] = typeOf(tuple.getField(x));
-        }
-        types = detected;
-    }
-
-    /**
-     * Maps a field value to its supported type. The mapping is based on the actual class
-     * rather than on an upper-cased class name, which depends on the default locale.
-     *
-     * @throws IllegalArgumentException if the value is {@code null} or of an unsupported type
-     */
-    private static SupportedTypes typeOf(Object field) {
-        if (field instanceof Boolean) {
-            return SupportedTypes.BOOLEAN;
-        } else if (field instanceof Byte) {
-            return SupportedTypes.BYTE;
-        } else if (field instanceof Short) {
-            return SupportedTypes.SHORT;
-        } else if (field instanceof Integer) {
-            return SupportedTypes.INTEGER;
-        } else if (field instanceof Long) {
-            return SupportedTypes.LONG;
-        } else if (field instanceof String) {
-            return SupportedTypes.STRING;
-        } else if (field instanceof Float) {
-            return SupportedTypes.FLOAT;
-        } else if (field instanceof Double) {
-            return SupportedTypes.DOUBLE;
-        }
-        throw new IllegalArgumentException("Unsupported field type: "
-                + (field == null ? "null" : field.getClass().getName()));
-    }
-
-    /** Binds every field of the tuple to the statement parameter at the same (1-based) position. */
-    private void addValues(OUT tuple) throws SQLException {
-        for (int index = 0; index < tuple.getArity(); index++) {
-            switch (types[index]) {
-                case BOOLEAN:
-                    upload.setBoolean(index + 1, (Boolean) tuple.getField(index));
-                    break;
-                case BYTE:
-                    upload.setByte(index + 1, (Byte) tuple.getField(index));
-                    break;
-                case SHORT:
-                    upload.setShort(index + 1, (Short) tuple.getField(index));
-                    break;
-                case INTEGER:
-                    upload.setInt(index + 1, (Integer) tuple.getField(index));
-                    break;
-                case LONG:
-                    upload.setLong(index + 1, (Long) tuple.getField(index));
-                    break;
-                case STRING:
-                    upload.setString(index + 1, (String) tuple.getField(index));
-                    break;
-                case FLOAT:
-                    upload.setFloat(index + 1, (Float) tuple.getField(index));
-                    break;
-                case DOUBLE:
-                    upload.setDouble(index + 1, (Double) tuple.getField(index));
-                    break;
-            }
-        }
-    }
-
-    /**
-     * Executes the pending batch and closes all resources of this instance. Statement and
-     * connection are closed even if executing the batch fails; calling the method again
-     * afterwards has no effect.
-     *
-     * @throws IOException declared by the interface
-     * @throws IllegalArgumentException if executing the pending batch fails
-     */
-    @Override
-    public void close() throws IOException {
-        try {
-            if (upload != null) {
-                upload.executeBatch();
-                batchCount = 0;
-            }
-        } catch (SQLException se) {
-            throw new IllegalArgumentException("close() failed", se);
-        } finally {
-            releaseResources();
-        }
-    }
-
-    /** Closes statement and connection without executing the batch; failures are only logged. */
-    private void releaseResources() {
-        batchCount = 0;
-        if (upload != null) {
-            try {
-                upload.close();
-            } catch (SQLException se) {
-                LOG.info("Outputformat couldn't be closed - " + se.getMessage());
-            } finally {
-                upload = null;
-            }
-        }
-        if (dbConn != null) {
-            try {
-                dbConn.close();
-            } catch (SQLException se) {
-                LOG.info("Outputformat couldn't be closed - " + se.getMessage());
-            } finally {
-                dbConn = null;
-            }
-        }
-    }
-
-    /**
-     * Creates a builder that sets the parameters of an output format in a fluent way.
-     *
-     * @return builder
-     */
-    public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
-        return new JDBCOutputFormatBuilder();
-    }
-
-    /**
-     * Fluent builder for {@link JDBCOutputFormat}. Database URL, query and driver name are
-     * mandatory; user name, password and batch interval are optional.
-     */
-    public static class JDBCOutputFormatBuilder {
-        private final JDBCOutputFormat format;
-
-        protected JDBCOutputFormatBuilder() {
-            this.format = new JDBCOutputFormat();
-        }
-
-        public JDBCOutputFormatBuilder setUsername(String username) {
-            format.username = username;
-            return this;
-        }
-
-        public JDBCOutputFormatBuilder setPassword(String password) {
-            format.password = password;
-            return this;
-        }
-
-        public JDBCOutputFormatBuilder setDrivername(String drivername) {
-            format.drivername = drivername;
-            return this;
-        }
-
-        public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
-            format.dbURL = dbURL;
-            return this;
-        }
-
-        public JDBCOutputFormatBuilder setQuery(String query) {
-            format.query = query;
-            return this;
-        }
-
-        public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
-            format.batchInterval = batchInterval;
-            return this;
-        }
-
-        /**
-         * Finalizes the configuration and checks validity.
-         *
-         * @return Configured JDBCOutputFormat
-         * @throws IllegalArgumentException if database URL, query or driver name is missing
-         */
-        public JDBCOutputFormat finish() {
-            if (format.username == null) {
-                LOG.info("Username was not supplied separately.");
-            }
-            if (format.password == null) {
-                LOG.info("Password was not supplied separately.");
-            }
-            if (format.dbURL == null) {
-                throw new IllegalArgumentException("No dababase URL supplied.");
-            }
-            if (format.query == null) {
-                throw new IllegalArgumentException("No query suplied");
-            }
-            if (format.drivername == null) {
-                throw new IllegalArgumentException("No driver supplied");
-            }
-            return format;
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java b/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
deleted file mode 100644
index 7d0c5e8..0000000
--- a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc.example;
-
-import static org.apache.flink.api.java.typeutils.BasicTypeInfo.DOUBLE_TYPE_INFO;
-import static org.apache.flink.api.java.typeutils.BasicTypeInfo.INT_TYPE_INFO;
-import static org.apache.flink.api.java.typeutils.BasicTypeInfo.STRING_TYPE_INFO;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-
-/**
- * Example program that copies the rows of the table "books" into the table "newbooks" of an
- * in-memory Derby database, reading with {@link JDBCInputFormat} and writing with
- * {@link JDBCOutputFormat}.
- */
-public class JDBCExample {
-
-    private static final String DRIVER = "org.apache.derby.jdbc.EmbeddedDriver";
-
-    private static final String DB_URL = "jdbc:derby:memory:ebookshop";
-
-    public static void main(String[] args) throws Exception {
-        prepareTestDb();
-
-        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
-        DataSet<Tuple5> source
-                = environment.createInput(JDBCInputFormat.buildJDBCInputFormat()
-                        .setDrivername(DRIVER)
-                        .setDBUrl(DB_URL)
-                        .setQuery("select * from books")
-                        .finish(),
-                        new TupleTypeInfo(Tuple5.class, INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, DOUBLE_TYPE_INFO, INT_TYPE_INFO)
-                );
-
-        source.output(JDBCOutputFormat.buildJDBCOutputFormat()
-                .setDrivername(DRIVER)
-                .setDBUrl(DB_URL)
-                .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
-                .finish());
-        environment.execute();
-    }
-
-    /**
-     * Creates the in-memory database with the tables "books" (filled with five rows) and
-     * "newbooks" (empty). The connection is closed even if one of the statements fails.
-     */
-    private static void prepareTestDb() throws Exception {
-        Class.forName(DRIVER);
-        Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
-        try {
-            execute(conn, createTableSql("books"));
-            execute(conn, createTableSql("newbooks"));
-
-            StringBuilder insert = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-            insert.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-            insert.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-            insert.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-            insert.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-            insert.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-            execute(conn, insert.toString());
-        } finally {
-            conn.close();
-        }
-    }
-
-    /** Builds the CREATE TABLE statement shared by both tables of the example. */
-    private static String createTableSql(String tableName) {
-        StringBuilder ddl = new StringBuilder("CREATE TABLE " + tableName + " (");
-        ddl.append("id INT NOT NULL DEFAULT 0,");
-        ddl.append("title VARCHAR(50) DEFAULT NULL,");
-        ddl.append("author VARCHAR(50) DEFAULT NULL,");
-        ddl.append("price FLOAT DEFAULT NULL,");
-        ddl.append("qty INT DEFAULT NULL,");
-        ddl.append("PRIMARY KEY (id))");
-        return ddl.toString();
-    }
-
-    /** Runs a single SQL statement and closes the statement object afterwards, also on failure. */
-    private static void execute(Connection conn, String sql) throws java.sql.SQLException {
-        Statement stat = conn.createStatement();
-        try {
-            stat.execute(sql);
-        } finally {
-            stat.close();
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java b/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
deleted file mode 100644
index f2930f2..0000000
--- a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
-
-/**
- * InputFormat to read data from a database and generate Records.
- * The InputFormat has to be configured with the query, and either all
- * connection parameters or a complete database URL (see {@link Configuration}).
- * The position of a value inside a Record is determined by the position
- * of its column in the table returned by the query.
- *
- * @see Configuration
- * @see Record
- * @see DriverManager
- */
-public class JDBCInputFormat extends GenericInputFormat implements NonParallelInput {
-
- private static final long serialVersionUID = 1L;
-
- @SuppressWarnings("unused")
- private static final Log LOG = LogFactory.getLog(JDBCInputFormat.class);
-
-
- public final String DRIVER_KEY = "driver";
- public final String USERNAME_KEY = "username";
- public final String PASSWORD_KEY = "password";
- public final String URL_KEY = "url";
- public final String QUERY_KEY = "query";
-
-
- private String username;
- private String password;
- private String driverName;
- private String dbURL;
- private String query;
-
-
- private transient Connection dbConn;
- private transient Statement statement;
- private transient ResultSet resultSet;
-
-
- /**
- * Creates a non-configured JDBCInputFormat. This format has to be
- * configured using configure(configuration).
- */
- public JDBCInputFormat() {}
-
- /**
- * Creates a JDBCInputFormat and configures it.
- *
- * @param driverName
- * Class name of the JDBC driver.
- * @param dbURL
- * Formatted URL containing all connection parameters.
- * @param username Login username; if null or empty, only the URL is used to connect.
- * @param password Login password.
- * @param query
- * Query to execute.
- */
- public JDBCInputFormat(String driverName, String dbURL, String username, String password, String query) {
- this.driverName = driverName;
- this.query = query;
- this.dbURL = dbURL;
- this.username = username;
- this.password = password;
- }
-
- /**
- * Creates a JDBCInputFormat and configures it.
- *
- * @param driverName
- * Class name of the JDBC driver.
- * @param dbURL
- * Formatted URL containing all connection parameters.
- * @param query
- * Query to execute.
- */
- public JDBCInputFormat(String driverName, String dbURL, String query) {
- this(driverName, dbURL, "", "", query);
- }
-
- /**
- * Creates a JDBCInputFormat and configures it.
- *
- * @param parameters
- * Configuration with all connection parameters.
- * @param query
- * Query to execute.
- */
- public JDBCInputFormat(Configuration parameters, String query) {
- this.driverName = parameters.getString(DRIVER_KEY, "");
- this.username = parameters.getString(USERNAME_KEY, "");
- this.password = parameters.getString(PASSWORD_KEY, "");
- this.dbURL = parameters.getString(URL_KEY, "");
- this.query = query;
- }
-
-
- /**
- * Configures this JDBCInputFormat. This includes setting the connection
- * parameters (if necessary), establishing the connection and executing the
- * query.
- *
- * @param parameters
- * Configuration containing all or no parameters.
- */
- @Override
- public void configure(Configuration parameters) {
- boolean needConfigure = isFieldNullOrEmpty(this.query) || isFieldNullOrEmpty(this.dbURL);
- if (needConfigure) {
- this.driverName = parameters.getString(DRIVER_KEY, null);
- this.username = parameters.getString(USERNAME_KEY, null);
- this.password = parameters.getString(PASSWORD_KEY, null);
- this.query = parameters.getString(QUERY_KEY, null);
- this.dbURL = parameters.getString(URL_KEY, null);
- }
-
- try {
- prepareQueryExecution();
- } catch (SQLException e) {
- throw new IllegalArgumentException("Configure failed:\t!", e);
- }
- }
-
- /**
- * Enters data value from the current resultSet into a Record.
- *
- * @param pos
- * Record position to be set.
- * @param type
- * SQL type of the resultSet value.
- * @param record
- * Target Record.
- */
- private void retrieveTypeAndFillRecord(int pos, int type, Record record) throws SQLException,
- NotTransformableSQLFieldException {
- switch (type) {
- case java.sql.Types.NULL:
- record.setField(pos, NullValue.getInstance());
- break;
- case java.sql.Types.BOOLEAN:
- record.setField(pos, new BooleanValue(resultSet.getBoolean(pos + 1)));
- break;
- case java.sql.Types.BIT:
- record.setField(pos, new BooleanValue(resultSet.getBoolean(pos + 1)));
- break;
- case java.sql.Types.CHAR:
- record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
- break;
- case java.sql.Types.NCHAR:
- record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
- break;
- case java.sql.Types.VARCHAR:
- record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
- break;
- case java.sql.Types.LONGVARCHAR:
- record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
- break;
- case java.sql.Types.LONGNVARCHAR:
- record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
- break;
- case java.sql.Types.TINYINT:
- record.setField(pos, new ShortValue(resultSet.getShort(pos + 1)));
- break;
- case java.sql.Types.SMALLINT:
- record.setField(pos, new ShortValue(resultSet.getShort(pos + 1)));
- break;
- case java.sql.Types.BIGINT:
- record.setField(pos, new LongValue(resultSet.getLong(pos + 1)));
- break;
- case java.sql.Types.INTEGER:
- record.setField(pos, new IntValue(resultSet.getInt(pos + 1)));
- break;
- case java.sql.Types.FLOAT:
- record.setField(pos, new DoubleValue(resultSet.getDouble(pos + 1)));
- break;
- case java.sql.Types.REAL:
- record.setField(pos, new FloatValue(resultSet.getFloat(pos + 1)));
- break;
- case java.sql.Types.DOUBLE:
- record.setField(pos, new DoubleValue(resultSet.getDouble(pos + 1)));
- break;
- case java.sql.Types.DECIMAL:
- record.setField(pos, new DoubleValue(resultSet.getBigDecimal(pos + 1).doubleValue()));
- break;
- case java.sql.Types.NUMERIC:
- record.setField(pos, new DoubleValue(resultSet.getBigDecimal(pos + 1).doubleValue()));
- break;
- case java.sql.Types.DATE:
- record.setField(pos, new StringValue(resultSet.getDate(pos + 1).toString()));
- break;
- case java.sql.Types.TIME:
- record.setField(pos, new LongValue(resultSet.getTime(pos + 1).getTime()));
- break;
- case java.sql.Types.TIMESTAMP:
- record.setField(pos, new StringValue(resultSet.getTimestamp(pos + 1).toString()));
- break;
- case java.sql.Types.SQLXML:
- record.setField(pos, new StringValue(resultSet.getSQLXML(pos + 1).toString()));
- break;
- default:
- throw new NotTransformableSQLFieldException("Unknown sql-type [" + type + "]on column [" + pos + "]");
-
- // case java.sql.Types.BINARY:
- // case java.sql.Types.VARBINARY:
- // case java.sql.Types.LONGVARBINARY:
- // case java.sql.Types.ARRAY:
- // case java.sql.Types.JAVA_OBJECT:
- // case java.sql.Types.BLOB:
- // case java.sql.Types.CLOB:
- // case java.sql.Types.NCLOB:
- // case java.sql.Types.DATALINK:
- // case java.sql.Types.DISTINCT:
- // case java.sql.Types.OTHER:
- // case java.sql.Types.REF:
- // case java.sql.Types.ROWID:
- // case java.sql.Types.STRUCT:
- }
- }
-
- private boolean isFieldNullOrEmpty(String field) {
- return (field == null || field.length() == 0);
- }
-
- private void prepareQueryExecution() throws SQLException {
- setClassForDBType();
- prepareCredentialsAndExecute();
- }
-
- /**
- * Loads the JDBC driver class named by the driverName field.
- *
- * This method takes no parameters and returns nothing.
- *
- * @throws IllegalArgumentException
- * if the driver class cannot be found
- */
- private void setClassForDBType() {
- try {
- Class.forName(driverName);
- } catch (ClassNotFoundException cnfe) {
- throw new IllegalArgumentException("JDBC-Class not found:\t" + cnfe.getLocalizedMessage());
- }
- }
-
- private void prepareCredentialsAndExecute() throws SQLException {
- if (isFieldNullOrEmpty(username)) {
- prepareConnection(dbURL);
- } else {
- prepareConnection();
- }
- executeQuery();
- }
-
- /**
- * Establishes a connection to a database.
- *
- * @param dbURL
- * Assembled URL containing all connection parameters.
- * @throws SQLException
- * if the connection could not be established
- */
- private void prepareConnection(String dbURL) throws SQLException {
- dbConn = DriverManager.getConnection(dbURL);
- }
-
- /**
- * Establishes a connection to the database using the dbURL, username
- * and password fields and stores it in dbConn.
- *
- * This method takes no parameters and returns nothing; it is used
- * when a username has been set.
- *
- * @throws SQLException
- * if the connection could not be established
- *
- * @see DriverManager
- */
- private void prepareConnection() throws SQLException {
- dbConn = DriverManager.getConnection(dbURL, username, password);
- }
-
- private void executeQuery() throws SQLException {
- statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
- resultSet = statement.executeQuery(this.query);
- }
-
- /**
- * Checks whether all data has been read.
- *
- * @return boolean value indicating whether all data has been read.
- */
- @Override
- public boolean reachedEnd() {
- try {
- if (resultSet.isLast()) {
- resultSet.close();
- statement.close();
- dbConn.close();
- return true;
- } else {
- return false;
- }
- } catch (SQLException e) {
- throw new IllegalArgumentException("Couldn't evaluate reachedEnd():\t" + e.getMessage());
- } catch (NullPointerException e) {
- throw new IllegalArgumentException("Couldn't access resultSet:\t" + e.getMessage());
- }
- }
-
- /**
- * Stores the next resultSet row in a Record
- *
- * @param record
- * target Record
- * @return the given Record, filled with the values of the next row
- */
- @Override
- public Record nextRecord(Record record) {
- try {
- resultSet.next();
- ResultSetMetaData rsmd = resultSet.getMetaData();
- int column_count = rsmd.getColumnCount();
- record.setNumFields(column_count);
-
- for (int pos = 0; pos < column_count; pos++) {
- int type = rsmd.getColumnType(pos + 1);
- retrieveTypeAndFillRecord(pos, type, record);
- }
- return record;
- } catch (SQLException e) {
- throw new IllegalArgumentException("Couldn't read data:\t" + e.getMessage());
- } catch (NotTransformableSQLFieldException e) {
- throw new IllegalArgumentException("Couldn't read data because of unknown column sql-type:\t"
- + e.getMessage());
- } catch (NullPointerException e) {
- throw new IllegalArgumentException("Couldn't access resultSet:\t" + e.getMessage());
- }
- }
-
- public static class NotTransformableSQLFieldException extends Exception {
-
- private static final long serialVersionUID = 1L;
-
- public NotTransformableSQLFieldException(String message) {
- super(message);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java b/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
deleted file mode 100644
index a99b38e..0000000
--- a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.CharValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-
-public class JDBCOutputFormat implements OutputFormat<Record> {
- private static final long serialVersionUID = 1L;
-
- private static final int DEFAULT_BATCH_INTERVERAL = 5000;
-
- public static final String DRIVER_KEY = "driver";
- public static final String USERNAME_KEY = "username";
- public static final String PASSWORD_KEY = "password";
- public static final String URL_KEY = "url";
- public static final String QUERY_KEY = "query";
- public static final String FIELD_COUNT_KEY = "fields";
- public static final String FIELD_TYPE_KEY = "type";
- public static final String BATCH_INTERVAL = "batchInt";
-
- private Connection dbConn;
- private PreparedStatement upload;
-
- private String username;
- private String password;
- private String driverName;
- private String dbURL;
-
- private String query;
- private int fieldCount;
- private Class<? extends Value>[] fieldClasses;
-
- /**
- * Variable indicating the current number of insert sets in a batch.
- */
- private int batchCount = 0;
-
- /**
- * Commit interval of batches.
- * High batch interval: faster inserts, more memory required (reduce if OutOfMemoryErrors occur).
- * Low batch interval: slower inserts, less memory.
- */
- private int batchInterval = DEFAULT_BATCH_INTERVERAL;
-
-
- /**
- * Configures this JDBCOutputFormat.
- *
- * @param parameters
- * Configuration containing all parameters.
- */
- @Override
- public void configure(Configuration parameters) {
- this.driverName = parameters.getString(DRIVER_KEY, null);
- this.username = parameters.getString(USERNAME_KEY, null);
- this.password = parameters.getString(PASSWORD_KEY, null);
- this.dbURL = parameters.getString(URL_KEY, null);
- this.query = parameters.getString(QUERY_KEY, null);
- this.fieldCount = parameters.getInteger(FIELD_COUNT_KEY, 0);
- this.batchInterval = parameters.getInteger(BATCH_INTERVAL, DEFAULT_BATCH_INTERVERAL);
-
- @SuppressWarnings("unchecked")
- Class<Value>[] classes = new Class[this.fieldCount];
- this.fieldClasses = classes;
-
- for (int i = 0; i < this.fieldCount; i++) {
- @SuppressWarnings("unchecked")
- Class<? extends Value> clazz = (Class<? extends Value>) parameters.getClass(FIELD_TYPE_KEY + i, null);
- if (clazz == null) {
- throw new IllegalArgumentException("Invalid configuration for JDBCOutputFormat: "
- + "No type class for parameter " + i);
- }
- this.fieldClasses[i] = clazz;
- }
- }
-
- /**
- * Connects to the target database and initializes the prepared statement.
- *
- * @param taskNumber The number of the parallel instance.
- * @throws IOException Thrown, if the output could not be opened due to an
- * I/O problem.
- */
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- try {
- establishConnection();
- upload = dbConn.prepareStatement(query);
- } catch (SQLException sqe) {
- throw new IllegalArgumentException("open() failed:\t!", sqe);
- } catch (ClassNotFoundException cnfe) {
- throw new IllegalArgumentException("JDBC-Class not found:\t", cnfe);
- }
- }
-
- private void establishConnection() throws SQLException, ClassNotFoundException {
- Class.forName(driverName);
- if (username == null) {
- dbConn = DriverManager.getConnection(dbURL);
- } else {
- dbConn = DriverManager.getConnection(dbURL, username, password);
- }
- }
-
- /**
- * Adds a record to the prepared statement.
- * <p>
- * When this method is called, the output format is guaranteed to be opened.
- *
- * @param record The record to add to the output.
- * @throws IOException Thrown, if the records could not be added due to an
- * I/O problem.
- */
-
- @Override
- public void writeRecord(Record record) throws IOException {
- try {
- for (int x = 0; x < record.getNumFields(); x++) {
- Value temp = record.getField(x, fieldClasses[x]);
- addValue(x + 1, temp);
- }
- upload.addBatch();
- batchCount++;
- if(batchCount >= batchInterval) {
- upload.executeBatch();
- batchCount = 0;
- }
- } catch (SQLException sqe) {
- throw new IllegalArgumentException("writeRecord() failed:\t", sqe);
- } catch (IllegalArgumentException iae) {
- throw new IllegalArgumentException("writeRecord() failed:\t", iae);
- }
- }
-
- private enum pactType {
- BooleanValue,
- ByteValue,
- CharValue,
- DoubleValue,
- FloatValue,
- IntValue,
- LongValue,
- ShortValue,
- StringValue
- }
-
- private void addValue(int index, Value value) throws SQLException {
- pactType type;
- try {
- type = pactType.valueOf(value.getClass().getSimpleName());
- } catch (IllegalArgumentException iae) {
- throw new IllegalArgumentException("PactType not supported:\t", iae);
- }
- switch (type) {
- case BooleanValue:
- upload.setBoolean(index, ((BooleanValue) value).getValue());
- break;
- case ByteValue:
- upload.setByte(index, ((ByteValue) value).getValue());
- break;
- case CharValue:
- upload.setString(index, String.valueOf(((CharValue) value).getValue()));
- break;
- case DoubleValue:
- upload.setDouble(index, ((DoubleValue) value).getValue());
- break;
- case FloatValue:
- upload.setFloat(index, ((FloatValue) value).getValue());
- break;
- case IntValue:
- upload.setInt(index, ((IntValue) value).getValue());
- break;
- case LongValue:
- upload.setLong(index, ((LongValue) value).getValue());
- break;
- case ShortValue:
- upload.setShort(index, ((ShortValue) value).getValue());
- break;
- case StringValue:
- upload.setString(index, ((StringValue) value).getValue());
- break;
- }
- }
-
- /**
- * Executes prepared statement and closes all resources of this instance.
- *
- * @throws IOException Thrown, if the output could not be closed properly.
- */
- @Override
- public void close() throws IOException {
- try {
- upload.executeBatch();
- batchCount = 0;
- upload.close();
- dbConn.close();
- } catch (SQLException sqe) {
- throw new IllegalArgumentException("close() failed:\t", sqe);
- }
- }
-
- /**
- * Creates a configuration builder that can be used to set the
- * output format's parameters to the config in a fluent fashion.
- *
- * @return A config builder for setting parameters.
- */
- public static ConfigBuilder configureOutputFormat(GenericDataSink target) {
- return new ConfigBuilder(target.getParameters());
- }
-
- /**
- * Abstract builder used to set parameters to the output format's
- * configuration in a fluent way.
- */
- protected static abstract class AbstractConfigBuilder<T>
- extends FileOutputFormat.AbstractConfigBuilder<T> {
-
- /**
- * Creates a new builder for the given configuration.
- *
- * @param config The configuration into which the parameters will be written.
- */
- protected AbstractConfigBuilder(Configuration config) {
- super(config);
- }
-
- /**
- * Sets the query field.
- * @param value value to be set.
- * @return The builder itself.
- */
- public T setQuery(String value) {
- this.config.setString(QUERY_KEY, value);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
-
- /**
- * Sets the url field.
- * @param value value to be set.
- * @return The builder itself.
- */
- public T setUrl(String value) {
- this.config.setString(URL_KEY, value);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
-
- /**
- * Sets the username field.
- * @param value value to be set.
- * @return The builder itself.
- */
- public T setUsername(String value) {
- this.config.setString(USERNAME_KEY, value);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
-
- /**
- * Sets the password field.
- * @param value value to be set.
- * @return The builder itself.
- */
- public T setPassword(String value) {
- this.config.setString(PASSWORD_KEY, value);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
-
- /**
- * Sets the driver field.
- * @param value value to be set.
- * @return The builder itself.
- */
- public T setDriver(String value) {
- this.config.setString(DRIVER_KEY, value);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
-
- /**
- * Sets the type of a column.
- * Types are applied in the order they were set.
- * @param type PactType to apply.
- * @return The builder itself.
- */
- public T setClass(Class<? extends Value> type) {
- final int numYet = this.config.getInteger(FIELD_COUNT_KEY, 0);
- this.config.setClass(FIELD_TYPE_KEY + numYet, type);
- this.config.setInteger(FIELD_COUNT_KEY, numYet + 1);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
- }
-
- /**
- * A builder used to set parameters to the output format's configuration in a fluent way.
- */
- public static final class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> {
- /**
- * Creates a new builder for the given configuration.
- *
- * @param targetConfig The configuration into which the parameters will be written.
- */
- protected ConfigBuilder(Configuration targetConfig) {
- super(targetConfig);
- }
- }
-}