Posted to users@kafka.apache.org by Srikrishna Alla <al...@gmail.com> on 2016/09/30 15:38:39 UTC

Kafka connect jdbc Sink connector issues when moving from MySQL to Oracle DB

Hi,

I am facing issues with jdbc Sink Connector when working with Oracle DB.
This functionality was working fine when I was using MySQL DB.

The first error I had was when trying to create the table using auto.create = true.
It tried to create STRING fields as NVARCHAR2(4000) (which I see is the
default mapping for the STRING schema type). This failed because NVARCHAR2
only supports sizes up to 2000 on my database.

To work around this, I created the table manually and ran the connector again,
expecting it to write to the DB. Now I am getting the following error:

[2016-09-30 10:21:16,627] ERROR Task is being killed and will not recover
until manually restarted:
(org.apache.kafka.connect.runtime.WorkerSinkTask:303)
org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add
missing field SinkRecordField{type=STRING, name='app_name',
isOptional=false}, as it is not optional and does not have a default value
           at
com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.DbStructure.amendIfNecessary(DbStructure.java:117)
           at
com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:59)
           at
com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.AlertWriter.write(AlertWriter.java:57)
           at
com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.EtlJdbcSinkTask.put(EtlJdbcSinkTask.java:53)
           at
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:280)
           at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
           at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
           at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
           at
org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
[2016-09-30 10:21:16,629] ERROR Thread WorkerSinkTask-jdbc-sink-connector-0
exiting with uncaught exception:
(org.apache.kafka.connect.util.ShutdownableThread:84)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask
due to unrecoverable exception.
           at
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:304)
           at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
           at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
           at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
           at
org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Exception in thread "WorkerSinkTask-jdbc-sink-connector-0"
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask
due to unrecoverable exception.
           at
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:304)
           at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
           at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
           at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
           at
org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)

Please let me know if anyone has seen this issue before.

Thanks,
Sri

Re: Kafka connect jdbc Sink connector issues when moving from MySQL to Oracle DB

Posted by Srikrishna Alla <al...@gmail.com>.
Thanks for the response, Shikhar. The issue was happening because the table
metadata was returning the column names in upper case, while the connector
expects them to be in lower case. I fixed it by creating the table with
quoted column names like "updated_by"; this way, I am no longer hitting the
issue. Regarding the NVARCHAR2(4000) issue, Oracle threw an error saying the
size was out of range for that data type.
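For anyone hitting the same thing, here is a minimal sketch of the identifier
rule at play (my own illustration, not connector code): Oracle folds unquoted
identifiers to upper case, while quoted identifiers keep their exact case, so a
helper that mimics that folding shows why a table created with "updated_by"
matches the connector's lower-case lookup and one created with updated_by does
not.

```java
import java.util.Locale;

public class IdentifierFolding {
    // Mimics Oracle's identifier rule: unquoted identifiers fold to
    // upper case; quoted identifiers keep their exact case (quotes stripped).
    static String effectiveName(String identifier) {
        if (identifier.length() >= 2
                && identifier.startsWith("\"") && identifier.endsWith("\"")) {
            return identifier.substring(1, identifier.length() - 1);
        }
        return identifier.toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // Unquoted DDL: the column is stored as UPDATED_BY in the metadata.
        System.out.println(effectiveName("updated_by"));     // UPDATED_BY
        // Quoted DDL: the column is stored exactly as updated_by.
        System.out.println(effectiveName("\"updated_by\"")); // updated_by
    }
}
```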

Thanks,
Sri

On Fri, Sep 30, 2016 at 2:03 PM, Shikhar Bhushan <sh...@confluent.io>
wrote:


Re: Kafka connect jdbc Sink connector issues when moving from MySQL to Oracle DB

Posted by Shikhar Bhushan <sh...@confluent.io>.
Hi Srikrishna,

In future, please address questions related to Confluent's connectors to the
relevant mailing list
(https://groups.google.com/forum/#!forum/confluent-platform).

The NVARCHAR2(4000) mapping for string types on Oracle was based on my
reading of the documentation, which states it can hold up to 4000
characters; when auto-creating a table we're aiming for the most
broadly-applicable datatype that makes sense. You mention that it only
supports up to 2000. Is that because the max limit is overridable at the
db level? Perhaps CLOB makes more sense?
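For what it's worth, one guess (an assumption on my part, not something
verified against your database) is that this is a byte-vs-character issue:
NVARCHAR2 lengths are declared in characters but the column is capped at 4000
bytes, and the default AL16UTF16 national character set uses 2 bytes per
character, so 2000 characters already fill the cap. A quick back-of-envelope
check:

```java
import java.nio.charset.StandardCharsets;

public class NvarcharLimit {
    public static void main(String[] args) {
        // A 2000-character string encoded in UTF-16 (2 bytes per BMP
        // character, as in Oracle's AL16UTF16) already occupies 4000 bytes,
        // which would explain why NVARCHAR2(4000) is rejected while
        // NVARCHAR2(2000) works.
        String s = "x".repeat(2000);
        int bytes = s.getBytes(StandardCharsets.UTF_16BE).length;
        System.out.println(bytes); // 4000
    }
}
```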

The error you are running into is because the connector refuses to issue
ALTERs that would add required columns (not optional, and no default
value), as that is potentially unsafe. You will have to add the 'app_name'
column manually. Alternatively, if you don't need that column to be
propagated to the database, you can use the `fields.whitelist`
configuration to whitelist the desired fields.
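For example, a sink configuration along these lines would write only the
whitelisted fields and so never attempt the ALTER (a sketch assuming the stock
Confluent JDBC sink; the connector name, topic, connection URL, and field list
are placeholders, not taken from your setup):

```properties
name=jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=alerts
connection.url=jdbc:oracle:thin:@//dbhost:1521/ORCL
# Only these fields are written; app_name is omitted, so no ALTER is needed.
fields.whitelist=alert_id,updated_by
auto.create=false
```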

Best,

Shikhar

On Fri, Sep 30, 2016 at 8:38 AM Srikrishna Alla <al...@gmail.com>
wrote:
