Posted to users@kafka.apache.org by Yuanzhe Yang <yy...@gmail.com> on 2017/01/03 12:03:25 UTC

how to ingest a database with a Kafka Connect cluster in parallel?

Hi all,

I am trying to run a Kafka Connect cluster to ingest data from a relational
database with the JDBC connector.

Before settling on Kafka Connect, I investigated several other solutions,
including Spark, Flink and Flume, but none of them can ingest a relational
database in a clustered way. By "cluster" I mean ingesting one database with
several distributed processes in parallel, rather than each process in the
cluster ingesting a different database. Kafka Connect is the option I am
currently investigating. After reading the documentation, I have not found
any clear statement about whether my use case is supported, so I set up a
test to figure it out.

I created a cluster with the following Docker Compose configuration:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    ports:
      - "2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker1:
    image: confluentinc/cp-kafka
    hostname: broker1
    depends_on:
      - zookeeper
    ports:
      - '9092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:9092'

  broker2:
    image: confluentinc/cp-kafka
    hostname: broker2
    depends_on:
      - zookeeper
    ports:
      - '9092'
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker2:9092'

  broker3:
    image: confluentinc/cp-kafka
    hostname: broker3
    depends_on:
      - zookeeper
    ports:
      - '9092'
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker3:9092'

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
    ports:
      - '8081'
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect1:
    image: confluentinc/cp-kafka-connect
    hostname: connect1
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
      - schema_registry
    ports:
      - "8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect1
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

  connect2:
    image: confluentinc/cp-kafka-connect
    hostname: connect2
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
      - schema_registry
    ports:
      - "8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect2
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

  connect3:
    image: confluentinc/cp-kafka-connect
    hostname: connect3
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
      - schema_registry
    ports:
      - "8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect3
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

  control-center:
    image: confluentinc/cp-enterprise-control-center
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
      - schema_registry
      - connect1
      - connect2
      - connect3
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect1:8083,connect2:8083,connect3:8083'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      PORT: 9021

  postgres:
    image: postgres
    hostname: postgres
    ports:
      - "5432"
    environment:
      POSTGRES_PASSWORD: postgres
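
One way I can at least sanity-check that each Connect worker has come up is
to hit its REST API from inside the Docker network (a minimal check using
the hostnames defined above; a response only shows the worker process is
up, not that the distributed group has formed correctly):

curl http://connect1:8083/            # returns the worker's version info
curl http://connect1:8083/connectors  # lists registered connectors

and likewise for connect2 and connect3.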

The Kafka cluster is running properly, but I don't know how to verify
whether the Kafka Connect cluster is working properly as well. I prepared
some test data in the database and created a source connector with the
following configuration:

{
 "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
 "name": "test",
 "tasks.max": 3,
 "key.converter": "org.apache.kafka.connect.json.JsonConverter",
 "value.converter": "org.apache.kafka.connect.json.JsonConverter",
 "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&
password=postgres",
 "table.whitelist": "pgbench_accounts",
 "batch.max.rows": 1,
 "topic.prefix": "test",
 "mode": "incrementing",
 "incrementing.column.name": "aid"
}
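
For reference, a config like this can be submitted to any worker's REST
endpoint; PUT /connectors/<name>/config takes the flat config map and
creates the connector if it does not exist yet. Something along these lines
(test.json here is just a placeholder name for a file holding the JSON
above):

curl -X PUT -H "Content-Type: application/json" \
  --data @test.json \
  http://connect1:8083/connectors/test/config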

The ingestion itself works and I can consume the produced messages, but I
still have no way to tell whether the ingestion is parallelized. I called
the status API and received the following result:

{
   "name":"test",
   "connector":{
      "state":"RUNNING",
      "worker_id":"connect2:8083"
   },
   "tasks":[
      {
         "state":"RUNNING",
         "id":0,
         "worker_id":"connect3:8083"
      }
   ]
}
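
(For reference, the status API here is GET /connectors/<name>/status on any
worker, e.g. curl http://connect2:8083/connectors/test/status.)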

This result is the same from every instance. Does it mean the ingestion
tasks are not parallelized? Is there anything important I am missing, or is
this type of clustering simply not supported?

Any comments and suggestions are highly appreciated. Have a nice day!

Best regards,
Yang

Re: how to ingest a database with a Kafka Connect cluster in parallel?

Posted by Yuanzhe Yang <yy...@gmail.com>.
Hi Ewen,

OK. Thanks a lot for your feedback!

Best regards,
Yang

Re: how to ingest a database with a Kafka Connect cluster in parallel?

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
It's an implementation detail of the JDBC connector. You could potentially
write a connector that parallelizes at that level, but you would lose other
potentially useful properties (e.g. ordering). To split at this level,
you'd have to do something like making each task responsible for a subset
of row IDs in the database.
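
For illustration only, one rough way to approximate that split with the
stock JDBC connector, rather than writing a new one, is to run several
single-task connectors that each claim one residue class of the
incrementing column. An untested sketch, reusing the config from earlier in
the thread; the subselect is needed because the connector appends its own
incremental WHERE clause to a custom query:

{
 "name": "test-part-0",
 "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
 "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres",
 "mode": "incrementing",
 "incrementing.column.name": "aid",
 "query": "SELECT * FROM (SELECT * FROM pgbench_accounts WHERE aid % 3 = 0) part0",
 "topic.prefix": "test-part-0"
}

with aid % 3 = 1 and aid % 3 = 2 for the other two connectors. That spreads
three tasks across the workers, but splits the table across three topics
and, as noted, gives up a single ordered stream.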

-Ewen

Re: how to ingest a database with a Kafka Connect cluster in parallel?

Posted by Yuanzhe Yang <yy...@gmail.com>.
Hi Ewen,

Thanks a lot for your reply. So this means we cannot parallelize ingestion
of a single table across multiple processes. Is that a limitation of Kafka
Connect or of the JDBC connector?

Have a nice day.

Best regards,
Yang


Re: how to ingest a database with a Kafka Connect cluster in parallel?

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
The unit of parallelism in Connect is the task. Your status output lists
only one task, so only one process is copying data. The connector can
consume data from within a single *database* in parallel, but each *table*
must be handled by a single task. Since your table whitelist includes only
a single table, the connector generates only a single task. If you add more
tables to the whitelist, you'll see more tasks in the status API output.
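
For example, with a whitelist like the following (an illustrative fragment;
the extra table names assume the standard pgbench schema):

 "tasks.max": 3,
 "table.whitelist": "pgbench_accounts,pgbench_branches,pgbench_tellers"

the connector can create up to min(tasks.max, number of tables) = 3 tasks,
and the status API would then list tasks 0 through 2, potentially running
on different workers.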

-Ewen
