Posted to dev@servicecomb.apache.org by Zhang Lei <zh...@boco.com.cn> on 2019/08/17 07:29:28 UTC
[DISCUSS] the performance and reliability trade-offs of KafkaMessage Publisher
Hi, Team
In our previous discussion on the PR[1] about whether to send Kafka messages synchronously or asynchronously, I think we need to trade off reliability against performance.
Maybe we can leave the choice to users by letting them customize some parameters. I have the following suggestions for the Kafka producer parameters:
Key requirement: messages must stay ordered and must not be lost, but duplicates are acceptable to the FSM.
1. Default parameters
max.in.flight.requests.per.connection is 1 (users may not change this, so that ordering is preserved)
acks is -1
retries is greater than 0
2. Allow users to define most parameters of the Kafka producer, e.g.
acks
retries
buffer.memory
compression.type
min.insync.replicas > 1 (use with acks)
replication.factor > min.insync.replicas
timeout.ms
request.timeout.ms
metadata.fetch.timeout.ms
max.block.ms
max.request.size
3. KafkaProducer.send(record, callback) or KafkaProducer.send(record).get()
KafkaProducer.send(record).get() can cause performance problems, but we can mitigate them by deploying multiple Alphas.
KafkaProducer.send(record, callback) can be used with max.block.ms=0 and a large enough buffer.memory, but we still have to handle the failures reported to the callback.
In asynchronous mode, if messages have been sent but not yet acknowledged and the buffer pool fills up, and the configuration places no limit on the blocking timeout, the producer stays blocked. This ensures that no data is lost.
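The defaults in point 1 and the user overrides in point 2 could be combined roughly as follows. This is only a sketch: the class name ProducerDefaults, the method build, and the retry count of 3 are assumptions made for illustration, and the result is a plain java.util.Properties object of the kind a Kafka producer accepts.

```java
import java.util.Properties;

final class ProducerDefaults {
    private ProducerDefaults() {}

    /**
     * Merges user overrides over the proposed defaults, then re-applies the
     * ordering setting so that it cannot be overridden.
     */
    static Properties build(Properties userOverrides) {
        Properties props = new Properties();
        // Proposed defaults: wait for all in-sync replicas and retry on failure.
        props.setProperty("acks", "-1");
        props.setProperty("retries", "3"); // any value greater than 0; 3 is an assumption
        // User-supplied values (buffer.memory, max.block.ms, ...) win over the defaults.
        for (String name : userOverrides.stringPropertyNames()) {
            props.setProperty(name, userOverrides.getProperty(name));
        }
        // Pinned to 1 to keep messages ordered, even if the user tried to change it.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }
}
```

Applying the pinned value last is what makes it impossible to override; a stricter variant could reject the user's configuration instead of silently replacing the value.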
Maybe we can add a parameter that lets users choose between synchronous and asynchronous sending, so that asynchronous mode can be used for better performance when the network and the Kafka cluster are reliable.
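A user-selectable send mode might look like the sketch below. To keep it self-contained, the Kafka send call is replaced by a stand-in function that returns a CompletableFuture; ModePublisher, the synchronous flag, and the onFailure hook are all hypothetical names and not part of servicecomb-pack or the Kafka client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Function;

final class ModePublisher<T> {
    private final Function<T, CompletableFuture<Void>> sender; // stand-in for the Kafka send call
    private final boolean synchronous;                         // hypothetical user option
    private final BiConsumer<T, Throwable> onFailure;          // called for failed asynchronous sends

    ModePublisher(Function<T, CompletableFuture<Void>> sender,
                  boolean synchronous,
                  BiConsumer<T, Throwable> onFailure) {
        this.sender = sender;
        this.synchronous = synchronous;
        this.onFailure = onFailure;
    }

    void publish(T event) {
        CompletableFuture<Void> ack = sender.apply(event);
        if (synchronous) {
            try {
                ack.get(); // block until the send is acknowledged or fails
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for acknowledgment", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("send failed", e.getCause());
            }
        } else {
            // Return immediately; failures surface later through the hook.
            ack.whenComplete((ignored, error) -> {
                if (error != null) {
                    onFailure.accept(event, error);
                }
            });
        }
    }
}
```

In synchronous mode the caller sees the failure as an exception; in asynchronous mode the caller has already returned, so the hook is the only place where a failed event can be logged or retried.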
[1] https://github.com/apache/servicecomb-pack/pull/540
Lei Zhang
Re: [DISCUSS] the performance and reliability trade-offs of KafkaMessage Publisher
Posted by Willem Jiang <wi...@gmail.com>.
Yeah, that is why I said we need a Kafka cluster on the back end.
It's better to have an Alpha cluster to receive the events.
I agree we could provide different configurations so that users can
choose between performance and stability of the system.
Willem Jiang
Twitter: willemjiang
Weibo: 姜宁willem
On Sat, Aug 17, 2019 at 4:37 PM Zhang Lei <co...@qq.com> wrote:
Re: [DISCUSS] the performance and reliability trade-offs of KafkaMessage Publisher
Posted by Zhang Lei <co...@qq.com>.
Hi, Willem
If we use asynchronous sending, we also need to consider the case where Alpha crashes.
Lei Zhang
On Aug 17, 2019, at 4:29 PM, Willem Jiang <wi...@gmail.com> wrote:
Re: [DISCUSS] the performance and reliability trade-offs of KafkaMessage Publisher
Posted by Willem Jiang <wi...@gmail.com>.
I think we are worrying too much about the Omega side.
From my experience, the Kafka client gets much better performance
from asynchronous invocation with batch processing.
If we want to guarantee delivery to Kafka, we'd better build a
Kafka cluster on the server side instead of notifying Omega that Alpha
cannot process any messages.
For the publish method, I think we just need to log the events that
cannot be sent to Kafka for further investigation.
Willem Jiang
Twitter: willemjiang
Weibo: 姜宁willem
On Sat, Aug 17, 2019 at 4:11 PM Zhang Lei <zh...@boco.com.cn> wrote:
Re: [DISCUSS] the performance and reliability trade-offs of KafkaMessage Publisher
Posted by Zhang Lei <zh...@boco.com.cn>.
One more point:
We can still use the asynchronous send method, with the parameter alpha.feature.akka.channel.kafka.producer.batchSize deciding when producer.flush() is triggered. The balance between performance and reliability then depends on the parameters the user sets.
The most important thing is that Omega learns when Alpha has failed.
List<BaseEvent> eventCache = new LinkedList<>();

@Override
public void publish(Object data) {
  if (logger.isDebugEnabled()) {
    logger.debug("send message [{}] to [{}]", data, topic);
  }

  try {
    if (data instanceof BaseEvent) {
      BaseEvent event = (BaseEvent) data;
      eventCache.add(event);
      // Use >= rather than ==: after a failed batch the cache is not cleared,
      // so it can grow past batchSize and must still be sent.
      if (eventCache.size() >= batchSize) {
        List<Future<?>> kafkaFutures = new LinkedList<>();
        for (BaseEvent cached : eventCache) {
          // The callback argument is left elided, as in the original proposal.
          kafkaFutures.add(kafkaTemplate.send(topic, cached.getGlobalTxId(), Callback...));
        }
        // Push the buffered records out, then wait for every acknowledgment.
        producer.flush();
        for (Future<?> future : kafkaFutures) {
          future.get();
        }
        eventCache.clear();
      }
    } else {
      throw new UnsupportedOperationException("data must be BaseEvent type");
    }
  } catch (InterruptedException | ExecutionException | UnsupportedOperationException e) {
    // Log and rethrow so that the failure propagates back to Omega.
    logger.error("publish Exception = [{}]", e.getMessage(), e);
    throw new RuntimeException(e);
  }
}
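For reference, the same batch-then-wait idea can be written as a self-contained sketch that compiles without Kafka on the classpath. It assumes a stand-in sender function in place of the Kafka send call; BatchingPublisher and pending() are hypothetical names. On failure the cache is kept, so a retry may resend events that were already delivered, which fits the requirement that duplicates are acceptable but loss is not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

final class BatchingPublisher<T> {
    private final List<T> cache = new ArrayList<>();
    private final int batchSize;
    private final Function<T, CompletableFuture<Void>> sender; // stand-in for the Kafka send call

    BatchingPublisher(int batchSize, Function<T, CompletableFuture<Void>> sender) {
        this.batchSize = batchSize;
        this.sender = sender;
    }

    /** Buffers the event; once a batch is full, sends it and waits for every acknowledgment. */
    synchronized void publish(T event) {
        cache.add(event);
        if (cache.size() < batchSize) {
            return;
        }
        List<CompletableFuture<Void>> acks = new ArrayList<>();
        for (T cached : cache) {
            acks.add(sender.apply(cached));
        }
        try {
            for (CompletableFuture<Void> ack : acks) {
                ack.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for acknowledgments", e);
        } catch (ExecutionException e) {
            // The cache is deliberately left intact so the whole batch can be retried.
            throw new IllegalStateException("batch send failed", e.getCause());
        }
        cache.clear();
    }

    /** Number of events buffered but not yet acknowledged. */
    synchronized int pending() {
        return cache.size();
    }
}
```

Events still sitting in the cache are held only in memory, so they would be lost if the process crashed before the batch was sent.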
On Aug 17, 2019, at 3:29 PM, Zhang Lei <zh...@boco.com.cn> wrote: