Posted to user@ignite.apache.org by vbm <bm...@gmail.com> on 2018/07/03 16:53:36 UTC

Metrics for IgniteDataStreamer

Hi

As part of our POC, we want to compare ingestion into Ignite using
Kafka Connect and the Ignite Data Streamer.

For this comparison, which metrics can we monitor to measure
performance?


Regards,
Vishwas



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Questions on IgniteDataStreamer

Posted by vbm <bm...@gmail.com>.
Hi,

Can anyone provide some info on this?

Regards,
Vishwas




Re: Metrics for IgniteDataStreamer

Posted by Evgenii Zhuravlev <e....@gmail.com>.
Hi,

A KafkaStreamer can be given only one DataStreamer, so I'd recommend
using one KafkaStreamer per cache.
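
A minimal sketch of that one-to-one layout, assuming a simple map from
topic name to cache name. The TopicPipeline class and the topic and cache
names are hypothetical stand-ins, not Ignite API; in a real application each
pipeline would hold one KafkaStreamer together with the data streamer of its
target cache.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrates "one streamer per cache": one pipeline per topic-to-cache pair. */
final class StreamerPerCache {

    /** Hypothetical stand-in for one KafkaStreamer wired to one data streamer. */
    static final class TopicPipeline {
        final String topic;
        final String cacheName;

        TopicPipeline(String topic, String cacheName) {
            this.topic = topic;
            this.cacheName = cacheName;
        }
    }

    /** Builds exactly one pipeline for every topic-to-cache entry. */
    static List<TopicPipeline> buildPipelines(Map<String, String> topicToCache) {
        List<TopicPipeline> pipelines = new ArrayList<>();
        for (Map.Entry<String, String> e : topicToCache.entrySet()) {
            // A real application would create the streamer pair for this topic here.
            pipelines.add(new TopicPipeline(e.getKey(), e.getValue()));
        }
        return pipelines;
    }

    public static void main(String[] args) {
        // Hypothetical topic and cache names, used only for illustration.
        Map<String, String> topicToCache = new LinkedHashMap<>();
        topicToCache.put("kafka-Users", "UsersCache");
        topicToCache.put("kafka-Orders", "OrdersCache");

        for (TopicPipeline p : buildPipelines(topicToCache)) {
            System.out.println(p.topic + " -> " + p.cacheName);
        }
    }
}
```

Keeping the mapping in one place means that adding a topic only requires a
new map entry, and no pipeline is ever shared between caches.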

Evgenii

2018-07-05 20:03 GMT+03:00 vbm <bm...@gmail.com>:

> Hi Evgenii,
>
> Thanks for the reply. I have some more questions regarding the Ignite data
> streamer.
>
> Below is our scenario:
> We have many Kafka topics, and we want to use the Ignite data streamer to
> pull data into Ignite caches. Each Kafka topic corresponds to one cache in
> Ignite.
>
> From what I understand, we need multiple KafkaStreamers, each with a
> one-to-one mapping to an Ignite data streamer, which eventually writes to
> the cache. Correct me if I am wrong here.
>
> If there are multiple Kafka topics, what is the best approach to load data
> into the caches?
>
>
> Regards,
> Vishwas

Re: Metrics for IgniteDataStreamer

Posted by vbm <bm...@gmail.com>.
Hi Evgenii,

Thanks for the reply. I have some more questions regarding the Ignite data
streamer.

Below is our scenario:
We have many Kafka topics, and we want to use the Ignite data streamer to
pull data into Ignite caches. Each Kafka topic corresponds to one cache in
Ignite.

From what I understand, we need multiple KafkaStreamers, each with a
one-to-one mapping to an Ignite data streamer, which eventually writes to
the cache. Correct me if I am wrong here.

If there are multiple Kafka topics, what is the best approach to load data
into the caches?


Regards,
Vishwas




Re: Metrics for IgniteDataStreamer

Posted by Evgenii Zhuravlev <e....@gmail.com>.
Well, then you can just check the cache size after a certain period for
each method and compare the results. I'm not sure that a dedicated metric
for this would make sense: if there are any pauses in ingestion, such a
metric won't be informative at all. At the same time, exactly the same
information can be taken from cache.size().
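
A minimal sketch of that comparison, assuming the cache size can be read
through a simple supplier. The IngestionRate class and the fake size probe
are hypothetical; with Ignite, the supplier would wrap the cache size call
instead. Run the same measurement window for each ingestion method and
compare the resulting rates.

```java
import java.util.function.LongSupplier;

/** Estimates ingestion throughput from two samples of an entry count. */
final class IngestionRate {

    /** Returns entries added per second between two size samples. */
    static double entriesPerSecond(long sizeBefore, long sizeAfter, long elapsedNanos) {
        if (elapsedNanos <= 0) {
            throw new IllegalArgumentException("elapsedNanos must be positive");
        }
        return (sizeAfter - sizeBefore) * 1_000_000_000.0 / elapsedNanos;
    }

    /** Samples the size, waits for the window, samples again, and returns the rate. */
    static double measure(LongSupplier cacheSize, long windowMillis) throws InterruptedException {
        long before = cacheSize.getAsLong();
        long start = System.nanoTime();
        Thread.sleep(windowMillis);
        long after = cacheSize.getAsLong();
        return entriesPerSecond(before, after, System.nanoTime() - start);
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical stand-in for a cache under load: each call reports 1000 more entries.
        // With Ignite, the supplier could wrap the cache size call instead (assumption).
        long[] fakeSize = {0};
        LongSupplier sizeProbe = () -> fakeSize[0] += 1000;

        double rate = measure(sizeProbe, 100);
        System.out.printf("observed rate: %.0f entries/s%n", rate);
    }
}
```

Because the rate is an average over the whole window, a pause in ingestion
simply lowers the number, so the window should be long enough to cover the
entire load for a fair comparison.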

Evgenii

2018-07-05 12:06 GMT+03:00 vbm <bm...@gmail.com>:

> Hi Evgenii,
>
> To compare the two ingestion methods (DataStreamer and Kafka Connect), we
> want to know which key parameters need to be monitored. For example: how
> fast the data is being put into the cache.
>
>
> Regards,
> Vishwas

Re: Metrics for IgniteDataStreamer

Posted by vbm <bm...@gmail.com>.
Hi Evgenii,

To compare the two ingestion methods (DataStreamer and Kafka Connect), we
want to know which key parameters need to be monitored. For example: how
fast the data is being put into the cache.


Regards,
Vishwas




Re: Metrics for IgniteDataStreamer

Posted by Evgenii Zhuravlev <e....@gmail.com>.
Hi,

What kind of metrics would help you? Why wouldn't just checking the cache
size be enough?

Regards,
Evgenii

2018-07-03 19:53 GMT+03:00 vbm <bm...@gmail.com>:

> Hi
>
> As part of our POC, we want to compare ingestion into Ignite using
> Kafka Connect and the Ignite Data Streamer.
>
> For this comparison, which metrics can we monitor to measure
> performance?
>
>
> Regards,
> Vishwas

Re: Questions on IgniteDataStreamer

Posted by Ilya Kasnacheev <il...@gmail.com>.
Hello!

Can you please at least share the exceptions you are getting?

Regards,
-- 
Ilya Kasnacheev


Sat, 1 Jun 2019 at 13:55, Om Thacker <om...@gmail.com>:

> Hello vbm,
>
> I am working on the exact same problem. Did you find a solution?
>
> I have a one-to-one mapping between Kafka topics and Ignite caches. When
> there is an insert into the DB, the Kafka listener picks it up, I convert
> the JSON to an object using the Gson library, and stmr.addData() works
> fine. But when a value is updated in the DB, I get a marshaller error. I
> tried to use the cache.put() method, but it gives me a cache write
> exception.
>
> Thanks,
> Om Thacker

Re: Questions on IgniteDataStreamer

Posted by Om Thacker <om...@gmail.com>.
Hello vbm,

I am working on the exact same problem. Did you find a solution?
I am using the following code in my client application, which listens to
Kafka Connect (Confluent).

I have a one-to-one mapping between Kafka topics and Ignite caches. When
there is an insert into the DB, the Kafka listener picks it up, I convert
the JSON to an object using the Gson library, and stmr.addData() works
fine. But when a value is updated in the DB, I get a marshaller error. I
tried to use the cache.put() method, but it gives me a cache write
exception.


	@KafkaListener(topics = { "kafka-Users" })
	public void listenUsers(String message) {
		logger.error(message);
		ObjectMapper mapper = new ObjectMapper();
		JsonNode rootNode;
		try {
			rootNode = mapper.readTree(message);
			Users user = new Users();
			IgniteDataStreamer<Long, Users> stmr =
				ignite.dataStreamer(IgniteProperties.USERS_CACHE.getName());
//			stmr.allowOverwrite(true);

			/*
			 * stmr.receiver(new StreamTransformer<Long, Users>() {
			 *
			 * @Override public Object process(MutableEntry<Long, Users> entry, Object... arguments)
			 * throws EntryProcessorException { return null; }
			 *
			 * });
			 */

			/*
			 * stmr.receiver(StreamTransformer.from((e, arg) -> { Users val = e.getValue();
			 * System.out.println(val+" user from reciever $$$$$$$$$"); return null; }));
			 */

			Gson gson = new GsonBuilder()
				.setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE).create();
			user = gson.fromJson(rootNode.get("payload").toString(), Users.class);
			stmr.addData(rootNode.get("payload").get("UsersKey").asLong(), user);
			stmr.flush(); //
//			stmr.allowOverwrite(true);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}




Can you please share your solution?
Thanks,
Om Thacker


