You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@ignite.apache.org by Alexey <js...@list.ru> on 2016/05/25 12:54:07 UTC

Need help with Ignite KafkaStreamer

Hi All,

I am trying to start KafkaStreamer and I really need help.

I used an example http://apacheignite.gridgain.org/docs/getting-started as a
pattern and added only necessary properties. I consider that incorrect
properties are the main problem. But I didn't find any information what
values are correct. When I start KafkaStreamer and send message I see errors
in Ignite logs that properties are not valid and warning about ignoring my
message

Could you please take a look at log errors and my function and advice how I
can fix it.

[15:58:44,569][INFO ][ignite-#57%null%][VerifiableProperties] Verifying
properties
[15:58:44,615][WARN ][ignite-#57%null%][VerifiableProperties] Property
bootstrap.servers is not valid
[15:58:44,615][INFO ][ignite-#57%null%][VerifiableProperties] Property
group.id is overridden to test
[15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property
key.deserializer is not valid
[15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property
key.serializer is not valid
[15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property
value.deserializer is not valid
[15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property
value.serializer is not valid

[15:59:45,077][WARN ][pool-4-thread-3][root] Message is ignored due to an
error [msg=MessageAndMetadata(test-topic,0,Message(magic = 0, attributes =
0, crc = 1312744161, key = java.nio.HeapByteBuffer[pos=0 lim=4 cap=13],
payload = java.nio.HeapByteBuffer[pos=0 lim=5
cap=5]),94,kafka.serializer.StringDecoder@6079d2fa,kafka.serializer.StringDecoder@6ee18190)]

@Override
    public void execute(ServiceContext ctx) throws Exception {
        KafkaStreamer<String, String, String> kafkaStreamer = new
KafkaStreamer<>();

        try (IgniteDataStreamer<String, String> stmr =
ignite.dataStreamer(stmCache.getName())) {
            // Allow data updates.
            stmr.allowOverwrite(true);

            kafkaStreamer.setIgnite(ignite);
            kafkaStreamer.setStreamer(stmr);

            // set the topic
            kafkaStreamer.setTopic("test-topic");

            // set the number of threads to process Kafka streams
            kafkaStreamer.setThreads(4);

            // set Kafka consumer configurations
            Properties props = new Properties();
            props.put("key.serializer",
                   
"org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                   
"org.apache.kafka.common.serialization.StringSerializer");

            props.put("bootstrap.servers", "localhost:9092");
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "test");
            props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");

            ConsumerConfig config = new ConsumerConfig(props);
            kafkaStreamer.setConsumerConfig(config);

            // set decoders
            StringDecoder keyDecoder = new StringDecoder(null);
            StringDecoder valueDecoder = new StringDecoder(null);
            kafkaStreamer.setKeyDecoder(keyDecoder);
            kafkaStreamer.setValueDecoder(valueDecoder);

            kafkaStreamer.start();
            System.out.println("Kafka streamer started!");
        }
    }




--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Need-help-with-Ignite-KafkaStreamer-tp5186.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Need help with Ignite KafkaStreamer

Posted by vkulichenko <va...@gmail.com>.
How many entries did you load? IgniteDataStreamer buffers the data and most
likely it just wait for buffers to fill up. You can try setting the time
interval after which buffers will be flushed even if they are not full:

stmr.autoFlushFrequency(1000); // Set to 1 sec.

This way all your data will be eventually propagated into the cache.

-Val



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Need-help-with-Ignite-KafkaStreamer-tp5186p5323.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Need help with Ignite KafkaStreamer

Posted by Alexey <js...@list.ru>.
I removed  'try (...) {}' block and now I don't see any errors in the code.
I added some logs in KafkaStreamer and according to the log adding data to
steamer finished without exception and any errors. But unfortunately the
cache that used for streamer is still empty.

[15:12:44,450][WARN ][pool-4-thread-1][root] Cache name KafkaCache
[15:12:44,450][WARN ][pool-4-thread-1][root] Msg.key key1
[15:12:44,451][WARN ][pool-4-thread-1][root] Msg.message test-msg2
[15:12:44,451][WARN ][pool-4-thread-1][root] Success!!!

I tried to add receiver (or visitor) to the streamer, check the size of
cache after sending kafka message but it seems streamer does not receive any
messages.
How can I check that KafkaStreamer works properly?



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Need-help-with-Ignite-KafkaStreamer-tp5186p5312.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Need help with Ignite KafkaStreamer

Posted by vkulichenko <va...@gmail.com>.
You should not use try-with-resources block, because it implicitly closes the
streamer at the end, but you want to keep it opened. Simply remove the 'try
(...) {}' from the code and it should work.

-Val



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Need-help-with-Ignite-KafkaStreamer-tp5186p5303.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Need help with Ignite KafkaStreamer

Posted by Alexey <js...@list.ru>.
Hi, Thank you for your answer!
I build jar from latest master and rerun the test. Now I see the following
error and stack trace. But I still don't have any ideas what's wrong with my
example. Could you please advice how to fix this error?


[18:14:01,391][ERROR][pool-4-thread-1][root] Message is ignored due to an
error [msg=MessageAndMetadata(test-topic,0,Message(magic = 0, attributes =
0, crc = 3139897862, key = java.nio.HeapByteBuffer[pos=0 lim=4 cap=14],
payload = java.nio.HeapByteBuffer[pos=0 lim=6
cap=6]),126,kafka.serializer.StringDecoder@5e36b9b9,kafka.serializer.StringDecoder@5e36b9b9)]
java.lang.IllegalStateException: Data streamer has been closed.
	at
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:355)
	at
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:550)
	at
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:604)
	at
org.apache.ignite.stream.kafka.KafkaStreamer$1.run(KafkaStreamer.java:180)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Need-help-with-Ignite-KafkaStreamer-tp5186p5284.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Need help with Ignite KafkaStreamer

Posted by vkulichenko <va...@gmail.com>.
Hi,

The logging is incorrect there and the exception that causes the is lost. I
just fixed this in master. Can you try building from master and rerun the
test? You should see traces in the log.

-Val



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Need-help-with-Ignite-KafkaStreamer-tp5186p5198.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.