Posted to user@ignite.apache.org by svonn <sv...@posteo.de> on 2017/11/20 20:46:17 UTC

Issues with multiple kafka connectors, questions regarding ignite caches

Hi!

I have recently started a project with the following scenario:
My Kafka is receiving data from two sources (both String key, Byte[] value)
that I want to stream into two different Ignite caches.
The entire setup is to be built with Docker (Compose). I will post my
docker-compose.yml at the end.

Now, let's start with the questions and issues:

1. After starting up docker-compose I can access the Kafka Control Center
(Confluent) and it shows both topics being filled. With the following REST
call:

{"name": "test-connector-one", 
 "config":       

{"connector.class":"org.apache.ignite.stream.kafka.connect.IgniteSinkConnector",
"tasks.max":"1",
"topics":"TestTopic",
"cacheName":"TestCache",
"igniteCfg":"/etc/kafka-connect/jars/test.xml"}}

I am starting to stream data from Kafka's TestTopic to Ignite's TestCache - I
have set up test.xml so that IgniteSinkConnector will start a node in
client mode only. So far everything works perfectly fine, I can see the data
arriving in TestCache (I also used -scan in ignitevisorcmd, the data is
correct).

Now, if I do another REST call in order to start the second connector (same
call, different 'name', 'topics' and 'cacheName'), the connector appears to
stop all workers from test-connector-one (rebalancing?) while apparently
also stopping the local Ignite node (the one in client mode).
This results in neither connector-one nor connector-two starting, since they
both receive the error:

ERROR Task test-connector-one-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect                 | java.lang.IllegalStateException: Data streamer has been closed.
connect                 | 	at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:402)
connect                 | 	at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:614)
connect                 | 	at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:676)
connect                 | 	at org.apache.ignite.stream.kafka.connect.IgniteSinkTask.put(IgniteSinkTask.java:118)
connect                 | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
connect                 | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
connect                 | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
connect                 | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
connect                 | 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
connect                 | 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
connect                 | 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect                 | 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect                 | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
connect                 | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
connect                 | 	at java.lang.Thread.run(Thread.java:745)
connect                 | [2017-11-20 20:09:50,801] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)


What's happening here? How is it 'supposed' to work?
(It would be possible to consume both topics at once in one connector, but I
need the data in two separate caches.)

2. The next question is more general:
The byte array value stores information that I want to extract and work
with in Ignite; I can deserialize it as a CustomObject.
Usually, all or most of the data with the same key string will be needed for
a task.
Are you 'supposed' to deserialize it during Kafka Connect and actually
store the CustomObject in the cache?

3. I am having issues doing queries on the data in the cache. Let's say I
want to have all entries with the key (a String) "Test1":

Ignition.setClientMode(true);

try (Ignite ignite = Ignition.start("test.xml")) {
    IgniteCache<String, byte[]> cache = ignite.cache("TestCache");

    ScanQuery<String, byte[]> scan = new ScanQuery<>(
(*)         new IgniteBiPredicate<String, byte[]>() {
                @Override public boolean apply(String key, byte[] value) {
                    return key.equals("Test1");
                }
            }
    );

    Iterable<?> it = cache.query(scan).getAll();

    for (Object next : it) {
        System.out.println(next);
    }
}
 
which results in the error:

>Exception in thread "main" javax.cache.CacheException: class
org.apache.ignite.IgniteCheckedException: Query execution failed:

... a lot of stack trace that doesn't look too interesting (to me) ...
until:

>Caused by: java.lang.ClassCastException: [B cannot be cast to
[Ljava.lang.Byte;

at the line I marked with (*).

The value is stored as byte[] as well, so I am not sure what I am doing
wrong.






I am really in need of some more knowledge about why this stuff is not
working; I was not very successful at finding it anywhere.
For question 1) I couldn't find a single up-to-date example, and I have no
clue how you'd use the single/multipleTupleExtractor, for instance
(especially in a Docker environment). Debugging and reading through
thousands of log lines is really tedious.
Question 2) is probably something I'm simply lacking experience in; it would
be really helpful to get advice so I don't need to try all possibilities.
For question 3) I might be missing something obvious, but I couldn't figure
it out for quite some time now.

I would be super happy if someone could give me more information!

Best regards,
Svonn



P.S.: Here's my docker-compose.yml. I am using a volume to provide the config
files (test.xml) and .jars for the container:

---
version: '3.3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    container_name: zookeeper

  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:9092'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    container_name: broker

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
    container_name: schemaregistry

  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    depends_on:
      - zookeeper
      - broker
      - schema_registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    volumes:
      - /usr/local/connector:/etc/kafka-connect/jars
    container_name: connect


  control-center:
    image: confluentinc/cp-enterprise-control-center
    hostname: control-center
    depends_on:
      - zookeeper
      - broker
      - schema_registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    container_name: controlcenter

  test-data-producer:
    build: test-data-producer/
    depends_on:
      - broker
      - zookeeper
    command: (starts the producer)

  ignite:
    image: "apacheignite/ignite:2.3.0"
    environment:
      - "CONFIG_URI=/usr/local/connector/test.xml"
    volumes:
      - /usr/local/connector:/usr/local/connector
    container_name: ignite













Re: Issues with multiple kafka connectors, questions regarding ignite caches

Posted by svonn <sv...@posteo.de>.
I think I figured it out now:

Using the same worker for three different topics might be nonsense either
way, at least in my case - all three topics require different converters,
therefore I'll need three different connectors.
To start multiple instances of connect-standalone, I need to specify the
REST port in the worker properties as well as the file to store the offsets.
The following two lines were missing:

rest.port=#differentport#
offset.storage.file.filename=/tmp/#differentname#.offsets
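
(For reference, a minimal sketch of what a second standalone worker
properties file could look like in my setup - the converters mirror the
docker-compose above, the file names are made up, and only rest.port and
offset.storage.file.filename really have to differ between the workers:)

bootstrap.servers=broker:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
rest.port=8084
offset.storage.file.filename=/tmp/connect-two.offsets

# started e.g. with:
# connect-standalone.sh worker-two.properties ignite-sink-two.properties
# where ignite-sink-two.properties contains the same keys as the REST config
# (name=, connector.class=, tasks.max=, topics=, cacheName=, igniteCfg=)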

I suspect that this will also work with distributed mode; I'll have to
try that later.
The issue we encountered simply prevents people from using the option of
starting multiple connectors with the same worker properties - which
probably doesn't find much use anyway.

Best regards
Sven




Re: Issues with multiple kafka connectors, questions regarding ignite caches

Posted by Alexey Kukushkin <ku...@gmail.com>.
Hi,

I opened a ticket for this issue:
https://issues.apache.org/jira/browse/IGNITE-7065. If no one picks it up by
next week then I might find time to fix it next week (although I cannot
promise). I am not sure when the community is going to release 2.4, but I
(or whoever fixes it) could share a branch with you and you could grab the
fix from the branch. The fix will be limited to the connector module, so it
should be safe to apply.

Re: Issues with multiple kafka connectors, questions regarding ignite caches

Posted by svonn <sv...@posteo.de>.
Hi,

Apparently the very same bug also affects standalone Kafka Connect:
When I add multiple connectors it will start the last one given, shut that
one down (resulting in another 'Data streamer has been closed' exception)
and then start the next one - so there's always only a single one running.
I'm not quite sure how I'm supposed to stream different Kafka topics into
different Ignite caches.
Any help would be greatly appreciated.

Best regards,
Svonn




Re: Issues with multiple kafka connectors, questions regarding ignite caches

Posted by sv...@posteo.de.
Hi!

Thanks for your answer, great insights.
It would be great if you could open a bug for that. I'll have to go with
the standalone version for now either way.
I will try to implement a Kafka converter for that, as you suggested.
Let's see if I can get it running!

Best Regards
Svonn


On 21.11.2017 16:37, Alexey Kukushkin wrote:
> Hi,
> 
> 	* IgniteSinkTask source code [3] shows that the Ignite Kafka connector
> has this "feature" that stopping any task stops all other tasks
> running in the same worker. If you look into the stop() method you see
> that the Ignite instance is static and it is closed in the stop
> method. This looks like a usability issue to me. Feel free to open a
> bug or I can open it for you.
> I was able to use multiple sink connectors by running them using
> Apache Kafka's connect-standalone.sh script, which allows you to start
> multiple connectors. But if I close one of them it really makes the
> other ones unusable, making them show that "datastream is already
> closed" error.
> I have never used Confluent Control Center before, so I am not sure why
> starting the second connector stops the first one. Anyway, as a
> workaround, you can use the connect-standalone.sh or
> connect-distributed.sh scripts to start the connectors.
> 
> 	* Ignite Kafka Connector stores Kafka's key/value SourceRecord as
> Ignite's key/value Cache.Entry. Unlike Kafka, which allows key to be
> null, Ignite does not support null keys. You will have an error if
> your Kafka SourceRecord's key is null. You can work around null keys
> by providing an "extractor" that builds Cache.Entry from a value. For
> example, this extractor creates keys as value's hash codes and
> converts values to byte arrays of the value's string representation
> (just an example, nothing practical):
> 
>> public class HashCodeKeyValueBuilder implements
>> StreamSingleTupleExtractor<SinkRecord, Integer, byte[]> {
>> @Override public Map.Entry<Integer, byte[]> extract(SinkRecord msg)
>> {
>> return new AbstractMap.SimpleEntry<>(msg.value().hashCode(),
>> msg.value().toString().getBytes());
>> }
>> 
>> }
>> 
>> Then you configure extractor like:
>> 
>> singleTupleExtractorCls=my.domain.HashCodeKeyValueBuilder
>> 
>> StreamSingleTupleExtractor is defined inside ignite-core.jar so you
>> do not need additional dependencies.
>> 
>> Ignite stores custom objects in a cross-platform binary format [1],
>> simple types and arrays of simple types as is.
> 
>> As I understood in your case you have a not-null String key and
>> Byte[] value. In this case Ignite will store this as is so that you
>> create your scan query something like
>> 
>> Query<Cache.Entry<Integer, byte[]>> query = new ScanQuery<>(...)
> 
>> and then deserialise your byte[] array using your custom
>> deserialiser. This has a huge disadvantage that you cannot
>> efficiently work with the objects in Ignite since Ignite does not
>> know about your custom format.
>> 
>> Another option is having custom Kafka converter [2] that would
>> deserialise byte[] array inside Kafka and store them as CustomObject
>> in Ignite. In this case the object would be stored in binary format
>> (you will see o.a.i.i.binary.BinaryObjectImpl in the scan query
>> results). You can work with such objects directly using binary API
>> (most efficient, no serialisation involved, no CustomObject jars
>> required on Ignite server nodes) or work with CustomObject using
>> static type API (less efficient, CustomObject has to be deployed).
> 
>> 
> 
> 
> Links:
> ------
> [1] https://apacheignite.readme.io/docs/binary-marshaller
> [2]
> https://kafka.apache.org/0110/javadoc/org/apache/kafka/connect/storage/Converter.html
> [3]
> https://github.com/gridgain/apache-ignite/blob/cbada5964ee12a8dc44a7db8797f91709b70d831/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java

Re: Issues with multiple kafka connectors, questions regarding ignite caches

Posted by Alexey Kukushkin <ku...@gmail.com>.
Hi,


   1. The IgniteSinkTask source code
   <https://github.com/gridgain/apache-ignite/blob/cbada5964ee12a8dc44a7db8797f91709b70d831/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java>
   shows that the Ignite Kafka connector has this "feature" that stopping any
   task stops all other tasks running in the same worker. If you look into the
   stop() method you see that the Ignite instance is static and it is closed
   in the stop method. This looks like a usability issue to me. Feel free to
   open a bug or I can open it for you.
   I was able to use multiple sink connectors by running them using Apache
   Kafka's connect-standalone.sh script, which allows you to start multiple
   connectors. But if I close one of them it really makes the other ones
   unusable, making them show that "datastream is already closed" error.
   I have never used Confluent Control Center before, so I am not sure why
   starting the second connector stops the first one. Anyway, as a workaround,
   you can use the connect-standalone.sh or connect-distributed.sh scripts to
   start the connectors.

   2. The Ignite Kafka connector stores Kafka's key/value SourceRecord as
   Ignite's key/value Cache.Entry. Unlike Kafka, which allows the key to be
   null, Ignite does not support null keys. You will get an error if your
   Kafka SourceRecord's key is null. You can work around null keys by
   providing an "extractor" that builds a Cache.Entry from the value. For
   example, this extractor uses the value's hash code as the key and converts
   the value to a byte array of its string representation (just an example,
   nothing practical):

import java.util.AbstractMap;
import java.util.Map;

import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.kafka.connect.sink.SinkRecord;

public class HashCodeKeyValueBuilder implements
        StreamSingleTupleExtractor<SinkRecord, Integer, byte[]> {
    /** Key = the value's hash code, value = the bytes of the value's string form. */
    @Override public Map.Entry<Integer, byte[]> extract(SinkRecord msg) {
        return new AbstractMap.SimpleEntry<>(msg.value().hashCode(),
            msg.value().toString().getBytes());
    }
}

Then you configure the extractor like this:

singleTupleExtractorCls=my.domain.HashCodeKeyValueBuilder
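
For example, if you submit connectors through the Connect REST API as in
your first message, that property just becomes another entry in the
connector config JSON (a sketch based on your own call, paths and names
unchanged):

{"name": "test-connector-one",
 "config": {
   "connector.class": "org.apache.ignite.stream.kafka.connect.IgniteSinkConnector",
   "tasks.max": "1",
   "topics": "TestTopic",
   "cacheName": "TestCache",
   "igniteCfg": "/etc/kafka-connect/jars/test.xml",
   "singleTupleExtractorCls": "my.domain.HashCodeKeyValueBuilder"}}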


StreamSingleTupleExtractor is defined inside ignite-core.jar so you do not
need additional dependencies.

Ignite stores custom objects in a cross-platform binary format
<https://apacheignite.readme.io/docs/binary-marshaller>; simple types and
arrays of simple types are stored as is.

As I understand it, in your case you have a non-null String key and a Byte[]
value. In this case Ignite will store the data as is, so you create your
scan query with something like

Query<Cache.Entry<String, byte[]>> query = new ScanQuery<>(...)

and then deserialise the byte[] array using your custom deserialiser. This
has the huge disadvantage that you cannot efficiently work with the objects
in Ignite, since Ignite does not know about your custom format.
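
A minimal sketch of that approach, reusing the cache, config and key names
from your first message (adjust them to your environment, and hand the raw
bytes to whatever deserialiser you already have):

import java.util.List;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.ScanQuery;

public class ScanTestCache {
    public static void main(String[] args) {
        Ignition.setClientMode(true);

        try (Ignite ignite = Ignition.start("test.xml")) {
            IgniteCache<String, byte[]> cache = ignite.cache("TestCache");

            // Note the primitive byte[] (not Byte[]) in both type arguments.
            List<Cache.Entry<String, byte[]>> entries = cache.query(
                new ScanQuery<String, byte[]>((key, val) -> "Test1".equals(key))
            ).getAll();

            for (Cache.Entry<String, byte[]> e : entries) {
                byte[] raw = e.getValue();
                // pass 'raw' to your custom deserialiser here
                System.out.println(e.getKey() + " -> " + raw.length + " bytes");
            }
        }
    }
}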

Another option is having a custom Kafka converter
<https://kafka.apache.org/0110/javadoc/org/apache/kafka/connect/storage/Converter.html>
that would deserialise the byte[] array inside Kafka Connect and store it as
a CustomObject in Ignite. In this case the object would be stored in binary
format (you will see o.a.i.i.binary.BinaryObjectImpl in the scan query
results). You can work with such objects directly using the binary API (most
efficient, no deserialisation involved, no CustomObject jars required on
Ignite server nodes) or work with CustomObject using the statically typed
API (less efficient, CustomObject has to be deployed).
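
For the binary route, a minimal sketch of reading such objects with the
binary API, without deploying CustomObject to the server nodes (the field
name "someField" is made up - use the fields of your own CustomObject):

import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.query.ScanQuery;

public class BinaryScan {
    public static void main(String[] args) {
        Ignition.setClientMode(true);

        try (Ignite ignite = Ignition.start("test.xml")) {
            // withKeepBinary() returns values as BinaryObject instead of deserialising them.
            IgniteCache<String, BinaryObject> cache =
                ignite.cache("TestCache").withKeepBinary();

            for (Cache.Entry<String, BinaryObject> e :
                    cache.query(new ScanQuery<String, BinaryObject>()).getAll()) {
                // Read individual fields without CustomObject classes on the classpath.
                Object someField = e.getValue().field("someField");
                System.out.println(e.getKey() + " -> " + someField);
            }
        }
    }
}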