Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2017/09/06 09:38:22 UTC

Apache Phoenix integration

Hi to all,
I'm writing a job that uses Apache Phoenix.

At first I used the PhoenixOutputFormat as a (Hadoop) OutputFormat, but it's
not well suited to the Table API because it cannot handle generic
objects like Rows (it needs a DBWritable object that must already be
present at compile time). So I looked into the code of the
PhoenixOutputFormat, and it's basically nothing more than a
JDBCOutputFormat.

However, to make it work I had to slightly modify the Flink
JDBCOutputFormat, adding a dbConn.commit() after the executeBatch() on the
PreparedStatement, e.g.:

    upload.executeBatch();
    dbConn.commit();

For the moment I've just created a dedicated PhoenixJdbcOutputFormat, where
I've added these 2 lines of code starting from the code of the
JDBCOutputFormat (it couldn't be extended in this case because all its
fields are private).

What do you think about this? Should I open a ticket to add a connection
commit after executeBatch (in order to be compatible with Phoenix), or
something else (e.g. create a Phoenix connector that basically extends
JDBCOutputFormat and overrides 2 methods, also changing the visibility of
its fields to protected)?
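
To make the second option concrete, here is a minimal sketch of the "protected fields plus an overridable method" idea. Both classes are hypothetical stand-ins written for illustration; they are not Flink's actual JDBCOutputFormat, and the sketch assumes the connection has auto-commit disabled, so the explicit commit is legal.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical stand-in for an extensible JDBC writer (not Flink's real class).
class ExtensibleJdbcWriter {
    // Protected rather than private, so subclasses can reach them.
    protected final Connection dbConn;
    protected final PreparedStatement upload;

    ExtensibleJdbcWriter(Connection dbConn, PreparedStatement upload) {
        this.dbConn = dbConn;
        this.upload = upload;
    }

    // Sends the accumulated batch; subclasses may add driver-specific steps.
    protected void flush() throws SQLException {
        upload.executeBatch();
    }
}

// Hypothetical Phoenix variant: same batch logic followed by an explicit commit.
class CommittingJdbcWriter extends ExtensibleJdbcWriter {
    CommittingJdbcWriter(Connection dbConn, PreparedStatement upload) {
        super(dbConn, upload);
    }

    @Override
    protected void flush() throws SQLException {
        super.flush();
        // Only valid when auto-commit is disabled on dbConn (assumed here).
        dbConn.commit();
    }
}
```

With this shape the Phoenix-specific behaviour lives in one overridden method, and the base class stays unchanged for other JDBC drivers.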

Best,
Flavio

Re: Apache Phoenix integration

Posted by Flavio Pompermaier <po...@okkam.it>.
I opened an issue for this: https://issues.apache.org/jira/browse/FLINK-7605


Re: Apache Phoenix integration

Posted by Flavio Pompermaier <po...@okkam.it>.
Maybe this should also be documented... Is there a dedicated page for
Flink and the JDBC connectors?


Re: Apache Phoenix integration

Posted by Fabian Hueske <fh...@gmail.com>.
Great!

If you want to, you can open a PR that adds

if (!conn.getAutoCommit()) {
  conn.setAutoCommit(true);
}

to JDBCOutputFormat.open().

Cheers, Fabian




Re: Apache Phoenix integration

Posted by Flavio Pompermaier <po...@okkam.it>.
Hi Fabian,
thanks for the detailed answer. Obviously you are right :)
As stated at https://phoenix.apache.org/tuning.html, auto-commit is disabled
by default in Phoenix, but it can easily be enabled by appending
AutoCommit=true to the connection URL or, equivalently, by setting the proper
property in the conf object passed to the Phoenix
QueryUtil.getConnectionUrl method, which auto-generates the connection URL,
i.e.:

----------------------
Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
final org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
// Ask Phoenix to add AutoCommit=true to the generated JDBC URL
jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
    PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
final Properties phoenixProps =
    PropertiesUtil.extractProperties(new Properties(), jobConf);
String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
----------------------
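
For the other route mentioned above, appending AutoCommit=true to the URL by hand, a minimal sketch could look like the following. The helper class and method names are hypothetical, and the sketch assumes that Phoenix accepts extra connection properties after a ";" separator in the JDBC URL; please check this against the Phoenix documentation for your version.

```java
// Hypothetical helper, not part of Phoenix or Flink.
final class PhoenixUrls {
    private PhoenixUrls() {}

    // Returns the URL with AutoCommit=true appended, unless an AutoCommit
    // setting is already present. Assumes ';' separates URL properties.
    static String withAutoCommit(String baseUrl) {
        if (baseUrl.contains("AutoCommit=")) {
            return baseUrl; // leave an explicit setting untouched
        }
        String separator = baseUrl.endsWith(";") ? "" : ";";
        return baseUrl + separator + "AutoCommit=true";
    }
}
```

The resulting string can then be passed to setDBUrl() on the JDBCOutputFormat builder instead of the URL generated by QueryUtil.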

Now my job also works with the standard Flink JDBCOutputFormat.
Just to help other people who want to play with Phoenix and HBase, I paste
my simple test job below:

  @Test
  public void testPhoenixOutputFormat() throws Exception {

    final StreamExecutionEnvironment senv = getStreamingExecutionEnv();
    senv.enableCheckpointing(5000);
    DataStream<String> testStream =
        senv.fromElements("1,aaa,XXX", "2,bbb,YYY", "3,ccc,ZZZ");

    // Set the target Phoenix table and the columns
    DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {

      private static final long serialVersionUID = 1L;

      @Override
      public Row map(String str) throws Exception {
        String[] split = str.split(Pattern.quote(","));
        Row ret = new Row(3);
        ret.setField(0, split[0]);
        ret.setField(1, split[1]);
        ret.setField(2, split[2]);
        return ret;
      }
    }).returns(new RowTypeInfo(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO));

    Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
    PhoenixMapReduceUtil.setOutput(job, "MY_TABLE", "FIELD_1,FIELD2,FIELD_3");
    final org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
    jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
        PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
    final String upsertStatement =
        PhoenixConfigurationUtil.getUpsertStatement(jobConf);
    final Properties phoenixProps =
        PropertiesUtil.extractProperties(new Properties(), jobConf);
    String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);

    rows.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(org.apache.phoenix.jdbc.PhoenixDriver.class.getCanonicalName())
        .setDBUrl(connUrl)
        .setQuery(upsertStatement)
        .setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR, Types.VARCHAR})
        .finish());

    senv.execute();
  }

Best,
Flavio


Re: Apache Phoenix integration

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

According to the JavaDocs of java.sql.Connection, commit() throws an
exception if the connection is in auto-commit mode, which should be the
default.
So adding this change to the JDBCOutputFormat seems a bit risky.

Maybe the Phoenix JDBC connector does not enable auto-commit by default
(or doesn't support it). Can you check that, Flavio?
If the Phoenix connector supports auto-commit but does not enable it by
default, we can enable it in JDBCOutputFormat.open().
If auto-commit is not supported, we can add a check after executeBatch() and
call commit() only if Connection.getAutoCommit() returns false.
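
The last option could be sketched roughly as follows. This is only an illustration of the guard, not the actual JDBCOutputFormat code; the class name BatchFlusher and the boolean return value are assumptions made for the example.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical helper illustrating the guarded commit.
final class BatchFlusher {
    private BatchFlusher() {}

    // Executes the pending batch and commits only when the connection is
    // NOT in auto-commit mode. Returns true if an explicit commit was issued.
    static boolean flush(Connection conn, PreparedStatement stmt) throws SQLException {
        stmt.executeBatch();
        if (!conn.getAutoCommit()) {
            // Safe: commit() is only called outside auto-commit mode.
            conn.commit();
            return true;
        }
        return false;
    }
}
```

Drivers that default to auto-commit never reach commit(), so the exception described in the JavaDocs is avoided, while drivers with auto-commit disabled still get their batch committed.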

Best, Fabian

