Posted to issues@flink.apache.org by haohui <gi...@git.apache.org> on 2017/04/12 06:58:20 UTC

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

GitHub user haohui opened a pull request:

    https://github.com/apache/flink/pull/3712

    [FLINK-6281] Create TableSink for JDBC.

    This PR implements the `StreamTableSink` interface for the JDBC connectors so that the streaming SQL APIs can directly interact with them.
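
    For illustration, a minimal sketch of how a streaming SQL job could hand results to such a sink with the Table API of that time (hedged: `sink` is a placeholder for the JDBC table sink built later in this thread, and the query is made up):

        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.Table;
        import org.apache.flink.table.api.TableEnvironment;
        import org.apache.flink.table.api.java.StreamTableEnvironment;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        Table result = tEnv.sql("SELECT id FROM books");  // illustrative query
        result.writeToSink(sink);                         // sink: the JDBC StreamTableSink
        env.execute();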

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/haohui/flink FLINK-6281

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3712.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3712
    
----
commit 2bfd014bedc5bd64f346652dfb5ddb41cc36cc3f
Author: Haohui Mai <wh...@apache.org>
Date:   2017-04-12T06:56:56Z

    [FLINK-6281] Create TableSink for JDBC.

----


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r120576118
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -202,14 +202,20 @@ public void writeRecord(Row row) throws IOException {
     			upload.addBatch();
     			batchCount++;
     			if (batchCount >= batchInterval) {
    -				upload.executeBatch();
    -				batchCount = 0;
    +				flush();
     			}
     		} catch (SQLException | IllegalArgumentException e) {
     			throw new IllegalArgumentException("writeRecord() failed", e);
     		}
     	}
     
    +	void flush() throws SQLException {
    +		if (upload != null) {
    +			upload.executeBatch();
    --- End diff --
    
    It's been a while since I worked with JDBC; I take it this is a synchronous call? What happens if this call fails?
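
    For context, `PreparedStatement.executeBatch()` is indeed a blocking call. A minimal sketch of how the failure could be surfaced, assuming the `upload` and `batchCount` fields of `JDBCOutputFormat` (wrapping in an `IOException` is just one option):

        import java.io.IOException;
        import java.sql.SQLException;

        void flush() throws IOException {
            if (upload != null) {
                try {
                    // executeBatch() blocks until the database has processed the batch.
                    upload.executeBatch();
                    batchCount = 0;
                } catch (SQLException e) {
                    // A BatchUpdateException (SQLException subclass) additionally
                    // reports per-statement outcomes via getUpdateCounts().
                    throw new IOException("flush() failed", e);
                }
            }
        }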


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r113412806
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
    +import org.apache.flink.table.sinks.StreamTableSink;
    +import org.apache.flink.table.sinks.TableSink;
    +import org.apache.flink.types.Row;
    +
    +import java.io.IOException;
    +import java.util.UUID;
    +
    +public class JDBCTableSink extends GenericWriteAheadSink<Row> implements StreamTableSink<Row> {
    +	private final JDBCOutputFormat outputFormat;
    +	private final CheckpointCommitter committer;
    +	private final String[] fieldNames;
    +	private final TypeInformation[] fieldTypes;
    +
    +	public JDBCTableSink(CheckpointCommitter committer, TypeSerializer<Row> serializer,
    --- End diff --
    
    I would propose either adding a JDBCCheckpointCommitter that cooperates with the sink (as seen in this [prototype](https://github.com/zentol/flink/commit/92e878b59a7371ac9cad402d0b009c7439cd1900)) or omitting the `CheckpointCommitter` argument and providing a dummy to the `GenericWriteAheadSink`.
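
    A minimal sketch of the dummy variant, assuming the abstract methods that `CheckpointCommitter` declared in that Flink version (the linked prototype takes the other route):

        import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;

        // No-op committer: satisfies GenericWriteAheadSink's contract but never
        // marks a checkpoint as committed, so buffered data is always replayed.
        class NoOpCheckpointCommitter extends CheckpointCommitter {
            @Override public void open() throws Exception {}
            @Override public void close() throws Exception {}
            @Override public void createResource() throws Exception {}
            @Override public void commitCheckpoint(int subtaskIdx, long checkpointID) throws Exception {}
            @Override public boolean isCheckpointCommitted(int subtaskIdx, long checkpointID) throws Exception {
                return false; // always replay on recovery: at-least-once semantics
            }
        }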


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r132160714
  
    --- Diff: docs/dev/table/sourceSinks.md ---
    @@ -202,7 +202,38 @@ val csvTableSource = CsvTableSource
     Provided TableSinks
     -------------------
     
    -**TODO**
    +### JDBCAppendSink
    +
    +<code>JDBCAppendSink</code> allows you to bridge a data stream to a JDBC database. The sink only supports append-only data: it does not support retractions or upserts from Flink's perspective. However, you can customize the query using <code>REPLACE</code> or <code>INSERT OVERWRITE</code> to implement upserts inside the database.
    +
    +To use the JDBC sink, you have to add the JDBC connector dependency (<code>flink-jdbc</code>) to your project. Then you can create the sink using <code>JDBCAppendTableSinkBuilder</code>:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
    +  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    +  .setDBUrl("jdbc:derby:memory:ebookshop")
    +  .setQuery("INSERT INTO books (id) VALUES (?)")
    +  .setFieldTypes(new TypeInformation<?>[] {INT_TYPE_INFO})
    --- End diff --
    
    use varargs?
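
    With a varargs signature, the documented example would shrink to (sketch; `INT_TYPE_INFO` statically imported as in the original snippet):

        JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
          .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
          .setDBUrl("jdbc:derby:memory:ebookshop")
          .setQuery("INSERT INTO books (id) VALUES (?)")
          .setFieldTypes(INT_TYPE_INFO)  // varargs: no explicit array needed
          .build();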


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r132141831
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.table.sinks.AppendStreamTableSink;
    +import org.apache.flink.table.sinks.BatchTableSink;
    +import org.apache.flink.table.sinks.TableSink;
    +import org.apache.flink.types.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * An at-least-once Table sink for JDBC.
    + *
    + * <p>Flink's checkpointing mechanism guarantees that messages are delivered at-least-once to this sink.
    + * However, one common use case is to run idempotent queries (e.g., <code>REPLACE</code> or
    + * <code>INSERT OVERWRITE</code>) to upsert into the database and achieve exactly-once semantics.</p>
    + */
    +public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
    +	private final JDBCSinkFunction sink;
    +
    +	private String[] fieldNames;
    +	private TypeInformation[] fieldTypes;
    +
    +	JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
    +		this.sink = new JDBCSinkFunction(outputFormat);
    +	}
    +
    +	public static JDBCAppendTableSinkBuilder builder() {
    +		return new JDBCAppendTableSinkBuilder();
    +	}
    +
    +	@Override
    +	public void emitDataStream(DataStream<Row> dataStream) {
    +		dataStream.addSink(sink);
    +	}
    +
    +	@Override
    +	public void emitDataSet(DataSet<Row> dataSet) {
    +		dataSet.output(sink.outputFormat);
    +	}
    +
    +	@Override
    +	public TypeInformation<Row> getOutputType() {
    +		return new RowTypeInfo(fieldTypes, fieldNames);
    +	}
    +
    +	@Override
    +	public String[] getFieldNames() {
    +		return fieldNames;
    +	}
    +
    +	@Override
    +	public TypeInformation<?>[] getFieldTypes() {
    +		return fieldTypes;
    +	}
    +
    +	@Override
    +	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    +		int[] types = sink.outputFormat.getTypesArray();
    +		Preconditions.checkArgument(fieldTypes.length == types.length);
    +		for (int i = 0; i < types.length; ++i) {
    +			Preconditions.checkArgument(JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
    --- End diff --
    
    add more details to error message like: `"Schema of output table incompatible with JDBCAppendTableSink: expected [type1, type2, type3, ...], actual [type1, type2]"`
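
    A sketch of the suggested check (the message text is illustrative; `Arrays.toString` is used only for readability):

        import java.util.Arrays;

        Preconditions.checkArgument(
            JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
            "Schema of output table incompatible with JDBCAppendTableSink schema. " +
                "Expected SQL types: %s, actual field types: %s",
            Arrays.toString(types), Arrays.toString(fieldTypes));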


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r132162563
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.types.Row;
    +
    +class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
    +	final JDBCOutputFormat outputFormat;
    +
    +	JDBCSinkFunction(JDBCOutputFormat outputFormat) {
    +		this.outputFormat = outputFormat;
    +	}
    +
    +	@Override
    +	public void invoke(Row value) throws Exception {
    +		outputFormat.writeRecord(value);
    +	}
    +
    +	@Override
    +	public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +		outputFormat.flush();
    +	}
    +
    +	@Override
    +	public void initializeState(FunctionInitializationContext context) throws Exception {
    +	}
    +
    +	@Override
    +	public void open(Configuration parameters) throws Exception {
    +		super.open(parameters);
    +		RuntimeContext ctx = getRuntimeContext();
    +		outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
    --- End diff --
    
    add a call `outputFormat.setRuntimeContext(ctx);` before calling `open()`.
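
    Applied to the snippet above, the suggestion would look like this (sketch; `setRuntimeContext` is inherited from `RichOutputFormat`):

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            RuntimeContext ctx = getRuntimeContext();
            // Hand the runtime context to the format before open(), so it can
            // access subtask information and metrics.
            outputFormat.setRuntimeContext(ctx);
            outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
        }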


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    @fhueske @zentol can you please take another look? Thanks.


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    Thanks for the review. I wasn't aware that OutputFormats are not integrated with Flink's checkpointing mechanism.
    
    To address this problem, maybe we can do something similar to `FlinkKafkaProducerBase`? What do you think, @zentol and @aljoscha?


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    There is `GenericWriteAheadSink`, which buffers elements and is used by the Cassandra sink to write them on checkpoint. I think this needs some more thought, maybe a design outline on the JIRA issue. I'm also not 100% sure that the generic write-ahead sink will work for this. @zentol might have a better answer, though.


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    I think an at-least-once sink with support for upserts would also be very useful.


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r132168767
  
    --- Diff: docs/dev/table/sourceSinks.md ---
    @@ -202,7 +202,38 @@ val csvTableSource = CsvTableSource
     Provided TableSinks
     -------------------
     
    -**TODO**
    +### JDBCAppendSink
    +
    +<code>JDBCAppendSink</code> allows you to bridge a data stream to a JDBC database. The sink only supports append-only data: it does not support retractions or upserts from Flink's perspective. However, you can customize the query using <code>REPLACE</code> or <code>INSERT OVERWRITE</code> to implement upserts inside the database.
    +
    +To use the JDBC sink, you have to add the JDBC connector dependency (<code>flink-jdbc</code>) to your project. Then you can create the sink using <code>JDBCAppendTableSinkBuilder</code>:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
    +  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    +  .setDBUrl("jdbc:derby:memory:ebookshop")
    +  .setQuery("INSERT INTO books (id) VALUES (?)")
    +  .setFieldTypes(new TypeInformation<?>[] {INT_TYPE_INFO})
    --- End diff --
    
    change to `setParameterTypes()` if we rename the method.


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r126736740
  
    --- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java ---
    @@ -0,0 +1,128 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.types.Row;
    +
    +import org.junit.After;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNotSame;
    +import static org.mockito.Mockito.doReturn;
    +import static org.mockito.Mockito.mock;
    +
    +/**
    + * Test for JDBCTableSink.
    + */
    +public class JDBCTableSinkTest extends JDBCTestBase {
    +	private static final String[] FIELD_NAMES = new String[]{"foo"};
    +	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
    +		BasicTypeInfo.STRING_TYPE_INFO
    +	};
    +
    +
    +	private JDBCOutputFormat jdbcOutputFormat;
    +
    +	@After
    +	public void tearDown() throws IOException {
    +		if (jdbcOutputFormat != null) {
    +			jdbcOutputFormat.close();
    +		}
    +		jdbcOutputFormat = null;
    +	}
    +
    +	@Test
    +	public void testFlush() throws Exception {
    --- End diff --
    
    This rather tests the `JdbcOutputFormat.flush()` method. I would move (and adapt) this test to `JdbcOutputFormatTest`.
    
    For the `JdbcTableSink` we need tests that check the configuration (which is done below) and a check that the `emitDataStream()` method (and the returned `SinkFunction`) is working correctly. I would do this by calling `emitDataStream()` with a mocked `DataStream<Row>` and fetching the `SinkFunction` from the returned `DataStreamSink` (`sink.getTransformation().getOperator().getUserFunction()`).
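
    A hedged sketch of that second check, using an `ArgumentCaptor` instead of unwrapping the `DataStreamSink` (the `tableSink` variable and the final assertion are illustrative):

        import org.apache.flink.streaming.api.functions.sink.SinkFunction;
        import org.mockito.ArgumentCaptor;
        import static org.mockito.Mockito.mock;
        import static org.mockito.Mockito.verify;

        @SuppressWarnings("unchecked")
        DataStream<Row> dataStream = (DataStream<Row>) mock(DataStream.class);
        tableSink.emitDataStream(dataStream);

        // Capture the SinkFunction that emitDataStream() registered on the stream.
        ArgumentCaptor<SinkFunction> captor = ArgumentCaptor.forClass(SinkFunction.class);
        verify(dataStream).addSink(captor.capture());
        SinkFunction<Row> fn = captor.getValue();
        // ... invoke fn against an embedded database and assert the rows arrive.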


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r128221151
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
    +
    +/**
    + * A builder to configure and build the JDBCAppendTableSink.
    + */
    +public class JDBCAppendTableSinkBuilder {
    +	private String username;
    +	private String password;
    +	private String driverName;
    +	private String dbURL;
    +	private String query;
    +	private int batchInterval = DEFAULT_BATCH_INTERVAL;
    +	private TypeInformation<?>[] fieldTypes;
    +
    +	public JDBCAppendTableSinkBuilder setUsername(String username) {
    +		this.username = username;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setPassword(String password) {
    +		this.password = password;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
    +		this.driverName = drivername;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
    +		this.dbURL = dbURL;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setQuery(String query) {
    +		this.query = query;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setBatchInterval(int batchInterval) {
    +		this.batchInterval = batchInterval;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation<?>[] fieldTypes) {
    --- End diff --
    
    Make `fieldTypes` a vararg for convenience? 
    
    I would think that `java.sql.Types` would be more natural in the context of a JDBC sink but I'm open for `TypeInformation` as well.
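
    The two options, sketched (hypothetical; `setParameterTypes` and the `sqlTypes` field do not exist yet):

        // Varargs over TypeInformation, as the builder eventually adopted:
        public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation<?>... fieldTypes) {
            this.fieldTypes = fieldTypes;
            return this;
        }

        // Alternative based on java.sql.Types constants, arguably more natural for JDBC:
        public JDBCAppendTableSinkBuilder setParameterTypes(int... sqlTypes) {
            this.sqlTypes = sqlTypes; // hypothetical field holding java.sql.Types values
            return this;
        }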


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    Thanks for the update @haohui.
    
    I'll merge this PR :-)
    I have to change some of the type mappings to make it work with `JDBCOutputFormat`. I think it would be a good idea to redesign the type handling in `JDBCOutputFormat`. I'll open a JIRA for that.
    
    Thanks, Fabian


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3712


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    Great, just wanted to make sure we're on the same page.
    
    Why did you revert the usage of the GenericWriteAheadSink? Now we're back to where we started, with no guarantee that data has been written when a checkpoint completes.


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    Thanks @zentol! 
    I'll have a look at it as well.


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r132141735
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.table.sinks.AppendStreamTableSink;
    +import org.apache.flink.table.sinks.BatchTableSink;
    +import org.apache.flink.table.sinks.TableSink;
    +import org.apache.flink.types.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * An at-least-once Table sink for JDBC.
    + *
    + * <p>Flink's checkpointing mechanism guarantees that messages are delivered at-least-once to this sink.
    + * However, one common use case is to run idempotent queries (e.g., <code>REPLACE</code> or
    + * <code>INSERT OVERWRITE</code>) to upsert into the database and achieve exactly-once semantics.</p>
    + */
    +public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
    +	private final JDBCSinkFunction sink;
    +
    +	private String[] fieldNames;
    +	private TypeInformation[] fieldTypes;
    +
    +	JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
    +		this.sink = new JDBCSinkFunction(outputFormat);
    +	}
    +
    +	public static JDBCAppendTableSinkBuilder builder() {
    +		return new JDBCAppendTableSinkBuilder();
    +	}
    +
    +	@Override
    +	public void emitDataStream(DataStream<Row> dataStream) {
    +		dataStream.addSink(sink);
    +	}
    +
    +	@Override
    +	public void emitDataSet(DataSet<Row> dataSet) {
    +		dataSet.output(sink.outputFormat);
    +	}
    +
    +	@Override
    +	public TypeInformation<Row> getOutputType() {
    +		return new RowTypeInfo(fieldTypes, fieldNames);
    +	}
    +
    +	@Override
    +	public String[] getFieldNames() {
    +		return fieldNames;
    +	}
    +
    +	@Override
    +	public TypeInformation<?>[] getFieldTypes() {
    +		return fieldTypes;
    +	}
    +
    +	@Override
    +	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    +		int[] types = sink.outputFormat.getTypesArray();
    +		Preconditions.checkArgument(fieldTypes.length == types.length);
    --- End diff --
    
    Give a detailed error message like: `"Schema of output table incompatible with JDBCAppendTableSink: expected [type1, type2, type3, ...], actual [type1, type2]"`


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r132146531
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
    +
    +/**
    + * A builder to configure and build the JDBCAppendTableSink.
    + */
    +public class JDBCAppendTableSinkBuilder {
    +	private String username;
    +	private String password;
    +	private String driverName;
    +	private String dbURL;
    +	private String query;
    +	private int batchSize = DEFAULT_BATCH_INTERVAL;
    +	private TypeInformation<?>[] fieldTypes;
    +
    +	public JDBCAppendTableSinkBuilder setUsername(String username) {
    +		this.username = username;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setPassword(String password) {
    +		this.password = password;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
    +		this.driverName = drivername;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
    +		this.dbURL = dbURL;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setQuery(String query) {
    +		this.query = query;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setBatchSize(int batchSize) {
    +		this.batchSize = batchSize;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation<?>... fieldTypes) {
    +		this.fieldTypes = fieldTypes;
    +		return this;
    +	}
    +
    +	/**
    +	 * Finalizes the configuration and checks validity.
    +	 *
    +	 * @return Configured JDBCOutputFormat
    +	 */
    +	public JDBCAppendTableSink build() {
    +		Preconditions.checkNotNull(fieldTypes, "Row type is unspecified");
    --- End diff --
    
    change error message to `"Types of the query parameters are not specified. Please specify types using the setFieldTypes() method."` (or `setParameterTypes()` if we rename the method).


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    +1.


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r113410894
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
    +import org.apache.flink.table.sinks.StreamTableSink;
    +import org.apache.flink.table.sinks.TableSink;
    +import org.apache.flink.types.Row;
    +
    +import java.io.IOException;
    +import java.util.UUID;
    +
    +public class JDBCTableSink extends GenericWriteAheadSink<Row> implements StreamTableSink<Row> {
    +	private final JDBCOutputFormat outputFormat;
    +	private final CheckpointCommitter committer;
    +	private final String[] fieldNames;
    +	private final TypeInformation[] fieldTypes;
    +
    +	public JDBCTableSink(CheckpointCommitter committer, TypeSerializer<Row> serializer,
    +				JDBCOutputFormat outputFormat, String[] fieldNames,
    +				TypeInformation[] fieldTypes) throws Exception {
    --- End diff --
    
    Like the Cassandra sink, the `fieldNames`/`fieldTypes` parameters should be removed to provide a clean API to the user.


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r132160738
  
    --- Diff: docs/dev/table/sourceSinks.md ---
    @@ -202,7 +202,38 @@ val csvTableSource = CsvTableSource
     Provided TableSinks
     -------------------
     
    -**TODO**
    +### JDBCAppendSink
    +
    +<code>JDBCAppendSink</code> allows you to bridge a data stream to a JDBC database. The sink only supports append-only data: it does not support retractions or upserts from Flink's perspective. However, you can customize the query using <code>REPLACE</code> or <code>INSERT OVERWRITE</code> to implement upserts inside the database.
    +
    +To use the JDBC sink, you have to add the JDBC connector dependency (<code>flink-jdbc</code>) to your project. Then you can create the sink using <code>JDBCAppendTableSinkBuilder</code>:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
    +  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    +  .setDBUrl("jdbc:derby:memory:ebookshop")
    +  .setQuery("INSERT INTO books (id) VALUES (?)")
    +  .setFieldTypes(new TypeInformation<?>[] {INT_TYPE_INFO})
    +  .build();
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val sink = JDBCAppendTableSink.builder()
    +  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    +  .setDBUrl("jdbc:derby:memory:ebookshop")
    +  .setQuery("INSERT INTO books (id) VALUES (?)")
    +  .setFieldTypes(Array(INT_TYPE_INFO))
    --- End diff --
    
    use varargs?


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r113410993
  
    --- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +import org.apache.flink.types.Row;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotSame;
    +import static org.mockito.Matchers.anyString;
    +import static org.mockito.Matchers.same;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.verify;
    +
    +public class JDBCTableSinkTest {
    +	private static final String[] FIELD_NAMES = new String[] {"foo"};
    --- End diff --
    
    remove space after `[]`.


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r120575757
  
    --- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.types.Row;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotSame;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.verify;
    +
    +/**
    + * Test for JDBCTableSink.
    + */
    +public class JDBCTableSinkTest {
    +	private static final String[] FIELD_NAMES = new String[]{"foo"};
    +	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
    +		BasicTypeInfo.STRING_TYPE_INFO
    +	};
    +
    +	@Test
    +	public void testOutputSink() throws Exception {
    +		JDBCOutputFormat outputFormat = mock(JDBCOutputFormat.class);
    +		JDBCTableSink sink = new JDBCTableSink(outputFormat);
    +		@SuppressWarnings("unchecked")
    +		DataStream<Row> dataStream = (DataStream<Row>) mock(DataStream.class);
    +		sink.emitDataStream(dataStream);
    +		verify(dataStream).addSink(sink);
    +	}
    +
    +	@Test
    +	public void testFlush() throws Exception {
    +		JDBCOutputFormat outputFormat = mock(JDBCOutputFormat.class);
    +		JDBCTableSink sink = new JDBCTableSink(outputFormat);
    +		@SuppressWarnings("unchecked")
    +		DataStream<Row> dataStream = (DataStream<Row>) mock(DataStream.class);
    +		sink.emitDataStream(dataStream);
    +		sink.snapshotState(mock(FunctionSnapshotContext.class));
    +		verify(dataStream).addSink(sink);
    +		verify(outputFormat).flush();
    --- End diff --
    
    let's not use mocking for this test. Just create an actual format/sink, give N values to the sink where N < batchSize, verify they haven't been written yet, call flush, verify they were written.
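
    A sketch of such a test against the embedded Derby database used elsewhere in the flink-jdbc tests (`countRows()` is a hypothetical helper that queries the target table):

        JDBCOutputFormat format = JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
            .setDBUrl("jdbc:derby:memory:ebookshop")
            .setQuery("INSERT INTO books (id) VALUES (?)")
            .setBatchInterval(5)          // values below this threshold stay buffered
            .finish();
        format.open(0, 1);

        for (int i = 0; i < 3; i++) {     // 3 < batchSize
            format.writeRecord(Row.of(i));
        }
        assertEquals(0, countRows());     // nothing written before flush

        format.flush();
        assertEquals(3, countRows());     // flush forces the batch out
        format.close();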


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r113412006
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
    +import org.apache.flink.table.sinks.StreamTableSink;
    +import org.apache.flink.table.sinks.TableSink;
    +import org.apache.flink.types.Row;
    +
    +import java.io.IOException;
    +import java.util.UUID;
    +
    +public class JDBCTableSink extends GenericWriteAheadSink<Row> implements StreamTableSink<Row> {
    +	private final JDBCOutputFormat outputFormat;
    +	private final CheckpointCommitter committer;
    +	private final String[] fieldNames;
    +	private final TypeInformation[] fieldTypes;
    +
    +	public JDBCTableSink(CheckpointCommitter committer, TypeSerializer<Row> serializer,
    +				JDBCOutputFormat outputFormat, String[] fieldNames,
    +				TypeInformation[] fieldTypes) throws Exception {
    +		super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
    +		this.outputFormat = outputFormat;
    +		this.committer = committer;
    +		this.fieldNames = fieldNames;
    +		this.fieldTypes = fieldTypes;
    +	}
    +
    +	@Override
    +	public void emitDataStream(DataStream<Row> dataStream) {
    +		dataStream.transform("JDBC Sink", getOutputType(), this);
    +	}
    +
    +	@Override
    +	public TypeInformation<Row> getOutputType() {
    +		return new RowTypeInfo(fieldTypes, fieldNames);
    +	}
    +
    +	@Override
    +	public String[] getFieldNames() {
    +		return fieldNames;
    +	}
    +
    +	@Override
    +	public TypeInformation<?>[] getFieldTypes() {
    +		return fieldTypes;
    +	}
    +
    +	@Override
    +	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    +		try {
    +			return new JDBCTableSink(committer, serializer, outputFormat, fieldNames, fieldTypes);
    +		} catch (Exception e) {
    +			LOG.warn("Failed to create a copy of the sink.", e);
    +			return null;
    +		}
    +	}
    +
    +	@Override
    +	protected boolean sendValues(Iterable<Row> value, long timestamp) throws Exception {
    +		for (Row r : value) {
    +			try {
    +				outputFormat.writeRecord(r);
    --- End diff --
    
    This doesn't guarantee in any way that the values are actually being sent; you need some kind of flushing functionality for this to work properly.
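
    A sketch of a flushing variant (assuming the `flush()` method that a later revision of this PR adds to `JDBCOutputFormat`):

        @Override
        protected boolean sendValues(Iterable<Row> values, long timestamp) throws Exception {
            for (Row r : values) {
                outputFormat.writeRecord(r);  // may only add to the current batch
            }
            outputFormat.flush();             // force buffered statements to the database
            return true;                      // only ack once everything is out
        }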


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r132166866
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java ---
    @@ -0,0 +1,87 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.CompositeType;
    +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
    +
    +import java.sql.Types;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_DEC_TYPE_INFO;
    +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_INT_TYPE_INFO;
    +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO;
    +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BYTE_TYPE_INFO;
    +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.CHAR_TYPE_INFO;
    +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DATE_TYPE_INFO;
    +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
    +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO;
    +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
    +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
    +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO;
    +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
    +
    +class JDBCTypeUtil {
    +	private static final Map<BasicTypeInfo<?>, Integer> BASIC_TYPES;
    +
    +	static {
    +		HashMap<BasicTypeInfo<?>, Integer> m = new HashMap<>();
    +		m.put(STRING_TYPE_INFO, Types.VARCHAR);
    +		m.put(BOOLEAN_TYPE_INFO, Types.BOOLEAN);
    +		m.put(BYTE_TYPE_INFO, Types.TINYINT);
    +		m.put(SHORT_TYPE_INFO, Types.SMALLINT);
    +		m.put(INT_TYPE_INFO, Types.INTEGER);
    +		m.put(LONG_TYPE_INFO, Types.BIGINT);
    +		m.put(FLOAT_TYPE_INFO, Types.FLOAT);
    +		m.put(DOUBLE_TYPE_INFO, Types.DOUBLE);
    +		m.put(CHAR_TYPE_INFO, Types.SMALLINT);
    --- End diff --
    
    `JDBCOutputFormat` will insert a `SMALLINT` by casting to `short`. This cast will fail for `Character`. 
    
    Please double check the type assignment and align it with `JDBCOutputFormat.writeRecord()`.
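
    A possible fix, as a minimal sketch (hypothetical, not the PR's actual code):
    
    ```
    // Hypothetical correction: map CHAR to java.sql.Types.CHAR so that
    // writeRecord() can call
    //   upload.setString(index + 1, String.valueOf(row.getField(index)))
    // instead of casting a Character to short.
    m.put(CHAR_TYPE_INFO, Types.CHAR);
    ```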


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    Thanks @fhueske! Updated the PR to address the comments.


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r128219296
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java ---
    @@ -0,0 +1,85 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.table.sinks.AppendStreamTableSink;
    +import org.apache.flink.table.sinks.BatchTableSink;
    +import org.apache.flink.table.sinks.TableSink;
    +import org.apache.flink.types.Row;
    +
    +/**
    + * An at-least-once Table sink for JDBC.
    + */
    +public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
    +	private final JDBCSinkFunction sink;
    +
    +	private String[] fieldNames;
    +	private TypeInformation[] fieldTypes;
    +
    +	JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
    +		this.sink = new JDBCSinkFunction(outputFormat);
    +	}
    +
    +	public static JDBCAppendTableSinkBuilder builder() {
    +		return new JDBCAppendTableSinkBuilder();
    +	}
    +
    +	@Override
    +	public void emitDataStream(DataStream<Row> dataStream) {
    +		dataStream.addSink(sink);
    +	}
    +
    +	@Override
    +	public void emitDataSet(DataSet<Row> dataSet) {
    +		dataSet.output(sink.outputFormat);
    +	}
    +
    +	@Override
    +	public TypeInformation<Row> getOutputType() {
    +		return new RowTypeInfo(fieldTypes, fieldNames);
    +	}
    +
    +	@Override
    +	public String[] getFieldNames() {
    +		return fieldNames;
    +	}
    +
    +	@Override
    +	public TypeInformation<?>[] getFieldTypes() {
    +		return fieldTypes;
    +	}
    +
    +	@Override
    +	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    +		JDBCAppendTableSink copy = new JDBCAppendTableSink(sink.outputFormat);
    --- End diff --
    
    We could validate that the types of the `JDBCOutputFormat` match the `fieldTypes` which are provided by the optimizer. 
    Or do you have concerns regarding such a check?


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r128218692
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java ---
    @@ -0,0 +1,85 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.table.sinks.AppendStreamTableSink;
    +import org.apache.flink.table.sinks.BatchTableSink;
    +import org.apache.flink.table.sinks.TableSink;
    +import org.apache.flink.types.Row;
    +
    +/**
    + * An at-least-once Table sink for JDBC.
    --- End diff --
    
    Add a comment that `exactly-once` can be achieved by idempotent insert operations?
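
    For example, with a hypothetical MySQL setup (not part of this PR), replaying the same row after a recovery overwrites the earlier write instead of duplicating it:
    
    ```
    // Idempotent upsert via MySQL's REPLACE INTO; driver, URL and table are made up.
    JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
    	.setDrivername("com.mysql.jdbc.Driver")
    	.setDBUrl("jdbc:mysql://localhost:3306/flink")
    	.setQuery("REPLACE INTO words (word, cnt) VALUES (?, ?)")
    	.setFieldTypes(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)
    	.build();
    ```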


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r128226313
  
    --- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSink;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
    +import org.apache.flink.streaming.api.operators.StreamSource;
    +import org.apache.flink.types.Row;
    +
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +
    +import java.io.IOException;
    +
    +import static org.junit.Assert.assertSame;
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Mockito.doAnswer;
    +import static org.mockito.Mockito.mock;
    +
    +/**
    + * Test for JDBCAppendTableSink.
    + */
    +public class JDBCAppendTableSinkTest {
    +	private static final String[] FIELD_NAMES = new String[]{"foo"};
    +	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
    +		BasicTypeInfo.STRING_TYPE_INFO
    +	};
    +	private static final RowTypeInfo ROW_TYPE = new RowTypeInfo(FIELD_TYPES, FIELD_NAMES);
    +
    +	@Test
    +	public void testAppendTableSink() throws IOException {
    +			JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
    +				.setDrivername("foo")
    +				.setDBUrl("bar")
    +				.setQuery("insert into %s (id) values (?)")
    +				.setFieldTypes(FIELD_TYPES)
    +				.build();
    +
    +		StreamExecutionEnvironment env =
    +		mock(StreamExecutionEnvironment.class);
    +		doAnswer(new Answer() {
    +			@Override
    +			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
    +				return invocationOnMock.getArguments()[0];
    +			}
    +		}).when(env).clean(any());
    +
    +		TypeSerializer<Row> ts = ROW_TYPE.createSerializer(mock(ExecutionConfig.class));
    +		FromElementsFunction<Row> func = new FromElementsFunction<>(ts, Row.of("foo"));
    +		DataStream<Row> ds = new DataStreamSource<>(env, ROW_TYPE, new StreamSource<>(func), false, "foo");
    +		DataStreamSink<Row> dsSink = ds.addSink(sink.getSink());
    --- End diff --
    
    I think we should test for the correctness of the `emitDataStream()` method.
    Could be done as follows:
    
    ```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    DataStream<Row> ds = env.fromCollection(Collections.singleton(Row.of("foo")), ROW_TYPE);
    sink.emitDataStream(ds);
    
    Collection<Integer> sinkIds = env.getStreamGraph().getSinkIDs();
    assertEquals(1, sinkIds.size());
    int sinkId = sinkIds.iterator().next();
    
    StreamSink planSink = (StreamSink)env.getStreamGraph().getStreamNode(sinkId).getOperator();
    assertSame(sink.getSink(), planSink.getUserFunction());
    ```


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r132157359
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
    +
    +/**
    + * A builder to configure and build the JDBCAppendTableSink.
    + */
    +public class JDBCAppendTableSinkBuilder {
    +	private String username;
    +	private String password;
    +	private String driverName;
    +	private String dbURL;
    +	private String query;
    +	private int batchSize = DEFAULT_BATCH_INTERVAL;
    +	private TypeInformation<?>[] fieldTypes;
    +
    +	public JDBCAppendTableSinkBuilder setUsername(String username) {
    +		this.username = username;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setPassword(String password) {
    +		this.password = password;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
    +		this.driverName = drivername;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
    +		this.dbURL = dbURL;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setQuery(String query) {
    +		this.query = query;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setBatchSize(int batchSize) {
    +		this.batchSize = batchSize;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation<?>... fieldTypes) {
    --- End diff --
    
    Should we rename the method to `setParameterTypes()` and offer an overloaded version `setParameterTypes(int... paramTypes)` that allows to specify types as `java.sql.Types`?
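
    A sketch of how the overloads could look (hypothetical; `parameterTypes` is an assumed `int[]` field replacing `fieldTypes`):
    
    ```
    public JDBCAppendTableSinkBuilder setParameterTypes(int... paramTypes) {
    	this.parameterTypes = paramTypes;
    	return this;
    }
    
    public JDBCAppendTableSinkBuilder setParameterTypes(TypeInformation<?>... paramTypes) {
    	// translate Flink types to java.sql.Types constants and delegate
    	int[] types = new int[paramTypes.length];
    	for (int i = 0; i < paramTypes.length; i++) {
    		types[i] = JDBCTypeUtil.typeInformationToSqlType(paramTypes[i]);
    	}
    	return setParameterTypes(types);
    }
    ```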


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r128220251
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -218,10 +225,7 @@ public void writeRecord(Row row) throws IOException {
     	@Override
     	public void close() throws IOException {
     		try {
    -			if (upload != null) {
    -				upload.executeBatch();
    -				upload.close();
    --- End diff --
    
    we should `close()` the `PreparedStatement`
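
    A minimal sketch of such a `close()` (assuming the format's `upload` statement and `dbConn` connection fields, and the `flush()` method discussed in this thread):
    
    ```
    @Override
    public void close() throws IOException {
    	try {
    		if (upload != null) {
    			flush();
    			upload.close();
    		}
    	} catch (SQLException e) {
    		throw new IOException("close() failed", e);
    	} finally {
    		try {
    			if (dbConn != null) {
    				dbConn.close();
    			}
    		} catch (SQLException ignored) {
    			// the connection is being torn down anyway
    		}
    	}
    }
    ```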


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r113411053
  
    --- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +import org.apache.flink.types.Row;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotSame;
    +import static org.mockito.Matchers.anyString;
    +import static org.mockito.Matchers.same;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.verify;
    +
    +public class JDBCTableSinkTest {
    +	private static final String[] FIELD_NAMES = new String[] {"foo"};
    +	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {
    --- End diff --
    
    remove space after `[]`. move `STRING_TYPE_INFO` to this line.


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    The GenericWriteAheadSink would work for this. It can be implemented just like the `CassandraWriteAheadSink`. Since we can also use transactions, it can even be a bit more sophisticated.


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r128219589
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
    +
    +/**
    + * A builder to configure and build the JDBCAppendTableSink.
    + */
    +public class JDBCAppendTableSinkBuilder {
    +	private String username;
    +	private String password;
    +	private String driverName;
    +	private String dbURL;
    +	private String query;
    +	private int batchInterval = DEFAULT_BATCH_INTERVAL;
    +	private TypeInformation<?>[] fieldTypes;
    +
    +	public JDBCAppendTableSinkBuilder setUsername(String username) {
    +		this.username = username;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setPassword(String password) {
    +		this.password = password;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
    +		this.driverName = drivername;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
    +		this.dbURL = dbURL;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setQuery(String query) {
    +		this.query = query;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setBatchInterval(int batchInterval) {
    --- End diff --
    
    IMO, interval has a temporal connotation. `batchInterval` -> `batchSize`?


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    Hi @haohui, I think a JdbcTableSink would be a great feature! 
    
    However, there is a big issue with wrapping the `JdbcOutputFormat`. OutputFormats are not integrated with Flink's checkpointing mechanism. The `JdbcOutputFormat` buffers rows to write them out in batches. Rows that arrived before the last checkpoint but are still sitting in the buffer will be lost in case of a failure, because they will not be replayed.
    
    The JdbcTableSink should be integrated with Flink's checkpointing mechanism. In a nutshell, it should buffer records and commit them to the database when a checkpoint is taken. I think we need to think a bit more about a proper design for this feature. @zentol and @aljoscha might have some thoughts on this as well as they are more familiar with the implementation of checkpoint-aware sinks.
    
    What do you think?


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    Correct me if I'm wrong -- will something like the following work?
    
    ```
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
    	outputFormat.flush();
    }
    ```


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r126718250
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.table.sinks.AppendStreamTableSink;
    +import org.apache.flink.table.sinks.TableSink;
    +import org.apache.flink.types.Row;
    +
    +/**
    + * An at-least-once Table sink for JDBC.
    + */
    +public class JDBCTableSink extends RichSinkFunction<Row>
    --- End diff --
    
    I would not extend `RichSinkFunction`. Although this might work in practice, I think this mixes the logical representation of a table (in the catalog and during optimization) with the actual runtime code. I'd rather implement a separate JdbcSinkFunction (within this file) and instantiate it in `emitDataStream()`.
    
    I also think that we should implement the `BatchTableSink` interface which would directly use the `JdbcOutputFormat`.
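
    A minimal sketch of such a separate sink function (assuming the `invoke(Row)` signature of this Flink version and the `flush()` method discussed earlier in the thread; not necessarily the final code):
    
    ```
    class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
    	final JDBCOutputFormat outputFormat;
    
    	JDBCSinkFunction(JDBCOutputFormat outputFormat) {
    		this.outputFormat = outputFormat;
    	}
    
    	@Override
    	public void open(Configuration parameters) throws Exception {
    		super.open(parameters);
    		RuntimeContext ctx = getRuntimeContext();
    		outputFormat.setRuntimeContext(ctx);
    		outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
    	}
    
    	@Override
    	public void invoke(Row value) throws Exception {
    		outputFormat.writeRecord(value);
    	}
    
    	@Override
    	public void snapshotState(FunctionSnapshotContext context) throws Exception {
    		// flush buffered rows on every checkpoint for at-least-once delivery
    		outputFormat.flush();
    	}
    
    	@Override
    	public void initializeState(FunctionInitializationContext context) {
    		// stateless: nothing to restore
    	}
    
    	@Override
    	public void close() throws Exception {
    		outputFormat.close();
    		super.close();
    	}
    }
    ```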


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r132146108
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
    +
    +/**
    + * A builder to configure and build the JDBCAppendTableSink.
    + */
    +public class JDBCAppendTableSinkBuilder {
    +	private String username;
    +	private String password;
    +	private String driverName;
    +	private String dbURL;
    +	private String query;
    +	private int batchSize = DEFAULT_BATCH_INTERVAL;
    +	private TypeInformation<?>[] fieldTypes;
    +
    +	public JDBCAppendTableSinkBuilder setUsername(String username) {
    --- End diff --
    
    Add JavaDocs to the public configuration methods.
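
    For example, a sketch of the kind of JavaDoc meant here:
    
    ```
    /**
     * Specify the username of the JDBC connection.
     *
     * @param username the database user on whose behalf the connection is made
     */
    public JDBCAppendTableSinkBuilder setUsername(String username) {
    	this.username = username;
    	return this;
    }
    ```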


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r122972333
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.table.sinks.AppendStreamTableSink;
    +import org.apache.flink.table.sinks.TableSink;
    +import org.apache.flink.types.Row;
    +
    +/**
    + * An at-least-once Table sink for JDBC.
    + */
    +public class JDBCTableSink extends RichSinkFunction<Row>
    --- End diff --
    
    I think it could also be easily extended to support batch output by implementing the `BatchTableSink` interface and implementing `emitDataSet(dataSet: DataSet[Row])` as
    ```
    def emitDataSet(dataSet: DataSet[Row]): Unit = {
      dataSet.output(jdbcOutputFormat)
    }
    ```



---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r132022891
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java ---
    @@ -0,0 +1,85 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.table.sinks.AppendStreamTableSink;
    +import org.apache.flink.table.sinks.BatchTableSink;
    +import org.apache.flink.table.sinks.TableSink;
    +import org.apache.flink.types.Row;
    +
    +/**
    + * An at-least-once Table sink for JDBC.
    + */
    +public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
    +	private final JDBCSinkFunction sink;
    +
    +	private String[] fieldNames;
    +	private TypeInformation[] fieldTypes;
    +
    +	JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
    +		this.sink = new JDBCSinkFunction(outputFormat);
    +	}
    +
    +	public static JDBCAppendTableSinkBuilder builder() {
    +		return new JDBCAppendTableSinkBuilder();
    +	}
    +
    +	@Override
    +	public void emitDataStream(DataStream<Row> dataStream) {
    +		dataStream.addSink(sink);
    +	}
    +
    +	@Override
    +	public void emitDataSet(DataSet<Row> dataSet) {
    +		dataSet.output(sink.outputFormat);
    +	}
    +
    +	@Override
    +	public TypeInformation<Row> getOutputType() {
    +		return new RowTypeInfo(fieldTypes, fieldNames);
    +	}
    +
    +	@Override
    +	public String[] getFieldNames() {
    +		return fieldNames;
    +	}
    +
    +	@Override
    +	public TypeInformation<?>[] getFieldTypes() {
    +		return fieldTypes;
    +	}
    +
    +	@Override
    +	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    +		JDBCAppendTableSink copy = new JDBCAppendTableSink(sink.outputFormat);
    --- End diff --
    
    The `JDBCOutputFormat` is now only constructed via the `JDBCAppendTableSinkBuilder`, so the types should always match, but it is a good idea to add the checks to catch potential bugs.


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r132142854
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.table.sinks.AppendStreamTableSink;
    +import org.apache.flink.table.sinks.BatchTableSink;
    +import org.apache.flink.table.sinks.TableSink;
    +import org.apache.flink.types.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * An at-least-once Table sink for JDBC.
    + *
    + * <p>The mechanisms of Flink guarantees delivering messages at-least-once to this sink.
    + * However, one common use case is to run idempotent queries (e.g., <code>REPLACE</code> or
    + * <code>INSERT OVERWRITE</code>) to upsert into the database and achieve exactly-once semantic.</p>
    + */
    +public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
    +	private final JDBCSinkFunction sink;
    +
    +	private String[] fieldNames;
    +	private TypeInformation[] fieldTypes;
    +
    +	JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
    +		this.sink = new JDBCSinkFunction(outputFormat);
    +	}
    +
    +	public static JDBCAppendTableSinkBuilder builder() {
    +		return new JDBCAppendTableSinkBuilder();
    +	}
    +
    +	@Override
    +	public void emitDataStream(DataStream<Row> dataStream) {
    +		dataStream.addSink(sink);
    +	}
    +
    +	@Override
    +	public void emitDataSet(DataSet<Row> dataSet) {
    +		dataSet.output(sink.outputFormat);
    +	}
    +
    +	@Override
    +	public TypeInformation<Row> getOutputType() {
    +		return new RowTypeInfo(fieldTypes, fieldNames);
    +	}
    +
    +	@Override
    +	public String[] getFieldNames() {
    +		return fieldNames;
    +	}
    +
    +	@Override
    +	public TypeInformation<?>[] getFieldTypes() {
    +		return fieldTypes;
    +	}
    +
    +	@Override
    +	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    +		int[] types = sink.outputFormat.getTypesArray();
    +		Preconditions.checkArgument(fieldTypes.length == types.length);
    +		for (int i = 0; i < types.length; ++i) {
    +			Preconditions.checkArgument(JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
    +				"Incompatible types between fields and JDBC format at " + i);
    +		}
    +
    +		JDBCAppendTableSink copy = new JDBCAppendTableSink(sink.outputFormat);
    --- End diff --
    
    Passing the reference should be fine, but to be sure we could create a deep copy via `SerializationUtils.clone()`
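
    For example (sketch; `SerializationUtils` is from commons-lang3):
    
    ```
    JDBCAppendTableSink copy =
    	new JDBCAppendTableSink(SerializationUtils.clone(sink.outputFormat));
    ```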


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    Thanks for your pointer of the prototype!
    
    > Do you intend to provide exactly-once guarantees for arbitrary updates?
    
    As I think about it a little bit more, I think it might make sense to start with at-least-once semantics first. In practice we make the JDBC call idempotent using `INSERT IF NOT EXISTS`.
    
    The exactly-once part is trickier, so let's separate it out for now. What do you think?



---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r120576032
  
    --- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.types.Row;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotSame;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.verify;
    +
    +/**
    + * Test for JDBCTableSink.
    + */
    +public class JDBCTableSinkTest {
    +	private static final String[] FIELD_NAMES = new String[]{"foo"};
    +	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
    +		BasicTypeInfo.STRING_TYPE_INFO
    +	};
    +
    +	@Test
    +	public void testOutputSink() throws Exception {
    +		JDBCOutputFormat outputFormat = mock(JDBCOutputFormat.class);
    +		JDBCTableSink sink = new JDBCTableSink(outputFormat);
    +		@SuppressWarnings("unchecked")
    +		DataStream<Row> dataStream = (DataStream<Row>) mock(DataStream.class);
    +		sink.emitDataStream(dataStream);
    +		verify(dataStream).addSink(sink);
    --- End diff --
    
    you don't have to test this, as it is not a detail of the `JDBCTableSink` but of the Table API.


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r126724241
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.table.sinks.AppendStreamTableSink;
    +import org.apache.flink.table.sinks.TableSink;
    +import org.apache.flink.types.Row;
    +
    +/**
    + * An at-least-once Table sink for JDBC.
    + */
    +public class JDBCTableSink extends RichSinkFunction<Row>
    +	implements AppendStreamTableSink<Row>, CheckpointedFunction {
    +	private final JDBCOutputFormat outputFormat;
    +
    +	private String[] fieldNames;
    +	private TypeInformation[] fieldTypes;
    +
    +	public JDBCTableSink(JDBCOutputFormat outputFormat) {
    +		this.outputFormat = outputFormat;
    +	}
    +
    +	@Override
    +	public void emitDataStream(DataStream<Row> dataStream) {
    +		dataStream.addSink(this);
    +	}
    +
    +	@Override
    +	public TypeInformation<Row> getOutputType() {
    +		return new RowTypeInfo(fieldTypes, fieldNames);
    +	}
    +
    +	@Override
    +	public String[] getFieldNames() {
    +		return fieldNames;
    +	}
    +
    +	@Override
    +	public TypeInformation<?>[] getFieldTypes() {
    +		return fieldTypes;
    +	}
    +
    +	@Override
    +	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    +		JDBCTableSink copy = new JDBCTableSink(outputFormat);
    +		copy.fieldNames = fieldNames;
    +		copy.fieldTypes = fieldTypes;
    --- End diff --
    
    We could validate that the types provided by the Table API are compatible with the types that the `JdbcOutputFormat` expects to avoid exceptions during execution.


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r122970495
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.table.sinks.AppendStreamTableSink;
    +import org.apache.flink.table.sinks.TableSink;
    +import org.apache.flink.types.Row;
    +
    +/**
    + * An at-least-once Table sink for JDBC.
    + */
    +public class JDBCTableSink extends RichSinkFunction<Row>
    +	implements AppendStreamTableSink<Row>, CheckpointedFunction {
    +	private final JDBCOutputFormat outputFormat;
    +
    +	private String[] fieldNames;
    +	private TypeInformation[] fieldTypes;
    +
    +	public JDBCTableSink(JDBCOutputFormat outputFormat) {
    --- End diff --
    
    What do you think about not exposing the `JDBCOutputFormat` to the user, but configuring it internally?
    
    Of course we would need many of the configuration parameters (user, pw, driver, dbURL, and table name). Users could either specify the field names of the table to write to (fields are mapped by position) or not (we use the field names of the `Table` to emit). From this information we can construct a parameterized insert query: `INSERT INTO $table ($f1, $f2, $f3) VALUES (?, ?, ?)`. The field types are automatically provided by the `configure()` call.
    
    This would be a tighter integration with the Table API (using provided field types and possibly field names).
    Does this work for your use case or do you need the flexibility of specifying your own query?
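
    A sketch of how the template could be derived (hypothetical helper, not part of the PR):
    
    ```
    // Build "INSERT INTO table (f1, f2) VALUES (?, ?)" from configure()'s field names.
    static String buildInsertQuery(String table, String[] fieldNames) {
    	StringBuilder columns = new StringBuilder();
    	StringBuilder params = new StringBuilder();
    	for (int i = 0; i < fieldNames.length; i++) {
    		if (i > 0) {
    			columns.append(", ");
    			params.append(", ");
    		}
    		columns.append(fieldNames[i]);
    		params.append('?');
    	}
    	return "INSERT INTO " + table + " (" + columns + ") VALUES (" + params + ")";
    }
    ```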


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r120765156
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -202,14 +202,20 @@ public void writeRecord(Row row) throws IOException {
     			upload.addBatch();
     			batchCount++;
     			if (batchCount >= batchInterval) {
    -				upload.executeBatch();
    -				batchCount = 0;
    +				flush();
     			}
     		} catch (SQLException | IllegalArgumentException e) {
     			throw new IllegalArgumentException("writeRecord() failed", e);
     		}
     	}
     
    +	void flush() throws SQLException {
    +		if (upload != null) {
    +			upload.executeBatch();
    --- End diff --
    
    It is a synchronous call. It will throw `SQLException` and abort the sink. The behavior has not been changed.


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    Yes, I think that would work well. We could provide a builder for the table sink and forward calls to the `JDBCOutputFormatBuilder`.
    
    Regarding the query template: we can also make it an optional parameter to override the standard `INSERT INTO ... VALUES` template if set. 
    
    If you want the sink to support upsert writes, you might want to implement the `UpsertStreamTableSink` rather than the `AppendStreamTableSink`. The `UpsertStreamTableSink` supports append writes as well as updates and deletes by exposing the unique key fields of a `Table`.


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    It is important to have some flexibility in the query, as different SQL engines have slightly different DML syntax.
    
    For example, SQLite supports `INSERT OR REPLACE` while MySQL supports `REPLACE INTO` to upsert records with unique keys.
    
    I like the APIs you proposed. Do you think it addresses your concerns if the sink forwards the parameters to `JDBCOutputFormat` and constructs it internally?
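
    For illustration, hypothetical dialect-specific templates a user might pass to `setQuery()`:
    
    ```
    String mysql  = "REPLACE INTO words (word, cnt) VALUES (?, ?)";
    String sqlite = "INSERT OR REPLACE INTO words (word, cnt) VALUES (?, ?)";
    ```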


---

[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3712
  
    It would also be good to add the `JdbcTableSink` to the documentation (incl. an example) once the API is fixed.


---

[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3712#discussion_r122971462
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io.jdbc;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.table.sinks.AppendStreamTableSink;
    +import org.apache.flink.table.sinks.TableSink;
    +import org.apache.flink.types.Row;
    +
    +/**
    + * An at-least-once Table sink for JDBC.
    + */
    +public class JDBCTableSink extends RichSinkFunction<Row>
    --- End diff --
    
    Rename to `JDBCAppendTableSink`?


---