Posted to users@kafka.apache.org by Yu Watanabe <yu...@gmail.com> on 2020/01/15 03:29:32 UTC

When does Kafka Connect commit messages to the Kafka broker?

Hello.

I would like to ask a question regarding Kafka Connect.

I am trying to import over 1 million records from Postgres into a Kafka
broker using Kafka Connect with the JDBC driver.
However, it seems that the topic for storing the messages is not created immediately.

I have built my Kafka Connect cluster and broker using Strimzi (0.15.0, Kafka 2.3.1).

When does Kafka Connect create the topic that stores the messages in the broker?

Thanks,
Yu Watanabe

-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis

Re: When does Kafka Connect commit messages to the Kafka broker?

Posted by Yu Watanabe <yu...@gmail.com>.
I would appreciate any advice on this if anyone has
experienced a similar issue.

Thanks,
Yu Watanabe

On Wed, Jan 15, 2020 at 10:53 PM Yu Watanabe <yu...@gmail.com> wrote:
> I found the reason. I am not sure whether this is an Azure problem or a JDBC
> problem, though.
> [...]

Re: When does Kafka Connect commit messages to the Kafka broker?

Posted by Yu Watanabe <yu...@gmail.com>.
I found the reason. I am not sure whether this is an Azure problem or a JDBC
problem, though.

First of all, my apologies for not describing my environment.

I use:

DataSource: Azure PostgreSQL Server (Read Replica)
Kafka Connect 2.3.1 (Strimzi 0.15.0)
Kafka Broker 2.3.1 (Strimzi 0.15.0)

In this environment, I had the connector configuration below.

=============================================================================================================
{
  "name": "custom_query",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url" : "jdbc:postgresql://XXXX:5432/YYY?user=BBBB&password=CCCC&ssl=true&readOnly=true&defaultRowFetchSize=5000",
    "mode" : "incrementing",
    "incrementing.column.name" : "sid",
    "validate.non.null": false,
    "table.whitelist" : "DDD",
    "topic.prefix" : "db_",
    "tasks.max" : "1"
  }
}
=============================================================================================================

The above configuration executes the query below.

=============================================================================================================
SELECT * FROM "public"."DDD" WHERE "public"."DDD"."sid" > ? ORDER BY
"public"."DDD"."sid" ASC
=============================================================================================================
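
For anyone following along, here is a minimal sketch of how a query of this
shape can be assembled from the table name and the incrementing column. It is
illustrative only and is not the connector's actual code: the function name
incrementing_query and the default schema "public" are my own assumptions. In
incrementing mode the "?" placeholder is bound to the largest value of the
incrementing column seen so far, so each poll only fetches newer rows.

```python
def incrementing_query(table: str, column: str, schema: str = "public") -> str:
    """Build a query shaped like the one above (hypothetical helper, illustration only)."""

    def quote(identifier: str) -> str:
        # PostgreSQL-style identifier quoting: double embedded quotes, then wrap.
        return '"' + identifier.replace('"', '""') + '"'

    qualified_table = f"{quote(schema)}.{quote(table)}"
    qualified_column = f"{qualified_table}.{quote(column)}"
    # "?" is a JDBC bind parameter; it stands for the last value already read.
    return (
        f"SELECT * FROM {qualified_table} "
        f"WHERE {qualified_column} > ? "
        f"ORDER BY {qualified_column} ASC"
    )


print(incrementing_query("DDD", "sid"))
```

Running this with the table and column from my configuration prints the same
statement as shown above, on a single line.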

However, while testing Kafka Connect, I found that when "*" is used,
JDBC does not return any records.
So I changed the configuration to use a query that names specific columns.

=============================================================================================================
{
  "name": "custom_query",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url" : "jdbc:postgresql://XXXX:5432/YYY?user=BBBB&password=CCCC&ssl=true&readOnly=true&defaultRowFetchSize=5000",
    "incrementing.column.name" : "sid",
    "validate.non.null": false,
    "query": "SELECT sid, m FROM DDD",
    "topic.prefix" : "custom_query",
    "tasks.max" : "1"
  }
}
=============================================================================================================

Now Kafka Connect has started producing messages to the broker.
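
For reference, a configuration like the second one can be registered through
the Kafka Connect REST API with a POST to /connectors. The sketch below only
builds the request, using the Python standard library. The base URL
http://localhost:8083 and the helper name build_create_request are assumptions
for illustration, not details of my environment; the placeholders XXXX, YYY,
BBBB and CCCC are the same masked values as in the configuration above.

```python
import json
import urllib.request

# Assumption: the Connect REST endpoint is reachable here; adjust for your cluster.
CONNECT_URL = "http://localhost:8083"


def build_create_request(base_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request. Hypothetical helper."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Same settings as the second configuration, with the masked placeholders kept.
config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": (
        "jdbc:postgresql://XXXX:5432/YYY?user=BBBB&password=CCCC"
        "&ssl=true&readOnly=true&defaultRowFetchSize=5000"
    ),
    "incrementing.column.name": "sid",
    "validate.non.null": False,
    "query": "SELECT sid, m FROM DDD",
    "topic.prefix": "custom_query",
    "tasks.max": "1",
}

request = build_create_request(CONNECT_URL, "custom_query", config)
print(request.get_method(), request.full_url)

# To actually submit it against a running Connect worker:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

The request is kept separate from the network call so the payload can be
inspected before anything is sent to the worker.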

Thanks,
Yu Watanabe
