Posted to user@flink.apache.org by Swapnil Chougule <th...@gmail.com> on 2016/09/12 06:41:16 UTC

Flink JDBC JDBCOutputFormat Open

Hi Team,

I want to understand how taskNumber and numTasks are used when opening the
DB connection in JDBCOutputFormat.open(). The docs say:

taskNumber - The number of the parallel instance.
numTasks - The number of parallel tasks.

But I couldn't get a clear idea of the difference between "parallel instance"
and "parallel tasks". How do they contribute to concurrency with the JDBC
source/sink?

I also checked the code but couldn't drill down further:

/**
 * Connects to the target database and initializes the prepared statement.
 *
 * @param taskNumber The number of the parallel instance.
 * @throws IOException Thrown, if the output could not be opened due to an
 * I/O problem.
 */
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    try {
        establishConnection();
        upload = dbConn.prepareStatement(query);
    } catch (SQLException sqe) {
        throw new IllegalArgumentException("open() failed.", sqe);
    } catch (ClassNotFoundException cnfe) {
        throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
    }
}

private void establishConnection() throws SQLException,
        ClassNotFoundException {
    Class.forName(drivername);
    if (username == null) {
        dbConn = DriverManager.getConnection(dbURL);
    } else {
        dbConn = DriverManager.getConnection(dbURL, username, password);
    }
}

Thanks,
Swapnil

Re: Flink JDBC JDBCOutputFormat Open

Posted by Swapnil Chougule <th...@gmail.com>.
Thanks, Chesnay, for the update.

On Tue, Sep 13, 2016 at 12:13 AM, Chesnay Schepler <ch...@apache.org>
wrote:

> Hello,
>
> the JDBC Sink completely ignores the taskNumber and parallelism.
>
> Regards,
> Chesnay

Re: Flink JDBC JDBCOutputFormat Open

Posted by Chesnay Schepler <ch...@apache.org>.
Hello,

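The JDBC sink completely ignores both taskNumber and the parallelism
(numTasks); as the quoted open() shows, every parallel instance simply opens
its own connection.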

Regards,
Chesnay
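
To make the answer concrete, here is a minimal sketch of what "ignores the
taskNumber and parallelism" means in practice. It does not use Flink or JDBC:
SketchOutputFormat, OPENED_CONNECTIONS and simulate() are hypothetical names
made up for illustration, and the loop only imitates a runtime calling open()
once per parallel instance. The assumption is that taskNumber is the
zero-based index of the instance and numTasks is the total number of
instances; since open() uses neither, a parallelism of N would simply give N
independent connections.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelOpenSketch {

    // Counts how many "connections" were opened (stand-in for real JDBC connections).
    static final AtomicInteger OPENED_CONNECTIONS = new AtomicInteger();

    /** Hypothetical stand-in for an output format; not Flink's actual class. */
    static class SketchOutputFormat {
        boolean connected;

        // Mirrors the shape of open(int taskNumber, int numTasks) from the thread.
        // Both arguments are deliberately unused, as in the quoted JDBC code.
        void open(int taskNumber, int numTasks) {
            connected = true; // stand-in for establishConnection()
            OPENED_CONNECTIONS.incrementAndGet();
        }
    }

    /** Imitates a runtime that creates one format instance per parallel task. */
    static int simulate(int parallelism) {
        OPENED_CONNECTIONS.set(0);
        for (int taskNumber = 0; taskNumber < parallelism; taskNumber++) {
            // Assumption: taskNumber is the instance index, parallelism is numTasks.
            new SketchOutputFormat().open(taskNumber, parallelism);
        }
        return OPENED_CONNECTIONS.get();
    }

    public static void main(String[] args) {
        System.out.println("connections opened: " + simulate(4));
    }
}
```

Under these assumptions, the number of database connections is driven only by
the parallelism of the sink, not by anything open() does with its arguments.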
