You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@servicecomb.apache.org by Zhang Lei <zh...@boco.com.cn> on 2019/08/17 07:29:28 UTC

[DISCUSS] the performance and reliability trade-offs of KafkaMessage Publisher

Hi, Team

Our previous discussion on PR[1] about using synchronous or asynchronous methods to send Kafka messages, I think need a trade-off in reliability and performance.

Maybe we give the option to the user by allowing the user to customize some parameters, I have the following suggestions about the Kafka producer parameters:

Key: Messages ordered  and can't be lost, but they are allowed to repeat for FSM

1. Default parameter 

max.in.flight.requests.per.connection is 1 (User modification is prohibited for ordered)
acks is -1
retries is greater than 0

2. Allow users to define most parameters of the Kafka producer, E.g.

acks
retries
buffer.memory
compresstion.type
min.insync.replicas > 1 (use with acks)
replication.factor > min.insync.replicas
timeout.ms
request.timeout.ms
metadata.fetch.timeout.ms
max.block.ms
max.request.size

3. KafkaProducer.send(record, callback) or KafkaProducer.send(record).get()

KafkaProducer.send(record).get() can cause performance problems, but we can fix it by deploying multiple alphas

KafkaProducer.send(record, callback) set max.block.ms=0 & large enough buffer.memory. But we still have to deal with the callback failure scenario.
In asynchronous mode, if the message is sent, but the acknowledgment has not been received, the buffer pool is full, and the configuration file is set to not limit the timeout for the blocking timeout, which means that the production end is blocked all the time. Ensure that data is not lost.

Maybe we can use the parameters to allow users to choose to use synchronous or asynchronous sending mode, and use asynchronous mode to get better performance when there is a reliable network and Kafka cluster.

[1] https://github.com/apache/servicecomb-pack/pull/540 <https://github.com/apache/servicecomb-pack/pull/540>

Lei Zhang


Re: [DISCUSS] the performance and reliability trade-offs of KafkaMessage Publisher

Posted by Willem Jiang <wi...@gmail.com>.
Yeah, that's is why I said we need a kafka cluster on the back end.
It's better we have alpha cluster to receive the events.
I agree we could provide different configuration for the user to
choice between the performance and stability of system.

Willem Jiang

Twitter: willemjiang
Weibo: 姜宁willem

On Sat, Aug 17, 2019 at 4:37 PM Zhang Lei <co...@qq.com> wrote:
>
> Hi, Willem
>
> If you use asynchronous sending, we also need to consider the case of alpha crash
>
> Lei Zhang
>
> > 在 2019年8月17日,下午4:29,Willem Jiang <wi...@gmail.com> 写道:
> >
> > I think we think too much on the Omega side.
> > From my experience, the kafka client can get much better performance
> > by using the async invocation with batch processing.
> > If want to guarantee the delivery of the kafka, we'd better build a
> > kafka cluster on the server side instead of notifying Omega the Alpha
> > cannot process any message.
> >
> > For the publish method, I think we just need to log the event which
> > cannot be send to kafka for further investigation.
> >
> > Willem Jiang
> >
> > Twitter: willemjiang
> > Weibo: 姜宁willem
> >
> > On Sat, Aug 17, 2019 at 4:11 PM Zhang Lei <zh...@boco.com.cn> wrote:
> >>
> >> Add another point:
> >>
> >> We still use the asynchronous send method. Use parameter alpha.feature.akka.channel.kafka.producer.batchSize trigger producer.flush(), the balance of performance and reliability depends on the parameters set by the user.
> >>
> >> The most important thing is that Omega knows that Alpha has made a mistake.
> >>
> >> List<BaseEvent> eventCache = new LinkedList();
> >>
> >> @Override
> >> public void publish(Object data) {
> >>    if(logger.isDebugEnabled()){
> >>        logger.debug("send message [{}] to [{}]", data, topic);
> >>    }
> >>
> >>    try {
> >>        if(data instanceof BaseEvent) {
> >>            BaseEvent event = (BaseEvent) data;
> >>            eventCache.add(event);
> >>            if(eventCache.size()== batchSize){
> >>              try {
> >>                      List<Future> kafkaFutures = new LinkedList<>();
> >>                      for(BaseEvent event : eventCache){
> >>                        kafkaFutures.add(kafkaTemplate.send(topic, event.getGlobalTxId(), Callback...);
> >>                      }
> >>                      producer.flush();
> >>                      for (Future future : kafkaFutures) {
> >>                        future.get();
> >>                      }
> >>                      eventCache.clear();
> >>                           } catch (Exception ex) {
> >>                  logger.warn("Sending events to Kafka failed", ex);
> >>                  throw new Exception("Commit failed as send to Kafka failed",ex);
> >>                   }
> >>            }
> >>        }else{
> >>            throw new UnsupportedOperationException("data must be BaseEvent type");
> >>        }
> >>    } catch (InterruptedException | ExecutionException | UnsupportedOperationException e) {
> >>        logger.error("publish Exception = [{}]", e.getMessage(), e);
> >>        throw new RuntimeException(e);
> >>    }
> >> }
> >>
> >>
> >>
> >>> 在 2019年8月17日,下午3:29,Zhang Lei <zh...@boco.com.cn> 写道:
> >>>
> >>> Hi, Team
> >>>
> >>> Our previous discussion on PR[1] about using synchronous or asynchronous methods to send Kafka messages, I think need a trade-off in reliability and performance.
> >>>
> >>> Maybe we give the option to the user by allowing the user to customize some parameters, I have the following suggestions about the Kafka producer parameters:
> >>>
> >>> Key: Messages ordered  and can't be lost, but they are allowed to repeat for FSM
> >>>
> >>> 1. Default parameter
> >>>
> >>> max.in.flight.requests.per.connection is 1 (User modification is prohibited for ordered)
> >>> acks is -1
> >>> retries is greater than 0
> >>>
> >>> 2. Allow users to define most parameters of the Kafka producer, E.g.
> >>>
> >>> acks
> >>> retries
> >>> buffer.memory
> >>> compresstion.type
> >>> min.insync.replicas > 1 (use with acks)
> >>> replication.factor > min.insync.replicas
> >>> timeout.ms
> >>> request.timeout.ms
> >>> metadata.fetch.timeout.ms
> >>> max.block.ms
> >>> max.request.size
> >>>
> >>> 3. KafkaProducer.send(record, callback) or KafkaProducer.send(record).get()
> >>>
> >>> KafkaProducer.send(record).get() can cause performance problems, but we can fix it by deploying multiple alphas
> >>>
> >>> KafkaProducer.send(record, callback) set max.block.ms=0 & large enough buffer.memory. But we still have to deal with the callback failure scenario.
> >>> In asynchronous mode, if the message is sent, but the acknowledgment has not been received, the buffer pool is full, and the configuration file is set to not limit the timeout for the blocking timeout, which means that the production end is blocked all the time. Ensure that data is not lost.
> >>>
> >>> Maybe we can use the parameters to allow users to choose to use synchronous or asynchronous sending mode, and use asynchronous mode to get better performance when there is a reliable network and Kafka cluster.
> >>>
> >>> [1] https://github.com/apache/servicecomb-pack/pull/540 <https://github.com/apache/servicecomb-pack/pull/540>
> >>>
> >>> Lei Zhang
> >>>
> >>
> >
>
>
>

Re: [DISCUSS] the performance and reliability trade-offs of KafkaMessage Publisher

Posted by Zhang Lei <co...@qq.com>.
Hi, Willem

If you use asynchronous sending, we also need to consider the case of alpha crash

Lei Zhang

> 在 2019年8月17日,下午4:29,Willem Jiang <wi...@gmail.com> 写道:
> 
> I think we think too much on the Omega side.
> From my experience, the kafka client can get much better performance
> by using the async invocation with batch processing.
> If want to guarantee the delivery of the kafka, we'd better build a
> kafka cluster on the server side instead of notifying Omega the Alpha
> cannot process any message.
> 
> For the publish method, I think we just need to log the event which
> cannot be send to kafka for further investigation.
> 
> Willem Jiang
> 
> Twitter: willemjiang
> Weibo: 姜宁willem
> 
> On Sat, Aug 17, 2019 at 4:11 PM Zhang Lei <zh...@boco.com.cn> wrote:
>> 
>> Add another point:
>> 
>> We still use the asynchronous send method. Use parameter alpha.feature.akka.channel.kafka.producer.batchSize trigger producer.flush(), the balance of performance and reliability depends on the parameters set by the user.
>> 
>> The most important thing is that Omega knows that Alpha has made a mistake.
>> 
>> List<BaseEvent> eventCache = new LinkedList();
>> 
>> @Override
>> public void publish(Object data) {
>>    if(logger.isDebugEnabled()){
>>        logger.debug("send message [{}] to [{}]", data, topic);
>>    }
>> 
>>    try {
>>        if(data instanceof BaseEvent) {
>>            BaseEvent event = (BaseEvent) data;
>>            eventCache.add(event);
>>            if(eventCache.size()== batchSize){
>>              try {
>>                      List<Future> kafkaFutures = new LinkedList<>();
>>                      for(BaseEvent event : eventCache){
>>                        kafkaFutures.add(kafkaTemplate.send(topic, event.getGlobalTxId(), Callback...);
>>                      }
>>                      producer.flush();
>>                      for (Future future : kafkaFutures) {
>>                        future.get();
>>                      }
>>                      eventCache.clear();
>>                           } catch (Exception ex) {
>>                  logger.warn("Sending events to Kafka failed", ex);
>>                  throw new Exception("Commit failed as send to Kafka failed",ex);
>>                   }
>>            }
>>        }else{
>>            throw new UnsupportedOperationException("data must be BaseEvent type");
>>        }
>>    } catch (InterruptedException | ExecutionException | UnsupportedOperationException e) {
>>        logger.error("publish Exception = [{}]", e.getMessage(), e);
>>        throw new RuntimeException(e);
>>    }
>> }
>> 
>> 
>> 
>>> 在 2019年8月17日,下午3:29,Zhang Lei <zh...@boco.com.cn> 写道:
>>> 
>>> Hi, Team
>>> 
>>> Our previous discussion on PR[1] about using synchronous or asynchronous methods to send Kafka messages, I think need a trade-off in reliability and performance.
>>> 
>>> Maybe we give the option to the user by allowing the user to customize some parameters, I have the following suggestions about the Kafka producer parameters:
>>> 
>>> Key: Messages ordered  and can't be lost, but they are allowed to repeat for FSM
>>> 
>>> 1. Default parameter
>>> 
>>> max.in.flight.requests.per.connection is 1 (User modification is prohibited for ordered)
>>> acks is -1
>>> retries is greater than 0
>>> 
>>> 2. Allow users to define most parameters of the Kafka producer, E.g.
>>> 
>>> acks
>>> retries
>>> buffer.memory
>>> compresstion.type
>>> min.insync.replicas > 1 (use with acks)
>>> replication.factor > min.insync.replicas
>>> timeout.ms
>>> request.timeout.ms
>>> metadata.fetch.timeout.ms
>>> max.block.ms
>>> max.request.size
>>> 
>>> 3. KafkaProducer.send(record, callback) or KafkaProducer.send(record).get()
>>> 
>>> KafkaProducer.send(record).get() can cause performance problems, but we can fix it by deploying multiple alphas
>>> 
>>> KafkaProducer.send(record, callback) set max.block.ms=0 & large enough buffer.memory. But we still have to deal with the callback failure scenario.
>>> In asynchronous mode, if the message is sent, but the acknowledgment has not been received, the buffer pool is full, and the configuration file is set to not limit the timeout for the blocking timeout, which means that the production end is blocked all the time. Ensure that data is not lost.
>>> 
>>> Maybe we can use the parameters to allow users to choose to use synchronous or asynchronous sending mode, and use asynchronous mode to get better performance when there is a reliable network and Kafka cluster.
>>> 
>>> [1] https://github.com/apache/servicecomb-pack/pull/540 <https://github.com/apache/servicecomb-pack/pull/540>
>>> 
>>> Lei Zhang
>>> 
>> 
> 




Re: [DISCUSS] the performance and reliability trade-offs of KafkaMessage Publisher

Posted by Willem Jiang <wi...@gmail.com>.
I think we think too much on the Omega side.
From my experience, the kafka client can get much better performance
by using the async invocation with batch processing.
If want to guarantee the delivery of the kafka, we'd better build a
kafka cluster on the server side instead of notifying Omega the Alpha
cannot process any message.

For the publish method, I think we just need to log the event which
cannot be send to kafka for further investigation.

Willem Jiang

Twitter: willemjiang
Weibo: 姜宁willem

On Sat, Aug 17, 2019 at 4:11 PM Zhang Lei <zh...@boco.com.cn> wrote:
>
> Add another point:
>
> We still use the asynchronous send method. Use parameter alpha.feature.akka.channel.kafka.producer.batchSize trigger producer.flush(), the balance of performance and reliability depends on the parameters set by the user.
>
> The most important thing is that Omega knows that Alpha has made a mistake.
>
> List<BaseEvent> eventCache = new LinkedList();
>
> @Override
> public void publish(Object data) {
>     if(logger.isDebugEnabled()){
>         logger.debug("send message [{}] to [{}]", data, topic);
>     }
>
>     try {
>         if(data instanceof BaseEvent) {
>             BaseEvent event = (BaseEvent) data;
>             eventCache.add(event);
>             if(eventCache.size()== batchSize){
>               try {
>                       List<Future> kafkaFutures = new LinkedList<>();
>                       for(BaseEvent event : eventCache){
>                         kafkaFutures.add(kafkaTemplate.send(topic, event.getGlobalTxId(), Callback...);
>                       }
>                       producer.flush();
>                       for (Future future : kafkaFutures) {
>                         future.get();
>                       }
>                       eventCache.clear();
>                            } catch (Exception ex) {
>                   logger.warn("Sending events to Kafka failed", ex);
>                   throw new Exception("Commit failed as send to Kafka failed",ex);
>                    }
>             }
>         }else{
>             throw new UnsupportedOperationException("data must be BaseEvent type");
>         }
>     } catch (InterruptedException | ExecutionException | UnsupportedOperationException e) {
>         logger.error("publish Exception = [{}]", e.getMessage(), e);
>         throw new RuntimeException(e);
>     }
> }
>
>
>
> > 在 2019年8月17日,下午3:29,Zhang Lei <zh...@boco.com.cn> 写道:
> >
> > Hi, Team
> >
> > Our previous discussion on PR[1] about using synchronous or asynchronous methods to send Kafka messages, I think need a trade-off in reliability and performance.
> >
> > Maybe we give the option to the user by allowing the user to customize some parameters, I have the following suggestions about the Kafka producer parameters:
> >
> > Key: Messages ordered  and can't be lost, but they are allowed to repeat for FSM
> >
> > 1. Default parameter
> >
> > max.in.flight.requests.per.connection is 1 (User modification is prohibited for ordered)
> > acks is -1
> > retries is greater than 0
> >
> > 2. Allow users to define most parameters of the Kafka producer, E.g.
> >
> > acks
> > retries
> > buffer.memory
> > compresstion.type
> > min.insync.replicas > 1 (use with acks)
> > replication.factor > min.insync.replicas
> > timeout.ms
> > request.timeout.ms
> > metadata.fetch.timeout.ms
> > max.block.ms
> > max.request.size
> >
> > 3. KafkaProducer.send(record, callback) or KafkaProducer.send(record).get()
> >
> > KafkaProducer.send(record).get() can cause performance problems, but we can fix it by deploying multiple alphas
> >
> > KafkaProducer.send(record, callback) set max.block.ms=0 & large enough buffer.memory. But we still have to deal with the callback failure scenario.
> > In asynchronous mode, if the message is sent, but the acknowledgment has not been received, the buffer pool is full, and the configuration file is set to not limit the timeout for the blocking timeout, which means that the production end is blocked all the time. Ensure that data is not lost.
> >
> > Maybe we can use the parameters to allow users to choose to use synchronous or asynchronous sending mode, and use asynchronous mode to get better performance when there is a reliable network and Kafka cluster.
> >
> > [1] https://github.com/apache/servicecomb-pack/pull/540 <https://github.com/apache/servicecomb-pack/pull/540>
> >
> > Lei Zhang
> >
>

Re: [DISCUSS] the performance and reliability trade-offs of KafkaMessage Publisher

Posted by Zhang Lei <zh...@boco.com.cn>.
Add another point:

We still use the asynchronous send method. Use parameter alpha.feature.akka.channel.kafka.producer.batchSize trigger producer.flush(), the balance of performance and reliability depends on the parameters set by the user. 

The most important thing is that Omega knows that Alpha has made a mistake.

List<BaseEvent> eventCache = new LinkedList();

@Override
public void publish(Object data) {
    if(logger.isDebugEnabled()){
        logger.debug("send message [{}] to [{}]", data, topic);
    }

    try {
        if(data instanceof BaseEvent) {
            BaseEvent event = (BaseEvent) data;
            eventCache.add(event);
            if(eventCache.size()== batchSize){
              try {
	              List<Future> kafkaFutures = new LinkedList<>();
	              for(BaseEvent event : eventCache){
	                kafkaFutures.add(kafkaTemplate.send(topic, event.getGlobalTxId(), Callback...);
	              }
	              producer.flush();
	              for (Future future : kafkaFutures) {
	                future.get();
	              }
	              eventCache.clear();
			   } catch (Exception ex) {
                  logger.warn("Sending events to Kafka failed", ex);
                  throw new Exception("Commit failed as send to Kafka failed",ex);
        	   }
            }
        }else{
            throw new UnsupportedOperationException("data must be BaseEvent type");
        }
    } catch (InterruptedException | ExecutionException | UnsupportedOperationException e) {
        logger.error("publish Exception = [{}]", e.getMessage(), e);
        throw new RuntimeException(e);
    }
}



> 在 2019年8月17日,下午3:29,Zhang Lei <zh...@boco.com.cn> 写道:
> 
> Hi, Team
> 
> Our previous discussion on PR[1] about using synchronous or asynchronous methods to send Kafka messages, I think need a trade-off in reliability and performance.
> 
> Maybe we give the option to the user by allowing the user to customize some parameters, I have the following suggestions about the Kafka producer parameters:
> 
> Key: Messages ordered  and can't be lost, but they are allowed to repeat for FSM
> 
> 1. Default parameter 
> 
> max.in.flight.requests.per.connection is 1 (User modification is prohibited for ordered)
> acks is -1
> retries is greater than 0
> 
> 2. Allow users to define most parameters of the Kafka producer, E.g.
> 
> acks
> retries
> buffer.memory
> compresstion.type
> min.insync.replicas > 1 (use with acks)
> replication.factor > min.insync.replicas
> timeout.ms
> request.timeout.ms
> metadata.fetch.timeout.ms
> max.block.ms
> max.request.size
> 
> 3. KafkaProducer.send(record, callback) or KafkaProducer.send(record).get()
> 
> KafkaProducer.send(record).get() can cause performance problems, but we can fix it by deploying multiple alphas
> 
> KafkaProducer.send(record, callback) set max.block.ms=0 & large enough buffer.memory. But we still have to deal with the callback failure scenario.
> In asynchronous mode, if the message is sent, but the acknowledgment has not been received, the buffer pool is full, and the configuration file is set to not limit the timeout for the blocking timeout, which means that the production end is blocked all the time. Ensure that data is not lost.
> 
> Maybe we can use the parameters to allow users to choose to use synchronous or asynchronous sending mode, and use asynchronous mode to get better performance when there is a reliable network and Kafka cluster.
> 
> [1] https://github.com/apache/servicecomb-pack/pull/540 <https://github.com/apache/servicecomb-pack/pull/540>
> 
> Lei Zhang
>