Posted to commits@flink.apache.org by se...@apache.org on 2014/07/12 14:48:40 UTC

[61/73] [abbrv] prefix all projects in addons and quickstarts with flink-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
deleted file mode 100644
index 9ff5af7..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.addons.hbase.common.HBaseKey;
-import org.apache.flink.addons.hbase.common.HBaseResult;
-import org.apache.flink.addons.hbase.common.HBaseUtil;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * {@link InputFormat} subclass that wraps access to an HBase {@link HTable}.
- */
-public class TableInputFormat implements InputFormat<Record, TableInputSplit> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
-
-	/** A handle on an HBase table */
-	private HTable table;
-
-	/** The HBase {@link Scan} object that defines the actual access on the table */
-	private Scan scan;
-
-	/** HBase's iterator wrapper */
-	private TableRecordReader tableRecordReader;
-
-	/** Flag indicating whether the input is exhausted */
-	private boolean endReached = false;
-
-	/** Job parameter that specifies the input table. */
-	public static final String INPUT_TABLE = "hbase.inputtable";
-
-	/** Location of the hbase-site.xml. If set, the HBase configuration is created from this file */
-	public static final String CONFIG_LOCATION = "hbase.config.location";
-
-	/**
-	 * Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
-	 * See {@link HBaseUtil#convertScanToString(Scan)} for more details.
-	 */
-	public static final String SCAN = "hbase.scan";
-
-	/** Column Family to Scan */
-	public static final String SCAN_COLUMN_FAMILY = "hbase.scan.column.family";
-
-	/** Space delimited list of columns to scan. */
-	public static final String SCAN_COLUMNS = "hbase.scan.columns";
-
-	/** The timestamp used to filter columns with a specific timestamp. */
-	public static final String SCAN_TIMESTAMP = "hbase.scan.timestamp";
-
-	/** The starting timestamp used to filter columns with a specific range of versions. */
-	public static final String SCAN_TIMERANGE_START = "hbase.scan.timerange.start";
-
-	/** The ending timestamp used to filter columns with a specific range of versions. */
-	public static final String SCAN_TIMERANGE_END = "hbase.scan.timerange.end";
-
-	/** The maximum number of versions to return. */
-	public static final String SCAN_MAXVERSIONS = "hbase.scan.maxversions";
-
-	/** Set to false to disable server-side caching of blocks for this scan. */
-	public static final String SCAN_CACHEBLOCKS = "hbase.scan.cacheblocks";
-
-	/** The number of rows for caching that will be passed to scanners. */
-	public static final String SCAN_CACHEDROWS = "hbase.scan.cachedrows";
-
-	/** mutable objects that are used to avoid recreation of wrapper objects */
-	protected HBaseKey hbaseKey;
-
-	protected HBaseResult hbaseResult;
-
-	private org.apache.hadoop.conf.Configuration hConf;
-
-	@Override
-	public void configure(Configuration parameters) {
-		HTable table = createTable(parameters);
-		setTable(table);
-		Scan scan = createScanner(parameters);
-		setScan(scan);
-	}
-
-	/**
-	 * Reads the configuration and creates a {@link Scan} object.
-	 * 
-	 * @param parameters the configuration holding the scan parameters
-	 * @return the configured {@link Scan} instance
-	 */
-	protected Scan createScanner(Configuration parameters) {
-		Scan scan = null;
-		if (parameters.getString(SCAN, null) != null) {
-			try {
-				scan = HBaseUtil.convertStringToScan(parameters.getString(SCAN, null));
-			} catch (IOException e) {
-				LOG.error("An error occurred.", e);
-			}
-		} else {
-			try {
-				scan = new Scan();
-
-				// if (parameters.getString(SCAN_COLUMNS, null) != null) {
-				// scan.addColumns(parameters.getString(SCAN_COLUMNS, null));
-				// }
-
-				if (parameters.getString(SCAN_COLUMN_FAMILY, null) != null) {
-					scan.addFamily(Bytes.toBytes(parameters.getString(SCAN_COLUMN_FAMILY, null)));
-				}
-
-				if (parameters.getString(SCAN_TIMESTAMP, null) != null) {
-					scan.setTimeStamp(Long.parseLong(parameters.getString(SCAN_TIMESTAMP, null)));
-				}
-
-				if (parameters.getString(SCAN_TIMERANGE_START, null) != null
-					&& parameters.getString(SCAN_TIMERANGE_END, null) != null) {
-					scan.setTimeRange(
-						Long.parseLong(parameters.getString(SCAN_TIMERANGE_START, null)),
-						Long.parseLong(parameters.getString(SCAN_TIMERANGE_END, null)));
-				}
-
-				if (parameters.getString(SCAN_MAXVERSIONS, null) != null) {
-					scan.setMaxVersions(Integer.parseInt(parameters.getString(SCAN_MAXVERSIONS, null)));
-				}
-
-				if (parameters.getString(SCAN_CACHEDROWS, null) != null) {
-					scan.setCaching(Integer.parseInt(parameters.getString(SCAN_CACHEDROWS, null)));
-				}
-
-				// false by default, full table scans generate too much block-cache churn
-				scan.setCacheBlocks((parameters.getBoolean(SCAN_CACHEBLOCKS, false)));
-			} catch (Exception e) {
-				LOG.error(StringUtils.stringifyException(e));
-			}
-		}
-		return scan;
-	}
-
-	/**
-	 * Creates an {@link HTable} instance from the given configuration.
-	 * 
-	 * @param parameters
-	 *        a {@link Configuration} that holds at least the table name.
-	 */
-	protected HTable createTable(Configuration parameters) {
-		String configLocation = parameters.getString(TableInputFormat.CONFIG_LOCATION, null);
-		LOG.info("Got config location: " + configLocation);
-		if (configLocation != null)
-		{
-			org.apache.hadoop.conf.Configuration dummyConf = new org.apache.hadoop.conf.Configuration();
-			if(OperatingSystem.isWindows()) {
-				dummyConf.addResource(new Path("file:/" + configLocation));
-			} else {
-				dummyConf.addResource(new Path("file://" + configLocation));
-			}
-			hConf = HBaseConfiguration.create(dummyConf);
-			// hConf.set("hbase.master", "im1a5.internetmemory.org");
-			LOG.info("hbase master: " + hConf.get("hbase.master"));
-			LOG.info("zookeeper quorum: " + hConf.get("hbase.zookeeper.quorum"));
-
-		}
-		String tableName = parameters.getString(INPUT_TABLE, "");
-		try {
-			return new HTable(this.hConf, tableName);
-		} catch (Exception e) {
-			LOG.error(StringUtils.stringifyException(e));
-		}
-		return null;
-	}
-
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return this.endReached;
-	}
-
-	protected boolean nextResult() throws IOException {
-		if (this.tableRecordReader == null)
-		{
-			throw new IOException("No table record reader provided!");
-		}
-
-		try {
-			if (this.tableRecordReader.nextKeyValue())
-			{
-				ImmutableBytesWritable currentKey = this.tableRecordReader.getCurrentKey();
-				Result currentValue = this.tableRecordReader.getCurrentValue();
-
-				hbaseKey.setWritable(currentKey);
-				hbaseResult.setResult(currentValue);
-			} else
-			{
-				this.endReached = true;
-				return false;
-			}
-		} catch (InterruptedException e) {
-			LOG.error("Table reader has been interrupted", e);
-			throw new IOException(e);
-		}
-
-		return true;
-	}
-
-	@Override
-	public Record nextRecord(Record record) throws IOException {
-		if (nextResult()) {
-			mapResultToRecord(record, hbaseKey, hbaseResult);
-			return record;
-		} else {
-			return null;
-		}
-	}
-
-	/**
-	 * Maps the current HBase Result into a Record.
-	 * This implementation simply stores the HBaseKey at position 0, and the HBase Result object at position 1.
-	 * 
-	 * @param record the target record
-	 * @param key the key of the current HBase row
-	 * @param result the current HBase result
-	 */
-	public void mapResultToRecord(Record record, HBaseKey key, HBaseResult result) {
-		record.setField(0, key);
-		record.setField(1, result);
-	}
-
-	@Override
-	public void close() throws IOException {
-		this.tableRecordReader.close();
-	}
-
-	@Override
-	public void open(TableInputSplit split) throws IOException {
-		if (split == null)
-		{
-			throw new IOException("Input split is null!");
-		}
-
-		if (this.table == null)
-		{
-			throw new IOException("No HTable provided!");
-		}
-
-		if (this.scan == null)
-		{
-			throw new IOException("No Scan instance provided");
-		}
-
-		this.tableRecordReader = new TableRecordReader();
-
-		this.tableRecordReader.setHTable(this.table);
-
-		Scan sc = new Scan(this.scan);
-		sc.setStartRow(split.getStartRow());
-		LOG.info("split start row: " + new String(split.getStartRow()));
-		sc.setStopRow(split.getEndRow());
-		LOG.info("split end row: " + new String(split.getEndRow()));
-
-		this.tableRecordReader.setScan(sc);
-		this.tableRecordReader.restart(split.getStartRow());
-
-		this.hbaseKey = new HBaseKey();
-		this.hbaseResult = new HBaseResult();
-
-		endReached = false;
-	}
-
-
-	@Override
-	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
-
-		if (this.table == null) {
-			throw new IOException("No table was provided.");
-		}
-
-		final Pair<byte[][], byte[][]> keys = this.table.getStartEndKeys();
-
-		if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
-
-			throw new IOException("Expecting at least one region.");
-		}
-		int count = 0;
-		final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(keys.getFirst().length);
-		for (int i = 0; i < keys.getFirst().length; i++) {
-
-			if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
-				continue;
-			}
-
-			final String regionLocation = this.table.getRegionLocation(keys.getFirst()[i], false).getHostnamePort();
-			final byte[] startRow = this.scan.getStartRow();
-			final byte[] stopRow = this.scan.getStopRow();
-
-			// determine if the given start and stop keys fall into the region
-			if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
-				Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
-				(stopRow.length == 0 ||
-				Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
-
-				final byte[] splitStart = startRow.length == 0 ||
-					Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
-					keys.getFirst()[i] : startRow;
-				final byte[] splitStop = (stopRow.length == 0 ||
-					Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
-					keys.getSecond()[i].length > 0 ?
-					keys.getSecond()[i] : stopRow;
-				final TableInputSplit split = new TableInputSplit(splits.size(), new String[] { regionLocation },
-					this.table.getTableName(), splitStart, splitStop);
-				splits.add(split);
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
-				}
-			}
-		}
-
-		return splits.toArray(new TableInputSplit[0]);
-	}
-
-	/**
-	 * Test if the given region is to be included in the InputSplit while splitting
-	 * the regions of a table.
-	 * <p>
-	 * This optimization is effective when there is a specific reason to exclude an entire region from the M-R job
-	 * (and hence not contribute an InputSplit), given the region's start and end keys. <br>
-	 * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R
-	 * processing, continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due
-	 * to the ordering of the keys. <br>
-	 * <br>
-	 * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region. <br>
-	 * Override this method if you want to exclude regions altogether from M-R processing. By default, no region is
-	 * excluded (i.e. all regions are included).
-	 * 
-	 * @param startKey
-	 *        Start key of the region
-	 * @param endKey
-	 *        End key of the region
-	 * @return true, if this region needs to be included as part of the input (default).
-	 */
-	private static boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
-		return true;
-	}
-
-
-	@Override
-	public Class<TableInputSplit> getInputSplitType() {
-
-		return TableInputSplit.class;
-	}
-
-	public void setTable(HTable table)
-	{
-		this.table = table;
-	}
-
-	public HTable getTable() {
-		return table;
-	}
-
-	public void setScan(Scan scan)
-	{
-		this.scan = scan;
-	}
-
-	public Scan getScan() {
-		return scan;
-	}
-}
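
For reference, a minimal sketch of how this input format is driven purely through Configuration keys (configure() builds the HTable and the Scan from them); the table name, config path, and column family below are illustrative assumptions, not values from the codebase:

    Configuration parameters = new Configuration();
    parameters.setString(TableInputFormat.INPUT_TABLE, "mytable");            // table to scan (example name)
    parameters.setString(TableInputFormat.CONFIG_LOCATION, "/etc/hbase/conf/hbase-site.xml");
    parameters.setString(TableInputFormat.SCAN_COLUMN_FAMILY, "cf");          // restrict the scan to one family
    parameters.setBoolean(TableInputFormat.SCAN_CACHEBLOCKS, false);          // default; avoids block-cache churn

    TableInputFormat format = new TableInputFormat();
    format.configure(parameters);  // creates the HTable and the Scan from the keys above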

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
deleted file mode 100644
index a77402d..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * This class implements an input split for HBase. Each table input split corresponds to a key range (low, high). All
- * references to row below refer to the key of the row.
- */
-public class TableInputSplit extends LocatableInputSplit {
-
-	/**
-	 * The name of the table to retrieve data from
-	 */
-	private byte[] tableName;
-
-	/**
-	 * The start row of the split.
-	 */
-	private byte[] startRow;
-
-	/**
-	 * The end row of the split.
-	 */
-	private byte[] endRow;
-
-	/**
-	 * Creates a new table input split
-	 * 
-	 * @param splitNumber
-	 *        the number of the input split
-	 * @param hostnames
-	 *        the names of the hosts storing the data the input split refers to
-	 * @param tableName
-	 *        the name of the table to retrieve data from
-	 * @param startRow
-	 *        the start row of the split
-	 * @param endRow
-	 *        the end row of the split
-	 */
-	TableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName, final byte[] startRow,
-			final byte[] endRow) {
-		super(splitNumber, hostnames);
-
-		this.tableName = tableName;
-		this.startRow = startRow;
-		this.endRow = endRow;
-	}
-
-	/**
-	 * Default constructor for serialization/deserialization.
-	 */
-	public TableInputSplit() {
-		super();
-
-		this.tableName = null;
-		this.startRow = null;
-		this.endRow = null;
-	}
-
-	/**
-	 * Returns the table name.
-	 * 
-	 * @return The table name.
-	 */
-	public byte[] getTableName() {
-		return this.tableName;
-	}
-
-	/**
-	 * Returns the start row.
-	 * 
-	 * @return The start row.
-	 */
-	public byte[] getStartRow() {
-		return this.startRow;
-	}
-
-	/**
-	 * Returns the end row.
-	 * 
-	 * @return The end row.
-	 */
-	public byte[] getEndRow() {
-		return this.endRow;
-	}
-
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-
-		super.write(out);
-
-		// Write the table name
-		if (this.tableName == null) {
-			out.writeInt(-1);
-		} else {
-			out.writeInt(this.tableName.length);
-			out.write(this.tableName);
-		}
-
-		// Write the start row
-		if (this.startRow == null) {
-			out.writeInt(-1);
-		} else {
-			out.writeInt(this.startRow.length);
-			out.write(this.startRow);
-		}
-
-		// Write the end row
-		if (this.endRow == null) {
-			out.writeInt(-1);
-		} else {
-			out.writeInt(this.endRow.length);
-			out.write(this.endRow);
-		}
-	}
-
-
-	@Override
-	public void read(final DataInputView in) throws IOException {
-
-		super.read(in);
-
-		// Read the table name
-		int len = in.readInt();
-		if (len >= 0) {
-			this.tableName = new byte[len];
-			in.readFully(this.tableName);
-		}
-
-		// Read the start row
-		len = in.readInt();
-		if (len >= 0) {
-			this.startRow = new byte[len];
-			in.readFully(this.startRow);
-		}
-
-		// Read the end row
-		len = in.readInt();
-		if (len >= 0) {
-			this.endRow = new byte[len];
-			in.readFully(this.endRow);
-		}
-	}
-}
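
The write()/read() pair above serializes each byte array with a length prefix, using -1 to encode null. A standalone sketch of the same convention, using plain java.io types and names of my own choosing:

    import java.io.*;

    final class NullableBytes {
        // Write a possibly-null byte[]: length first, -1 encodes null.
        static void write(DataOutput out, byte[] b) throws IOException {
            if (b == null) {
                out.writeInt(-1);
            } else {
                out.writeInt(b.length);
                out.write(b);
            }
        }

        // Read the counterpart: a negative length prefix yields null.
        static byte[] read(DataInput in) throws IOException {
            int len = in.readInt();
            if (len < 0) {
                return null;
            }
            byte[] b = new byte[len];
            in.readFully(b);
            return b;
        }
    }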

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
deleted file mode 100644
index 44d64de..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.common;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-/**
- * Simple wrapper to encapsulate an HBase {@link ImmutableBytesWritable} as a Key
- */
-public class HBaseKey implements Key<HBaseKey> {
-
-	private static final long serialVersionUID = 1L;
-
-	private ImmutableBytesWritable writable;
-	
-
-	public HBaseKey() {
-		this.writable = new ImmutableBytesWritable();
-	}
-	
-
-	public HBaseKey(ImmutableBytesWritable writable) {
-		this.writable = writable;
-	}
-	
-	
-	public ImmutableBytesWritable getWritable() {
-		return writable;
-	}
-
-	public void setWritable(ImmutableBytesWritable writable) {
-		this.writable = writable;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		this.writable.write(out);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.writable.readFields(in);
-	}
-
-	@Override
-	public int hashCode() {
-		return this.writable.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj.getClass() == HBaseKey.class) {
-			return this.writable.equals(((HBaseKey) obj).writable);
-		} else {
-			return false;
-		}
-	}
-	
-	@Override
-	public int compareTo(HBaseKey other) {
-		return this.writable.compareTo(other.writable);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
deleted file mode 100644
index d66f59f..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.common;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-import org.apache.hadoop.hbase.client.Result;
-
-public class HBaseResult implements Value {
-	
-	private static final long serialVersionUID = 1L;
-
-	private Result result;
-	
-	
-	public HBaseResult() {
-		this.result = new Result();
-	}
-	
-	public HBaseResult(Result result) {
-		this.result = result;
-	}
-	
-	
-	public Result getResult() {
-		return this.result;
-	}
-	
-	public void setResult(Result result) {
-		this.result = result;
-	}
-	
-	public String getStringData() {
-		if(this.result != null) {
-			return this.result.toString();
-		}
-		return null;
-	}
-	
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.result.readFields(in);
-	}
-	
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		this.result.write(out);	
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
deleted file mode 100644
index c1911c5..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.common;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Base64;
-
-/**
- * Utility for {@link TableInputFormat}
- */
-public class HBaseUtil {
-
-	/**
-	 * Writes the given scan into a Base64 encoded string.
-	 * 
-	 * @param scan
-	 *        The scan to write out.
-	 * @return The scan saved in a Base64 encoded string.
-	 * @throws IOException
-	 *         When writing the scan fails.
-	 */
-	static String convertScanToString(Scan scan) throws IOException {
-		ByteArrayOutputStream out = new ByteArrayOutputStream();
-		DataOutputStream dos = new DataOutputStream(out);
-		scan.write(dos);
-		return Base64.encodeBytes(out.toByteArray());
-	}
-
-	/**
-	 * Converts the given Base64 string back into a Scan instance.
-	 * 
-	 * @param base64
-	 *        The scan details.
-	 * @return The newly created Scan instance.
-	 * @throws IOException
-	 *         When reading the scan instance fails.
-	 */
-	public static Scan convertStringToScan(String base64) throws IOException {
-		ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64));
-		DataInputStream dis = new DataInputStream(bis);
-		Scan scan = new Scan();
-		scan.readFields(dis);
-		return scan;
-	}
-}
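
A round-trip sketch of the two helpers above (convertScanToString is package-private, so this would have to sit in the same package; the column family is a made-up example):

    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("cf"));                       // hypothetical column family

    String encoded = HBaseUtil.convertScanToString(scan);      // Scan -> Base64 string
    Scan decoded = HBaseUtil.convertStringToScan(encoded);     // Base64 string -> Scan

    // 'encoded' is the value a job would set under TableInputFormat.SCAN.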

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
deleted file mode 100644
index a7bc2b3..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase.example;
-
-import org.apache.flink.addons.hbase.TableInputFormat;
-import org.apache.flink.addons.hbase.common.HBaseKey;
-import org.apache.flink.addons.hbase.common.HBaseResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-
-/**
- * Example that reads rows from an HBase table and writes selected
- * columns out as CSV records.
- */
-public class HBaseReadExample implements Program, ProgramDescription {
-	
-	public static class MyTableInputFormat extends  TableInputFormat {
-		
-		private static final long serialVersionUID = 1L;
-
-		private final byte[] META_FAMILY = "meta".getBytes();
-		
-		private final byte[] USER_COLUMN = "user".getBytes();
-		
-		private final byte[] TIMESTAMP_COLUMN = "timestamp".getBytes();
-		
-		private final byte[] TEXT_FAMILY = "text".getBytes();
-		
-		private final byte[] TWEET_COLUMN = "tweet".getBytes();
-		
-		public MyTableInputFormat() {
-			super();
-			
-		}
-		
-		@Override
-		protected HTable createTable(Configuration parameters) {
-			return super.createTable(parameters);
-		}
-		
-		@Override
-		protected Scan createScanner(Configuration parameters) {
-			Scan scan = new Scan ();
-			scan.addColumn (META_FAMILY, USER_COLUMN);
-			scan.addColumn (META_FAMILY, TIMESTAMP_COLUMN);
-			scan.addColumn (TEXT_FAMILY, TWEET_COLUMN);
-			return scan;
-		}
-		
-		StringValue row_string = new StringValue();
-		StringValue user_string = new StringValue();
-		StringValue timestamp_string = new StringValue();
-		StringValue tweet_string = new StringValue();
-		
-		@Override
-		public void mapResultToRecord(Record record, HBaseKey key,
-				HBaseResult result) {
-			Result res = result.getResult();
-			res.getRow();
-			record.setField(0, toString(row_string, res.getRow()));
-			record.setField(1, toString (user_string, res.getValue(META_FAMILY, USER_COLUMN)));
-			record.setField(2, toString (timestamp_string, res.getValue(META_FAMILY, TIMESTAMP_COLUMN)));
-			record.setField(3, toString (tweet_string, res.getValue(TEXT_FAMILY, TWEET_COLUMN)));
-		}
-		
-		private final StringValue toString (StringValue string, byte[] bytes) {
-			string.setValueAscii(bytes, 0, bytes.length);
-			return string;
-		}
-		
-	}
-	
-
-	@Override
-	public Plan getPlan(String... args) {
-		// parse job parameters
-		int numSubTasks   = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		String output    = (args.length > 1 ? args[1] : "");
-
-		GenericDataSource<TableInputFormat> source = new GenericDataSource<TableInputFormat>(new MyTableInputFormat(), "HBase Input");
-		source.setParameter(TableInputFormat.INPUT_TABLE, "twitter");
-		source.setParameter(TableInputFormat.CONFIG_LOCATION, "/etc/hbase/conf/hbase-site.xml");
-		FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, source, "HBase String dump");
-		CsvOutputFormat.configureRecordFormat(out)
-			.recordDelimiter('\n')
-			.fieldDelimiter(' ')
-			.field(StringValue.class, 0)
-			.field(StringValue.class, 1)
-			.field(StringValue.class, 2)
-			.field(StringValue.class, 3);
-		
-		Plan plan = new Plan(out, "HBase access Example");
-		plan.setDefaultParallelism(numSubTasks);
-		return plan;
-	}
-
-
-	@Override
-	public String getDescription() {
-		return "Parameters: [numSubStasks] [input] [output]";
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/pom.xml b/flink-addons/jdbc/pom.xml
deleted file mode 100644
index a29d997..0000000
--- a/flink-addons/jdbc/pom.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
-	
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-	
-	<modelVersion>4.0.0</modelVersion>
-	
-	<parent>
-		<artifactId>flink-addons</artifactId>
-		<groupId>org.apache.flink</groupId>
-		<version>0.6-incubating-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>jdbc</artifactId>
-	<name>jdbc</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-				
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.derby</groupId>
-			<artifactId>derby</artifactId>
-			<version>10.10.1.1</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
deleted file mode 100644
index ac8bc07..0000000
--- a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.types.NullValue;
-
-/**
- * InputFormat to read data from a database and generate tuples.
- * The InputFormat has to be configured using the supplied InputFormatBuilder.
- * 
- * @param <OUT> the type of tuple produced by this input format
- * @see Tuple
- * @see DriverManager
- */
-public class JDBCInputFormat<OUT extends Tuple> implements InputFormat<OUT, InputSplit> {
-	private static final long serialVersionUID = 1L;
-
-	@SuppressWarnings("unused")
-	private static final Log LOG = LogFactory.getLog(JDBCInputFormat.class);
-
-	private String username;
-	private String password;
-	private String drivername;
-	private String dbURL;
-	private String query;
-
-	private transient Connection dbConn;
-	private transient Statement statement;
-	private transient ResultSet resultSet;
-
-	private int[] columnTypes = null;
-
-	public JDBCInputFormat() {
-	}
-
-	@Override
-	public void configure(Configuration parameters) {
-	}
-
-	/**
-	 * Connects to the source database and executes the query.
-	 *
-	 * @param ignored
-	 * @throws IOException
-	 */
-	@Override
-	public void open(InputSplit ignored) throws IOException {
-		try {
-			establishConnection();
-			statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
-			resultSet = statement.executeQuery(query);
-		} catch (SQLException se) {
-			close();
-			throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
-		} catch (ClassNotFoundException cnfe) {
-			throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
-		}
-	}
-
-	private void establishConnection() throws SQLException, ClassNotFoundException {
-		Class.forName(drivername);
-		if (username == null) {
-			dbConn = DriverManager.getConnection(dbURL);
-		} else {
-			dbConn = DriverManager.getConnection(dbURL, username, password);
-		}
-	}
-
-	/**
-	 * Closes all resources used.
-	 *
-	 * @throws IOException Indicates that a resource could not be closed.
-	 */
-	@Override
-	public void close() throws IOException {
-		try {
-			resultSet.close();
-		} catch (SQLException se) {
-			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-		} catch (NullPointerException npe) {
-		}
-		try {
-			statement.close();
-		} catch (SQLException se) {
-			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-		} catch (NullPointerException npe) {
-		}
-		try {
-			dbConn.close();
-		} catch (SQLException se) {
-			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-		} catch (NullPointerException npe) {
-		}
-	}
-
-	/**
-	 * Checks whether all data has been read.
-	 *
-	 * @return boolean value indicating whether all data has been read.
-	 * @throws IOException
-	 */
-	@Override
-	public boolean reachedEnd() throws IOException {
-		try {
-			if (resultSet.isLast()) {
-				close();
-				return true;
-			}
-			return false;
-		} catch (SQLException se) {
-			throw new IOException("Couldn't evaluate reachedEnd() - " + se.getMessage(), se);
-		}
-	}
-
-	/**
-	 * Stores the next resultSet row in a tuple
-	 *
-	 * @param tuple
-	 * @return tuple containing next row
-	 * @throws java.io.IOException
-	 */
-	@Override
-	public OUT nextRecord(OUT tuple) throws IOException {
-		try {
-			resultSet.next();
-			if (columnTypes == null) {
-				extractTypes(tuple);
-			}
-			addValue(tuple);
-			return tuple;
-		} catch (SQLException se) {
-			close();
-			throw new IOException("Couldn't read data - " + se.getMessage(), se);
-		} catch (NullPointerException npe) {
-			close();
-			throw new IOException("Couldn't access resultSet", npe);
-		}
-	}
-
-	private void extractTypes(OUT tuple) throws SQLException, IOException {
-		ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-		columnTypes = new int[resultSetMetaData.getColumnCount()];
-		if (tuple.getArity() != columnTypes.length) {
-			close();
-			throw new IOException("Tuple size does not match columncount");
-		}
-		for (int pos = 0; pos < columnTypes.length; pos++) {
-			columnTypes[pos] = resultSetMetaData.getColumnType(pos + 1);
-		}
-	}
-
-	/**
-	 * Copies the values of the current resultSet row into the given tuple.
-	 *
-	 * @param reuse The target tuple whose fields are set in place.
-	 */
-	private void addValue(OUT reuse) throws SQLException {
-		for (int pos = 0; pos < columnTypes.length; pos++) {
-			switch (columnTypes[pos]) {
-				case java.sql.Types.NULL:
-					reuse.setField(NullValue.getInstance(), pos);
-					break;
-				case java.sql.Types.BOOLEAN:
-					reuse.setField(resultSet.getBoolean(pos + 1), pos);
-					break;
-				case java.sql.Types.BIT:
-					reuse.setField(resultSet.getBoolean(pos + 1), pos);
-					break;
-				case java.sql.Types.CHAR:
-					reuse.setField(resultSet.getString(pos + 1), pos);
-					break;
-				case java.sql.Types.NCHAR:
-					reuse.setField(resultSet.getString(pos + 1), pos);
-					break;
-				case java.sql.Types.VARCHAR:
-					reuse.setField(resultSet.getString(pos + 1), pos);
-					break;
-				case java.sql.Types.LONGVARCHAR:
-					reuse.setField(resultSet.getString(pos + 1), pos);
-					break;
-				case java.sql.Types.LONGNVARCHAR:
-					reuse.setField(resultSet.getString(pos + 1), pos);
-					break;
-				case java.sql.Types.TINYINT:
-					reuse.setField(resultSet.getShort(pos + 1), pos);
-					break;
-				case java.sql.Types.SMALLINT:
-					reuse.setField(resultSet.getShort(pos + 1), pos);
-					break;
-				case java.sql.Types.BIGINT:
-					reuse.setField(resultSet.getLong(pos + 1), pos);
-					break;
-				case java.sql.Types.INTEGER:
-					reuse.setField(resultSet.getInt(pos + 1), pos);
-					break;
-				case java.sql.Types.FLOAT:
-					reuse.setField(resultSet.getDouble(pos + 1), pos);
-					break;
-				case java.sql.Types.REAL:
-					reuse.setField(resultSet.getFloat(pos + 1), pos);
-					break;
-				case java.sql.Types.DOUBLE:
-					reuse.setField(resultSet.getDouble(pos + 1), pos);
-					break;
-				case java.sql.Types.DECIMAL:
-					reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
-					break;
-				case java.sql.Types.NUMERIC:
-					reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
-					break;
-				case java.sql.Types.DATE:
-					reuse.setField(resultSet.getDate(pos + 1).toString(), pos);
-					break;
-				case java.sql.Types.TIME:
-					reuse.setField(resultSet.getTime(pos + 1).getTime(), pos);
-					break;
-				case java.sql.Types.TIMESTAMP:
-					reuse.setField(resultSet.getTimestamp(pos + 1).toString(), pos);
-					break;
-				case java.sql.Types.SQLXML:
-					reuse.setField(resultSet.getSQLXML(pos + 1).toString(), pos);
-					break;
-				default:
-					throw new SQLException("Unsupported sql-type [" + columnTypes[pos] + "] on column [" + pos + "]");
-
-				// case java.sql.Types.BINARY:
-				// case java.sql.Types.VARBINARY:
-				// case java.sql.Types.LONGVARBINARY:
-				// case java.sql.Types.ARRAY:
-				// case java.sql.Types.JAVA_OBJECT:
-				// case java.sql.Types.BLOB:
-				// case java.sql.Types.CLOB:
-				// case java.sql.Types.NCLOB:
-				// case java.sql.Types.DATALINK:
-				// case java.sql.Types.DISTINCT:
-				// case java.sql.Types.OTHER:
-				// case java.sql.Types.REF:
-				// case java.sql.Types.ROWID:
-				// case java.sql.Types.STRUCT:
-			}
-		}
-	}
-
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
-		return cachedStatistics;
-	}
-
-	@Override
-	public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
-		GenericInputSplit[] split = {
-			new GenericInputSplit(0, 1)
-		};
-		return split;
-	}
-
-	@Override
-	public Class<? extends InputSplit> getInputSplitType() {
-		return GenericInputSplit.class;
-	}
-
-	/**
-	 * A builder used to set parameters to the input format's configuration in a fluent way.
-	 * @return builder
-	 */
-	public static JDBCInputFormatBuilder buildJDBCInputFormat() {
-		return new JDBCInputFormatBuilder();
-	}
-
-	public static class JDBCInputFormatBuilder {
-		private final JDBCInputFormat format;
-
-		public JDBCInputFormatBuilder() {
-			this.format = new JDBCInputFormat();
-		}
-
-		public JDBCInputFormatBuilder setUsername(String username) {
-			format.username = username;
-			return this;
-		}
-
-		public JDBCInputFormatBuilder setPassword(String password) {
-			format.password = password;
-			return this;
-		}
-
-		public JDBCInputFormatBuilder setDrivername(String drivername) {
-			format.drivername = drivername;
-			return this;
-		}
-
-		public JDBCInputFormatBuilder setDBUrl(String dbURL) {
-			format.dbURL = dbURL;
-			return this;
-		}
-
-		public JDBCInputFormatBuilder setQuery(String query) {
-			format.query = query;
-			return this;
-		}
-
-		public JDBCInputFormat finish() {
-			if (format.username == null) {
-				LOG.info("Username was not supplied separately.");
-			}
-			if (format.password == null) {
-				LOG.info("Password was not supplied separately.");
-			}
-			if (format.dbURL == null) {
-				throw new IllegalArgumentException("No dababase URL supplied.");
-			}
-			if (format.query == null) {
-				throw new IllegalArgumentException("No query suplied");
-			}
-			if (format.drivername == null) {
-				throw new IllegalArgumentException("No driver supplied");
-			}
-			return format;
-		}
-	}
-
-}
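
For reference, a minimal construction of this input format through its fluent builder; the Derby driver, URL, and query are illustrative values mirroring the example program further below:

    JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
            .setDBUrl("jdbc:derby:memory:ebookshop")
            .setQuery("select * from books")
            .finish();  // throws IllegalArgumentException if the URL, query, or driver is missing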

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
deleted file mode 100644
index 3a75480..0000000
--- a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * OutputFormat to write tuples into a database.
- * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
- * 
- * @param <OUT> the type of tuple consumed by this output format
- * @see Tuple
- * @see DriverManager
- */
-public class JDBCOutputFormat<OUT extends Tuple> implements OutputFormat<OUT> {
-	private static final long serialVersionUID = 1L;
-
-	@SuppressWarnings("unused")
-	private static final Log LOG = LogFactory.getLog(JDBCOutputFormat.class);
-
-	private String username;
-	private String password;
-	private String drivername;
-	private String dbURL;
-	private String query;
-	private int batchInterval = 5000;
-
-	private Connection dbConn;
-	private PreparedStatement upload;
-
-	private SupportedTypes[] types = null;
-
-	private int batchCount = 0;
-
-	public JDBCOutputFormat() {
-	}
-
-	@Override
-	public void configure(Configuration parameters) {
-	}
-
-	/**
-	 * Connects to the target database and initializes the prepared statement.
-	 *
-	 * @param taskNumber The number of the parallel instance.
-	 * @param numTasks The total number of parallel tasks.
-	 * @throws IOException Thrown, if the output could not be opened due to an
-	 * I/O problem.
-	 */
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		try {
-			establishConnection();
-			upload = dbConn.prepareStatement(query);
-		} catch (SQLException sqe) {
-			close();
-			throw new IllegalArgumentException("open() failed:\t!", sqe);
-		} catch (ClassNotFoundException cnfe) {
-			close();
-			throw new IllegalArgumentException("JDBC-Class not found:\t", cnfe);
-		}
-	}
-
-	private void establishConnection() throws SQLException, ClassNotFoundException {
-		Class.forName(drivername);
-		if (username == null) {
-			dbConn = DriverManager.getConnection(dbURL);
-		} else {
-			dbConn = DriverManager.getConnection(dbURL, username, password);
-		}
-	}
-
-	private enum SupportedTypes {
-		BOOLEAN,
-		BYTE,
-		SHORT,
-		INTEGER,
-		LONG,
-		STRING,
-		FLOAT,
-		DOUBLE
-	}
-
-	/**
-	 * Adds a record to the prepared statement.
-	 * <p>
-	 * When this method is called, the output format is guaranteed to be opened.
-	 *
-	 * @param tuple The records to add to the output.
-	 * @throws IOException Thrown, if the records could not be added due to an I/O problem.
-	 */
-	@Override
-	public void writeRecord(OUT tuple) throws IOException {
-		try {
-			if (query.split("\\?,").length != tuple.getArity()) {
-				close();
-				throw new IOException("Tuple size does not match columncount");
-			}
-			if (types == null) {
-				extractTypes(tuple);
-			}
-			addValues(tuple);
-			upload.addBatch();
-			batchCount++;
-			if (batchCount >= batchInterval) {
-				upload.executeBatch();
-				batchCount = 0;
-			}
-		} catch (SQLException sqe) {
-			close();
-			throw new IllegalArgumentException("writeRecord() failed", sqe);
-		} catch (IllegalArgumentException iae) {
-			close();
-			throw new IllegalArgumentException("writeRecord() failed", iae);
-		}
-	}
-
-	private void extractTypes(OUT tuple) {
-		types = new SupportedTypes[tuple.getArity()];
-		for (int x = 0; x < tuple.getArity(); x++) {
-			types[x] = SupportedTypes.valueOf(tuple.getField(x).getClass().getSimpleName().toUpperCase());
-		}
-	}
-
-	private void addValues(OUT tuple) throws SQLException {
-		for (int index = 0; index < tuple.getArity(); index++) {
-			switch (types[index]) {
-				case BOOLEAN:
-					upload.setBoolean(index + 1, (Boolean) tuple.getField(index));
-					break;
-				case BYTE:
-					upload.setByte(index + 1, (Byte) tuple.getField(index));
-					break;
-				case SHORT:
-					upload.setShort(index + 1, (Short) tuple.getField(index));
-					break;
-				case INTEGER:
-					upload.setInt(index + 1, (Integer) tuple.getField(index));
-					break;
-				case LONG:
-					upload.setLong(index + 1, (Long) tuple.getField(index));
-					break;
-				case STRING:
-					upload.setString(index + 1, (String) tuple.getField(index));
-					break;
-				case FLOAT:
-					upload.setFloat(index + 1, (Float) tuple.getField(index));
-					break;
-				case DOUBLE:
-					upload.setDouble(index + 1, (Double) tuple.getField(index));
-					break;
-			}
-		}
-	}
-
-	/**
-	 * Executes prepared statement and closes all resources of this instance.
-	 *
-	 * @throws IOException Thrown, if the input could not be closed properly.
-	 */
-	@Override
-	public void close() throws IOException {
-		try {
-			upload.executeBatch();
-			batchCount = 0;
-		} catch (SQLException se) {
-			throw new IllegalArgumentException("close() failed", se);
-		} catch (NullPointerException se) {
-		}
-		try {
-			upload.close();
-		} catch (SQLException se) {
-			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-		} catch (NullPointerException npe) {
-		}
-		try {
-			dbConn.close();
-		} catch (SQLException se) {
-			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-		} catch (NullPointerException npe) {
-		}
-	}
-
-	public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
-		return new JDBCOutputFormatBuilder();
-	}
-
-	public static class JDBCOutputFormatBuilder {
-		private final JDBCOutputFormat format;
-
-		protected JDBCOutputFormatBuilder() {
-			this.format = new JDBCOutputFormat();
-		}
-
-		public JDBCOutputFormatBuilder setUsername(String username) {
-			format.username = username;
-			return this;
-		}
-
-		public JDBCOutputFormatBuilder setPassword(String password) {
-			format.password = password;
-			return this;
-		}
-
-		public JDBCOutputFormatBuilder setDrivername(String drivername) {
-			format.drivername = drivername;
-			return this;
-		}
-
-		public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
-			format.dbURL = dbURL;
-			return this;
-		}
-
-		public JDBCOutputFormatBuilder setQuery(String query) {
-			format.query = query;
-			return this;
-		}
-
-		public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
-			format.batchInterval = batchInterval;
-			return this;
-		}
-
-		/**
-		Finalizes the configuration and checks validity.
-		@return Configured JDBCOutputFormat
-		 */
-		public JDBCOutputFormat finish() {
-			if (format.username == null) {
-				LOG.info("Username was not supplied separately.");
-			}
-			if (format.password == null) {
-				LOG.info("Password was not supplied separately.");
-			}
-			if (format.dbURL == null) {
-				throw new IllegalArgumentException("No dababase URL supplied.");
-			}
-			if (format.query == null) {
-				throw new IllegalArgumentException("No query suplied");
-			}
-			if (format.drivername == null) {
-				throw new IllegalArgumentException("No driver supplied");
-			}
-			return format;
-		}
-	}
-
-}
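
The matching sketch for the output side; the insert statement must carry one '?' placeholder per tuple field, and the batch interval is an arbitrary example value:

    JDBCOutputFormat outputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
            .setDBUrl("jdbc:derby:memory:ebookshop")
            .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
            .setBatchInterval(1000)  // flush the prepared-statement batch every 1000 records
            .finish();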

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java b/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
deleted file mode 100644
index 7d0c5e8..0000000
--- a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc.example;
-
-import static org.apache.flink.api.java.typeutils.BasicTypeInfo.DOUBLE_TYPE_INFO;
-import static org.apache.flink.api.java.typeutils.BasicTypeInfo.INT_TYPE_INFO;
-import static org.apache.flink.api.java.typeutils.BasicTypeInfo.STRING_TYPE_INFO;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-
-public class JDBCExample {
-
-	public static void main(String[] args) throws Exception {
-		prepareTestDb();
-
-		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple5> source
-				= environment.createInput(JDBCInputFormat.buildJDBCInputFormat()
-						.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-						.setDBUrl("jdbc:derby:memory:ebookshop")
-						.setQuery("select * from books")
-						.finish(),
-						new TupleTypeInfo(Tuple5.class, INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, DOUBLE_TYPE_INFO, INT_TYPE_INFO)
-				);
-
-		source.output(JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
-				.finish());
-		environment.execute();
-	}
-
-	private static void prepareTestDb() throws Exception {
-		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-		Connection conn = DriverManager.getConnection(dbURL);
-
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-
-		sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-
-		sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
-		stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-
-		conn.close();
-	}
-}
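
Once environment.execute() has returned, the copied rows can be checked with plain JDBC against the same in-memory database. A small verification sketch that is not part of the removed example (the expected count of 5 matches the rows inserted by prepareTestDb()):

    Connection conn = DriverManager.getConnection("jdbc:derby:memory:ebookshop");
    Statement stat = conn.createStatement();
    java.sql.ResultSet rs = stat.executeQuery("SELECT COUNT(*) FROM newbooks");
    rs.next();
    System.out.println("rows copied: " + rs.getInt(1));  // expected: 5
    rs.close();
    stat.close();
    conn.close();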

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java b/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
deleted file mode 100644
index f2930f2..0000000
--- a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
-
-/**
- * InputFormat to read data from a database and generate Records.
- * The InputFormat has to be configured with the query, and either all
- * connection parameters or a complete database URL (see {@link Configuration}).
- * The position of a value inside a Record is determined by the position of
- * the corresponding column in the returned table.
- * 
- * @see Configuration
- * @see Record
- * @see DriverManager
- */
-public class JDBCInputFormat extends GenericInputFormat implements NonParallelInput {
-
-	private static final long serialVersionUID = 1L;
-	
-	@SuppressWarnings("unused")
-	private static final Log LOG = LogFactory.getLog(JDBCInputFormat.class);
-	
-
-	public final String DRIVER_KEY = "driver";
-	public final String USERNAME_KEY = "username";
-	public final String PASSWORD_KEY = "password";
-	public final String URL_KEY = "url";
-	public final String QUERY_KEY = "query";
-
-
-	private String username;
-	private String password;
-	private String driverName;
-	private String dbURL;
-	private String query;
-
-	
-	private transient Connection dbConn;
-	private transient Statement statement;
-	private transient ResultSet resultSet;
-
-
-	/**
-	 * Creates a non-configured JDBCInputFormat. This format has to be
-	 * configured using configure(configuration).
-	 */
-	public JDBCInputFormat() {}
-
-	/**
-	 * Creates a JDBCInputFormat and configures it.
-	 * 
-	 * @param driverName
-	 *        JDBC-Drivername
-	 * @param dbURL
-	 *        Formatted URL containing all connection parameters.
-	 * @param username
-	 *        Login username.
-	 * @param password
-	 *        Login password.
-	 * @param query
-	 *        Query to execute.
-	 */
-	public JDBCInputFormat(String driverName, String dbURL, String username, String password, String query) {
-		this.driverName = driverName;
-		this.query = query;
-		this.dbURL = dbURL;
-		this.username = username;
-		this.password = password;
-	}
-
-	/**
-	 * Creates a JDBCInputFormat and configures it.
-	 * 
-	 * @param driverName
-	 *        JDBC-Drivername
-	 * @param dbURL
-	 *        Formatted URL containing all connection parameters.
-	 * @param query
-	 *        Query to execute.
-	 */
-	public JDBCInputFormat(String driverName, String dbURL, String query) {
-		this(driverName, dbURL, "", "", query);
-	}
-
-	/**
-	 * Creates a JDBCInputFormat and configures it.
-	 * 
-	 * @param parameters
-	 *        Configuration with all connection parameters.
-	 * @param query
-	 *        Query to execute.
-	 */
-	public JDBCInputFormat(Configuration parameters, String query) {
-		this.driverName = parameters.getString(DRIVER_KEY, "");
-		this.username = parameters.getString(USERNAME_KEY, "");
-		this.password = parameters.getString(PASSWORD_KEY, "");
-		this.dbURL = parameters.getString(URL_KEY, "");
-		this.query = query;
-	}
-
-	
-	/**
-	 * Configures this JDBCInputFormat. This includes setting the connection
-	 * parameters (if necessary), establishing the connection and executing the
-	 * query.
-	 * 
-	 * @param parameters
-	 *        Configuration containing all or no parameters.
-	 */
-	@Override
-	public void configure(Configuration parameters) {
-		boolean needConfigure = isFieldNullOrEmpty(this.query) || isFieldNullOrEmpty(this.dbURL);
-		if (needConfigure) {
-			this.driverName = parameters.getString(DRIVER_KEY, null);
-			this.username = parameters.getString(USERNAME_KEY, null);
-			this.password = parameters.getString(PASSWORD_KEY, null);
-			this.query = parameters.getString(QUERY_KEY, null);
-			this.dbURL = parameters.getString(URL_KEY, null);
-		}
-
-		try {
-			prepareQueryExecution();
-		} catch (SQLException e) {
-			throw new IllegalArgumentException("Configure failed:\t!", e);
-		}
-	}
-
-	/**
-	 * Enters data value from the current resultSet into a Record.
-	 * 
-	 * @param pos
-	 *        Record position to be set.
-	 * @param type
-	 *        SQL type of the resultSet value.
-	 * @param record
-	 *        Target Record.
-	 */
-	private void retrieveTypeAndFillRecord(int pos, int type, Record record) throws SQLException,
-			NotTransformableSQLFieldException {
-		switch (type) {
-		case java.sql.Types.NULL:
-			record.setField(pos, NullValue.getInstance());
-			break;
-		case java.sql.Types.BOOLEAN:
-			record.setField(pos, new BooleanValue(resultSet.getBoolean(pos + 1)));
-			break;
-		case java.sql.Types.BIT:
-			record.setField(pos, new BooleanValue(resultSet.getBoolean(pos + 1)));
-			break;
-		case java.sql.Types.CHAR:
-			record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
-			break;
-		case java.sql.Types.NCHAR:
-			record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
-			break;
-		case java.sql.Types.VARCHAR:
-			record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
-			break;
-		case java.sql.Types.LONGVARCHAR:
-			record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
-			break;
-		case java.sql.Types.LONGNVARCHAR:
-			record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
-			break;
-		case java.sql.Types.TINYINT:
-			record.setField(pos, new ShortValue(resultSet.getShort(pos + 1)));
-			break;
-		case java.sql.Types.SMALLINT:
-			record.setField(pos, new ShortValue(resultSet.getShort(pos + 1)));
-			break;
-		case java.sql.Types.BIGINT:
-			record.setField(pos, new LongValue(resultSet.getLong(pos + 1)));
-			break;
-		case java.sql.Types.INTEGER:
-			record.setField(pos, new IntValue(resultSet.getInt(pos + 1)));
-			break;
-		case java.sql.Types.FLOAT:
-			record.setField(pos, new DoubleValue(resultSet.getDouble(pos + 1)));
-			break;
-		case java.sql.Types.REAL:
-			record.setField(pos, new FloatValue(resultSet.getFloat(pos + 1)));
-			break;
-		case java.sql.Types.DOUBLE:
-			record.setField(pos, new DoubleValue(resultSet.getDouble(pos + 1)));
-			break;
-		case java.sql.Types.DECIMAL:
-			record.setField(pos, new DoubleValue(resultSet.getBigDecimal(pos + 1).doubleValue()));
-			break;
-		case java.sql.Types.NUMERIC:
-			record.setField(pos, new DoubleValue(resultSet.getBigDecimal(pos + 1).doubleValue()));
-			break;
-		case java.sql.Types.DATE:
-			record.setField(pos, new StringValue(resultSet.getDate(pos + 1).toString()));
-			break;
-		case java.sql.Types.TIME:
-			record.setField(pos, new LongValue(resultSet.getTime(pos + 1).getTime()));
-			break;
-		case java.sql.Types.TIMESTAMP:
-			record.setField(pos, new StringValue(resultSet.getTimestamp(pos + 1).toString()));
-			break;
-		case java.sql.Types.SQLXML:
-			record.setField(pos, new StringValue(resultSet.getSQLXML(pos + 1).toString()));
-			break;
-		default:
-			throw new NotTransformableSQLFieldException("Unknown sql-type [" + type + "]on column [" + pos + "]");
-
-			// case java.sql.Types.BINARY:
-			// case java.sql.Types.VARBINARY:
-			// case java.sql.Types.LONGVARBINARY:
-			// case java.sql.Types.ARRAY:
-			// case java.sql.Types.JAVA_OBJECT:
-			// case java.sql.Types.BLOB:
-			// case java.sql.Types.CLOB:
-			// case java.sql.Types.NCLOB:
-			// case java.sql.Types.DATALINK:
-			// case java.sql.Types.DISTINCT:
-			// case java.sql.Types.OTHER:
-			// case java.sql.Types.REF:
-			// case java.sql.Types.ROWID:
-			// case java.sql.Types.STRUCT:
-		}
-	}
-
-	private boolean isFieldNullOrEmpty(String field) {
-		return (field == null || field.length() == 0);
-	}
-
-	private void prepareQueryExecution() throws SQLException {
-		setClassForDBType();
-		prepareCredentialsAndExecute();
-	}
-
-	/**
-	 * Loads the appropriate JDBC driver.
-	 * 
-	 * @throws IllegalArgumentException Thrown, if the driver class could not
-	 *         be found.
-	 */
-	private void setClassForDBType() {
-		try {
-			Class.forName(driverName);
-		} catch (ClassNotFoundException cnfe) {
-			throw new IllegalArgumentException("JDBC-Class not found:\t" + cnfe.getLocalizedMessage());
-		}
-	}
-
-	private void prepareCredentialsAndExecute() throws SQLException {
-		if (isFieldNullOrEmpty(username)) {
-			prepareConnection(dbURL);
-		} else {
-			prepareConnection();
-		}
-		executeQuery();
-	}
-
-	/**
-	 * Establishes a connection to a database.
-	 * 
-	 * @param dbURL
-	 *        Assembled URL containing all connection parameters.
-	 * @throws SQLException Thrown, if no connection could be established.
-	 */
-	private void prepareConnection(String dbURL) throws SQLException {
-		dbConn = DriverManager.getConnection(dbURL);
-	}
-
-	/**
-	 * Establishes a connection using the configured URL, username, and password.
-	 * 
-	 * @throws SQLException Thrown, if no connection could be established.
-	 */
-	private void prepareConnection() throws SQLException {
-		dbConn = DriverManager.getConnection(dbURL, username, password);
-	}
-
-	private void executeQuery() throws SQLException {
-		statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
-		resultSet = statement.executeQuery(this.query);
-	}
-
-	/**
-	 * Checks whether all data has been read.
-	 * 
-	 * @return boolean value indicating whether all data has been read.
-	 */
-	@Override
-	public boolean reachedEnd() {
-		try {
-			if (resultSet.isLast()) {
-				resultSet.close();
-				statement.close();
-				dbConn.close();
-				return true;
-			} else {
-				return false;
-			}
-		} catch (SQLException e) {
-			throw new IllegalArgumentException("Couldn't evaluate reachedEnd():\t" + e.getMessage());
-		} catch (NullPointerException e) {
-			throw new IllegalArgumentException("Couldn't access resultSet:\t" + e.getMessage());
-		}
-	}
-
-	/**
-	 * Stores the next resultSet row in a Record
-	 * 
-	 * @param record
-	 *        target Record
-	 * @return The Record, filled with the values of the next result row.
-	 */
-	@Override
-	public Record nextRecord(Record record) {
-		try {
-			resultSet.next();
-			ResultSetMetaData rsmd = resultSet.getMetaData();
-			int column_count = rsmd.getColumnCount();
-			record.setNumFields(column_count);
-
-			for (int pos = 0; pos < column_count; pos++) {
-				int type = rsmd.getColumnType(pos + 1);
-				retrieveTypeAndFillRecord(pos, type, record);
-			}
-			return record;
-		} catch (SQLException e) {
-			throw new IllegalArgumentException("Couldn't read data:\t" + e.getMessage());
-		} catch (NotTransformableSQLFieldException e) {
-			throw new IllegalArgumentException("Couldn't read data because of unknown column sql-type:\t"
-				+ e.getMessage());
-		} catch (NullPointerException e) {
-			throw new IllegalArgumentException("Couldn't access resultSet:\t" + e.getMessage());
-		}
-	}
-	
-	public static class NotTransformableSQLFieldException extends Exception {
-
-		private static final long serialVersionUID = 1L;
-
-		public NotTransformableSQLFieldException(String message) {
-			super(message);
-		}
-	}
-
-}
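
The Record-API input format removed above can be configured either through its constructors or through a Configuration carrying the keys "driver", "username", "password", "url", and "query". A minimal sketch of the Configuration path (Derby values assumed, matching the example earlier in this commit): configure() loads the driver, opens the connection, and runs the query, after which reachedEnd() and nextRecord() iterate over the result set.

    Configuration params = new Configuration();
    params.setString("driver", "org.apache.derby.jdbc.EmbeddedDriver");
    params.setString("url", "jdbc:derby:memory:ebookshop");
    params.setString("query", "select * from books");

    JDBCInputFormat jdbcInput = new JDBCInputFormat();
    jdbcInput.configure(params);

    Record record = new Record();
    while (!jdbcInput.reachedEnd()) {
        record = jdbcInput.nextRecord(record);  // one Record field per result column
        // ... consume the record
    }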

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java b/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
deleted file mode 100644
index a99b38e..0000000
--- a/flink-addons/jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.CharValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-
-public class JDBCOutputFormat implements OutputFormat<Record> {
-	private static final long serialVersionUID = 1L;
-
-	private static final int DEFAULT_BATCH_INTERVAL = 5000;
-	
-	public static final String DRIVER_KEY = "driver";
-	public static final String USERNAME_KEY = "username";
-	public static final String PASSWORD_KEY = "password";
-	public static final String URL_KEY = "url";
-	public static final String QUERY_KEY = "query";
-	public static final String FIELD_COUNT_KEY = "fields";
-	public static final String FIELD_TYPE_KEY = "type";
-	public static final String BATCH_INTERVAL = "batchInt";
-
-	private Connection dbConn;
-	private PreparedStatement upload;
-
-	private String username;
-	private String password;
-	private String driverName;
-	private String dbURL;
-
-	private String query;
-	private int fieldCount;
-	private Class<? extends Value>[] fieldClasses;
-	
-	/**
-	 * Variable indicating the current number of insert sets in a batch.
-	 */
-	private int batchCount = 0;
-	
-	/**
-	 * Commit interval of batches.
-	 * A high batch interval means faster inserts but higher memory usage (reduce it if OutOfMemoryExceptions occur);
-	 * a low batch interval means slower inserts but lower memory usage.
-	 */
-	private int batchInterval = DEFAULT_BATCH_INTERVAL;
-	
-
-	/**
-	 * Configures this JDBCOutputFormat.
-	 * 
-	 * @param parameters
-	 *        Configuration containing all parameters.
-	 */
-	@Override
-	public void configure(Configuration parameters) {
-		this.driverName = parameters.getString(DRIVER_KEY, null);
-		this.username = parameters.getString(USERNAME_KEY, null);
-		this.password = parameters.getString(PASSWORD_KEY, null);
-		this.dbURL = parameters.getString(URL_KEY, null);
-		this.query = parameters.getString(QUERY_KEY, null);
-		this.fieldCount = parameters.getInteger(FIELD_COUNT_KEY, 0);
-		this.batchInterval = parameters.getInteger(BATCH_INTERVAL, DEFAULT_BATCH_INTERVAL);
-
-		@SuppressWarnings("unchecked")
-		Class<Value>[] classes = new Class[this.fieldCount];
-		this.fieldClasses = classes;
-
-		for (int i = 0; i < this.fieldCount; i++) {
-			@SuppressWarnings("unchecked")
-			Class<? extends Value> clazz = (Class<? extends Value>) parameters.getClass(FIELD_TYPE_KEY + i, null);
-			if (clazz == null) {
-				throw new IllegalArgumentException("Invalid configuration for JDBCOutputFormat: "
-						+ "No type class for parameter " + i);
-			}
-			this.fieldClasses[i] = clazz;
-		}
-	}
-
-	/**
-	 * Connects to the target database and initializes the prepared statement.
-	 *
-	 * @param taskNumber The number of the parallel instance.
-	 * @param numTasks The total number of parallel instances.
-	 * @throws IOException Thrown, if the output could not be opened due to an
-	 * I/O problem.
-	 */
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		try {
-			establishConnection();
-			upload = dbConn.prepareStatement(query);
-		} catch (SQLException sqe) {
-			throw new IllegalArgumentException("open() failed:\t!", sqe);
-		} catch (ClassNotFoundException cnfe) {
-			throw new IllegalArgumentException("JDBC-Class not found:\t", cnfe);
-		}
-	}
-
-	private void establishConnection() throws SQLException, ClassNotFoundException {
-		Class.forName(driverName);
-		if (username == null) {
-			dbConn = DriverManager.getConnection(dbURL);
-		} else {
-			dbConn = DriverManager.getConnection(dbURL, username, password);
-		}
-	}
-
-	/**
-	 * Adds a record to the prepared statement.
-	 * <p>
-	 * When this method is called, the output format is guaranteed to be opened.
-	 *
-	 * @param record The record to add to the output.
-	 * @throws IOException Thrown, if the record could not be added due to an
-	 * I/O problem.
-	 */
-	
-	@Override
-	public void writeRecord(Record record) throws IOException {
-		try {
-			for (int x = 0; x < record.getNumFields(); x++) {
-				Value temp = record.getField(x, fieldClasses[x]);
-				addValue(x + 1, temp);
-			}
-			upload.addBatch();
-			batchCount++;
-			if(batchCount >= batchInterval) {
-				upload.executeBatch();
-				batchCount = 0;
-			}
-		} catch (SQLException sqe) {
-			throw new IllegalArgumentException("writeRecord() failed:\t", sqe);
-		} catch (IllegalArgumentException iae) {
-			throw new IllegalArgumentException("writeRecord() failed:\t", iae);
-		}
-	}
-
-	private enum PactType {
-		BooleanValue,
-		ByteValue,
-		CharValue,
-		DoubleValue,
-		FloatValue,
-		IntValue,
-		LongValue,
-		ShortValue,
-		StringValue
-	}
-
-	private void addValue(int index, Value value) throws SQLException {
-		PactType type;
-		try {
-			type = PactType.valueOf(value.getClass().getSimpleName());
-		} catch (IllegalArgumentException iae) {
-			throw new IllegalArgumentException("PactType not supported:\t", iae);
-		}
-		switch (type) {
-			case BooleanValue:
-				upload.setBoolean(index, ((BooleanValue) value).getValue());
-				break;
-			case ByteValue:
-				upload.setByte(index, ((ByteValue) value).getValue());
-				break;
-			case CharValue:
-				upload.setString(index, String.valueOf(((CharValue) value).getValue()));
-				break;
-			case DoubleValue:
-				upload.setDouble(index, ((DoubleValue) value).getValue());
-				break;
-			case FloatValue:
-				upload.setFloat(index, ((FloatValue) value).getValue());
-				break;
-			case IntValue:
-				upload.setInt(index, ((IntValue) value).getValue());
-				break;
-			case LongValue:
-				upload.setLong(index, ((LongValue) value).getValue());
-				break;
-			case ShortValue:
-				upload.setShort(index, ((ShortValue) value).getValue());
-				break;
-			case StringValue:
-				upload.setString(index, ((StringValue) value).getValue());
-				break;
-		}
-	}
-
-	/**
-	 * Executes prepared statement and closes all resources of this instance.
-	 *
-	 * @throws IOException Thrown, if the connection could not be closed properly.
-	 */
-	@Override
-	public void close() throws IOException {
-		try {
-			upload.executeBatch();
-			batchCount = 0;
-			upload.close();
-			dbConn.close();
-		} catch (SQLException sqe) {
-			throw new IllegalArgumentException("close() failed:\t", sqe);
-		}
-	}
-
-	/**
-	 * Creates a configuration builder that can be used to set the 
-	 * output format's parameters to the config in a fluent fashion.
-	 * 
-	 * @param target The data sink whose configuration the parameters are written to.
-	 * @return A config builder for setting parameters.
-	 */
-	public static ConfigBuilder configureOutputFormat(GenericDataSink target) {
-		return new ConfigBuilder(target.getParameters());
-	}
-
-	/**
-	 * Abstract builder used to set parameters to the output format's 
-	 * configuration in a fluent way.
-	 */
-	protected static abstract class AbstractConfigBuilder<T>
-			extends FileOutputFormat.AbstractConfigBuilder<T> {
-
-		/**
-		 * Creates a new builder for the given configuration.
-		 * 
-		 * @param config The configuration into which the parameters will be written.
-		 */
-		protected AbstractConfigBuilder(Configuration config) {
-			super(config);
-		}
-
-		/**
-		 * Sets the query field.
-		 * @param value value to be set.
-		 * @return The builder itself.
-		 */
-		public T setQuery(String value) {
-			this.config.setString(QUERY_KEY, value);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-
-		/**
-		 * Sets the url field.
-		 * @param value value to be set.
-		 * @return The builder itself.
-		 */
-		public T setUrl(String value) {
-			this.config.setString(URL_KEY, value);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-
-		/**
-		 * Sets the username field.
-		 * @param value value to be set.
-		 * @return The builder itself.
-		 */
-		public T setUsername(String value) {
-			this.config.setString(USERNAME_KEY, value);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-
-		/**
-		 * Sets the password field.
-		 * @param value value to be set.
-		 * @return The builder itself.
-		 */
-		public T setPassword(String value) {
-			this.config.setString(PASSWORD_KEY, value);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-
-		/**
-		 * Sets the driver field.
-		 * @param value value to be set.
-		 * @return The builder itself.
-		 */
-		public T setDriver(String value) {
-			this.config.setString(DRIVER_KEY, value);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-
-		/**
-		 * Sets the type of a column.
-		 * Types are applied in the order they were set.
-		 * @param type PactType to apply.
-		 * @return The builder itself.
-		 */
-		public T setClass(Class<? extends Value> type) {
-			final int numYet = this.config.getInteger(FIELD_COUNT_KEY, 0);
-			this.config.setClass(FIELD_TYPE_KEY + numYet, type);
-			this.config.setInteger(FIELD_COUNT_KEY, numYet + 1);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-	}
-
-	/**
-	 * A builder used to set parameters to the output format's configuration in a fluent way.
-	 */
-	public static final class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> {
-		/**
-		 * Creates a new builder for the given configuration.
-		 * 
-		 * @param targetConfig The configuration into which the parameters will be written.
-		 */
-		protected ConfigBuilder(Configuration targetConfig) {
-			super(targetConfig);
-		}
-	}
-}
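
The ConfigBuilder at the end of the removed file was the intended way to wire this Record-API output format into a plan. A sketch under the same Derby assumptions; here, sink stands for a GenericDataSink that was created with this JDBCOutputFormat elsewhere in the plan, and the field classes must be registered in column order:

    JDBCOutputFormat.configureOutputFormat(sink)
            .setDriver("org.apache.derby.jdbc.EmbeddedDriver")
            .setUrl("jdbc:derby:memory:ebookshop")
            .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
            .setClass(IntValue.class)     // id
            .setClass(StringValue.class)  // title
            .setClass(StringValue.class)  // author
            .setClass(DoubleValue.class)  // price
            .setClass(IntValue.class);    // qty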