Posted to users@kafka.apache.org by Yuanzhe Yang <yy...@gmail.com> on 2017/01/03 12:03:25 UTC
how to ingest a database with a Kafka Connect cluster in parallel?
Hi all,
I am trying to run a Kafka Connect cluster to ingest data from a relational
database with the JDBC connector.

Before settling on Kafka Connect I investigated several other solutions,
including Spark, Flink and Flume, but none of them can ingest a relational
database in a clustered way. By "clustered" I mean ingesting one database
with several distributed processes in parallel, rather than each process in
the cluster ingesting a different database. Kafka Connect is the option I am
currently investigating. After reading the documentation I have not found
any clear statement about whether my use case is supported, so I set up a
test to figure it out.

I created a cluster with the following Docker Compose configuration:
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    ports:
      - "2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker1:
    image: confluentinc/cp-kafka
    hostname: broker1
    depends_on:
      - zookeeper
    ports:
      - '9092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:9092'

  broker2:
    image: confluentinc/cp-kafka
    hostname: broker2
    depends_on:
      - zookeeper
    ports:
      - '9092'
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker2:9092'

  broker3:
    image: confluentinc/cp-kafka
    hostname: broker3
    depends_on:
      - zookeeper
    ports:
      - '9092'
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker3:9092'

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
    ports:
      - '8081'
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect1:
    image: confluentinc/cp-kafka-connect
    hostname: connect1
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
      - schema_registry
    ports:
      - "8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect1
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

  connect2:
    image: confluentinc/cp-kafka-connect
    hostname: connect2
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
      - schema_registry
    ports:
      - "8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect2
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

  connect3:
    image: confluentinc/cp-kafka-connect
    hostname: connect3
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
      - schema_registry
    ports:
      - "8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect3
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

  control-center:
    image: confluentinc/cp-enterprise-control-center
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
      - schema_registry
      - connect1
      - connect2
      - connect3
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect1:8083,connect2:8083,connect3:8083'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      PORT: 9021

  postgres:
    image: postgres
    hostname: postgres
    ports:
      - "5432"
    environment:
      POSTGRES_PASSWORD: postgres
The Kafka cluster is running properly, but I don't know how to verify
whether the Kafka Connect cluster is as well. I prepared some test data in
the database and created a source connector with the following
configuration:
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "name": "test",
  "tasks.max": 3,
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres",
  "table.whitelist": "pgbench_accounts",
  "batch.max.rows": 1,
  "topic.prefix": "test",
  "mode": "incrementing",
  "incrementing.column.name": "aid"
}
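For anyone who wants to reproduce this, the configuration above can be registered through the Connect REST API. A minimal sketch, assuming a worker is reachable at connect1:8083 from inside the compose network (the POST endpoint expects the config wrapped in a name/config envelope):

```python
import json
from urllib import request

# The connector configuration from the post, as a Python dict.
config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "name": "test",
    "tasks.max": 3,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres"
                      "?user=postgres&password=postgres",
    "table.whitelist": "pgbench_accounts",
    "batch.max.rows": 1,
    "topic.prefix": "test",
    "mode": "incrementing",
    "incrementing.column.name": "aid",
}

def create_request(worker="http://connect1:8083"):
    """Build the POST /connectors request: the REST API expects
    {"name": ..., "config": {...}} rather than the flat form above."""
    body = {"name": config["name"],
            "config": {k: v for k, v in config.items() if k != "name"}}
    return request.Request(worker + "/connectors",
                           data=json.dumps(body).encode("utf-8"),
                           headers={"Content-Type": "application/json"},
                           method="POST")

# request.urlopen(create_request())  # uncomment inside the compose network
```

The same flat config can alternatively be sent via PUT /connectors/test/config, which also updates an existing connector.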
The ingestion works and I can consume the produced messages, but I still
have no way to tell whether the ingestion is parallelized. I called the
status API and received the following result:
{
  "name": "test",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect2:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "connect3:8083"
    }
  ]
}
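The degree of parallelism can be read straight off this status payload: the length of the tasks array is the number of parallel copying processes, and the worker_id values show how they are spread across the cluster. A small local sketch over the JSON above:

```python
import json

# The status payload returned by GET /connectors/test/status.
status = json.loads("""
{ "name": "test",
  "connector": {"state": "RUNNING", "worker_id": "connect2:8083"},
  "tasks": [{"state": "RUNNING", "id": 0, "worker_id": "connect3:8083"}] }
""")

# Count running tasks and the distinct workers they landed on.
running = [t for t in status["tasks"] if t["state"] == "RUNNING"]
workers = {t["worker_id"] for t in running}
print(len(running), sorted(workers))  # prints: 1 ['connect3:8083']
```

One running task on one worker means the copying itself is not parallelized, even though the connector's coordinator happens to live on a different worker.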
This result is the same on all instances. Does this mean the ingestion
tasks are not parallelized? Is there anything important I am missing, or is
this type of clustering simply not supported?

Any comments and suggestions are highly appreciated. Have a nice day!
Best regards,
Yang
Re: how to ingest a database with a Kafka Connect cluster in parallel?
Posted by Yuanzhe Yang <yy...@gmail.com>.
Hi Ewen,
OK. Thanks a lot for your feedback!
Best regards,
Yang
2017-01-03 22:42 GMT+01:00 Ewen Cheslack-Postava <ew...@confluent.io>:
> It's an implementation detail of the JDBC connector. You could potentially
> write a connector that parallelizes at that level, but you lose other
> potentially useful properties (e.g. ordering). To split at this level you'd
> have to do something like have each task be responsible for a subset of
> rowids in the database.
>
> -Ewen
Re: how to ingest a database with a Kafka Connect cluster in parallel?
Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
It's an implementation detail of the JDBC connector. You could potentially
write a connector that parallelizes at that level, but you lose other
potentially useful properties (e.g. ordering). To split at this level you'd
have to do something like have each task be responsible for a subset of
rowids in the database.
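For illustration, splitting one table by rowid across tasks could be sketched as below. This is hypothetical, not something the stock JDBC connector does; each task would claim the modulo class of the incrementing column, trading global ordering for parallelism:

```python
def task_query(table, id_col, task_id, num_tasks):
    """Hypothetical per-task query: task task_id reads only the rows
    whose id falls in its modulo class, so the table is partitioned
    across num_tasks independent readers."""
    return (f"SELECT * FROM {table} "
            f"WHERE MOD({id_col}, {num_tasks}) = {task_id} "
            f"ORDER BY {id_col}")

# One query per task for the table from the original post:
queries = [task_query("pgbench_accounts", "aid", t, 3) for t in range(3)]
```

Offsets would then have to be tracked per modulo class, and any cross-row ordering guarantees within the table are gone.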
-Ewen
On Tue, Jan 3, 2017 at 1:24 PM, Yuanzhe Yang <yy...@gmail.com> wrote:
> Hi Ewen,
>
> Thanks a lot for your reply. So it means we cannot parallelize ingestion of
> one table with multiple processes. Is it because of Kafka Connect or the
> JDBC connector?
>
> Have a nice day.
>
> Best regards,
> Yang
Re: how to ingest a database with a Kafka Connect cluster in parallel?
Posted by Yuanzhe Yang <yy...@gmail.com>.
Hi Ewen,
Thanks a lot for your reply. So it means we cannot parallelize ingestion of
one table with multiple processes. Is it because of Kafka Connect or the
JDBC connector?
Have a nice day.
Best regards,
Yang
2017-01-03 20:55 GMT+01:00 Ewen Cheslack-Postava <ew...@confluent.io>:
> The unit of parallelism in connect is a task. It's only listing one task,
> so you only have one process copying data. The connector can consume data
> from within a single *database* in parallel, but each *table* must be
> handled by a single task. Since your table whitelist only includes a single
> table, the connector will only generate a single task. If you add more
> tables to the whitelist then you'll see more tasks in the status API
> output.
>
> -Ewen
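The table-per-task assignment Ewen describes can be illustrated with a small sketch of how a whitelist of tables might be spread over at most tasks.max tasks. This approximates Connect's round-robin grouping for illustration; it is not the connector's actual code:

```python
def group_tables(tables, max_tasks):
    """Spread whole tables over at most max_tasks groups; each group
    becomes one task, so a single table is never split across tasks."""
    n = min(len(tables), max_tasks)  # never more tasks than tables
    groups = [[] for _ in range(n)]
    for i, table in enumerate(tables):
        groups[i % n].append(table)
    return groups

# One whitelisted table -> one task, regardless of tasks.max = 3:
print(group_tables(["pgbench_accounts"], 3))  # [['pgbench_accounts']]
# Three whitelisted tables -> three tasks:
print(group_tables(["a", "b", "c"], 3))       # [['a'], ['b'], ['c']]
```

This is why the status API above shows a single task: with only pgbench_accounts in the whitelist there is nothing to spread, whatever tasks.max is set to.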
> > - schema_registry
> > - connect1
> > - connect2
> > - connect3
> > ports:
> > - "9021:9021"
> > environment:
> > CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,bro
> > ker3:9092'
> > CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > CONTROL_CENTER_CONNECT_CLUSTER: 'connect1:8083,connect2:8083,c
> > onnect3:8083'
> > CONTROL_CENTER_REPLICATION_FACTOR: 1
> > CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
> > CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
> > PORT: 9021
> >
> > postgres:
> > image: postgres
> > hostname: postgres
> > ports:
> > - "5432"
> > environment:
> > POSTGRES_PASSWORD: postgres
> >
> > The Kafka cluster is running properly, but I don't know how to verify if
> > the Kafka Connect cluster is running properly. I prepared some test data
> in
> > the database, and created a source connector with the following
> > configuration:
> >
> > {
> > "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
> > "name": "test",
> > "tasks.max": 3,
> > "key.converter": "org.apache.kafka.connect.json.JsonConverter",
> > "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> > "connection.url": "jdbc:postgresql://postgres:
> > 5432/postgres?user=postgres&
> > password=postgres",
> > "table.whitelist": "pgbench_accounts",
> > "batch.max.rows": 1,
> > "topic.prefix": "test",
> > "mode": "incrementing",
> > "incrementing.column.name": "aid"
> > }
> >
> > The ingestion process is correct and I can consume the produced messages.
> > But I still have no way to figure out if the ingestion is parallelized. I
> > called the status API and received the following result:
> >
> > {
> > "name":"test",
> > "connector":{
> > "state":"RUNNING",
> > "worker_id":"connect2:8083"
> > },
> > "tasks":[
> > {
> > "state":"RUNNING",
> > "id":0,
> > "worker_id":"connect3:8083"
> > }
> > ]
> > }
> >
> > This result is the same for all instances. Does it mean the ingestion
> tasks
> > are not parallelized? Is there anything important I am missing or this
> type
> > of clustering is simply not supported?
> >
> > Any comments and suggestions are highly appreciated. Have a nice day!
> >
> > Best regards,
> > Yang
> >
>
Re: how to ingest a database with a Kafka Connect cluster in parallel?
Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
The unit of parallelism in Connect is a task. Your status output lists only
one task, so only one process is copying data. The JDBC connector can
consume data from within a single *database* in parallel, but each *table*
must be handled by a single task. Since your table whitelist includes only a
single table, the connector generates only a single task. If you add more
tables to the whitelist, you'll see more tasks in the status API output.
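For instance (just an illustration, not tested against your setup), widening the whitelist to the other pgbench tables should produce up to three tasks, one per table, since the connector caps the task count at min(number of tables, tasks.max). Note I've switched to bulk mode here because "incrementing.column.name" applies to all listed tables, and the pgbench tables use different key columns (aid, bid, tid):

```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "name": "test",
  "tasks.max": 3,
  "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres",
  "table.whitelist": "pgbench_accounts,pgbench_branches,pgbench_tellers",
  "topic.prefix": "test",
  "mode": "bulk"
}
```

With a config like this, the status API should report three RUNNING tasks spread across your three workers.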
-Ewen
On Tue, Jan 3, 2017 at 4:03 AM, Yuanzhe Yang <yy...@gmail.com> wrote:
> [...]