Posted to issues@servicecomb.apache.org by "Willem Jiang (JIRA)" <ji...@apache.org> on 2018/12/24 08:12:00 UTC
[jira] [Resolved] (SCB-1081) CompositeOmegaCallback's compensate(TxEvent event) method has concurrency issues
[ https://issues.apache.org/jira/browse/SCB-1081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Willem Jiang resolved SCB-1081.
-------------------------------
Resolution: Fixed
Merged the patch into master branch.
> CompositeOmegaCallback's compensate(TxEvent event) method has concurrency issues
> --------------------------------------------------------------------------------
>
> Key: SCB-1081
> URL: https://issues.apache.org/jira/browse/SCB-1081
> Project: Apache ServiceComb
> Issue Type: Bug
> Components: Saga
> Reporter: takumiCX
> Assignee: Willem Jiang
> Priority: Major
> Fix For: pack-0.3.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The public void compensate(TxEvent event) method of the CompositeOmegaCallback class can fail under concurrent access.
>
> {code:java}
> public void compensate(TxEvent event) {
> Map<String, OmegaCallback> serviceCallbacks = callbacks.getOrDefault(event.serviceName(), emptyMap());
> if (serviceCallbacks.isEmpty()) {
> throw new AlphaException("No such omega callback found for service " + event.serviceName());
> }
> OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
> if (omegaCallback == null) {
> LOG.info("Cannot find the service with the instanceId {}, call the other instance.", event.instanceId());
> omegaCallback = serviceCallbacks.values().iterator().next();
> }
> try {
> omegaCallback.compensate(event);
> } catch (Exception e) {
> serviceCallbacks.values().remove(omegaCallback);
> throw e;
> }
> }
> {code}
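> A thread may pass the if (omegaCallback == null) check but lose the CPU before it executes omegaCallback = serviceCallbacks.values().iterator().next(). In that window, other threads may modify the serviceCallbacks map. On the alpha side there currently appear to be two such cases. In the first, alpha receives an onDisconnected request from the omega side and removes the corresponding omega instance from the map. In the second, the thread that executes pendingTask retries the compensation, fails, and runs the catch block below, which also removes the corresponding omega instance from the map:
> catch (Exception e) {
> serviceCallbacks.values().remove(omegaCallback);
> throw e;
> }
> If either removal empties the map inside that window, the iterator().next() call has no element left to return and throws.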
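
The interleaving described above can be replayed deterministically in a single thread, which makes the failure mode easier to see than a timing-based test. The sketch below is only an illustration: the class name EmptyCallbackRaceDemo, the instance ids, and the use of Runnable in place of OmegaCallback are assumptions, not ServiceComb code. It relies on the documented behaviour that Iterator.next() on an empty ConcurrentHashMap view throws NoSuchElementException, which is presumably the exception seen in the report.

```java
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-alone demo; Runnable stands in for OmegaCallback.
class EmptyCallbackRaceDemo {

  // Replays the racy interleaving step by step and reports what happened.
  static String replayInterleaving() {
    Map<String, Runnable> serviceCallbacks = new ConcurrentHashMap<>();
    serviceCallbacks.put("instance-1", () -> { });

    // Step 1: the emptiness check passes because one instance is registered.
    if (serviceCallbacks.isEmpty()) {
      return "rejected by isEmpty check";
    }

    // Step 2: the event targets an unknown instance, so the lookup yields null.
    Runnable callback = serviceCallbacks.get("instance-2");

    // Step 3: "another thread" removes the only registered instance,
    // as the onDisconnected handler or the catch block would.
    serviceCallbacks.values().remove(serviceCallbacks.get("instance-1"));

    // Step 4: the fallback lookup now runs against an empty map.
    if (callback == null) {
      try {
        callback = serviceCallbacks.values().iterator().next();
      } catch (NoSuchElementException e) {
        return "NoSuchElementException";
      }
    }
    return "callback found";
  }

  public static void main(String[] args) {
    System.out.println(replayInterleaving());
  }
}
```

Running main prints NoSuchElementException: the emptiness check at the top of the method gives no protection once the map is modified between the check and the fallback lookup.
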
>
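> Because this is a race condition, it is very unlikely to occur in practice, which makes it hard to notice and to reproduce. I reproduced it by modifying the source at specific points: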
> {code:java}
> if (omegaCallback == null) {
> LOG.info("Cannot find the service with the instanceId {}, call the other instance.", event.instanceId());
> try {
> TimeUnit.SECONDS.sleep(2);
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> omegaCallback = serviceCallbacks.values().iterator().next();
> }{code}
>
> {code:java}
> try {
> throw new RuntimeException();
> // omegaCallback.compensate(event);
> } catch (Exception e) {{code}
>
> The test code is below:
> {code:java}
> @Test
> public void compensateWithConcurrency() throws InterruptedException {
> ConcurrentHashMap<String, OmegaCallback> serviceCallbacks = new ConcurrentHashMap<>();
> serviceCallbacks.put(instanceId1One,callback1One);
> callbacks.put(serviceName1,serviceCallbacks);
> new Thread(new Runnable() {
> @Override
> public void run() {
> compositeOmegaCallback.compensate(eventOf(serviceName1,instanceId1Two,TxStartedEvent));
> }
> }).start();
> TimeUnit.SECONDS.sleep(1);
> new Thread(new Runnable() {
> @Override
> public void run() {
> compositeOmegaCallback.compensate(eventOf(serviceName1,instanceId1One,TxStartedEvent));
> }
> }).start();
> TimeUnit.SECONDS.sleep(3);
> }{code}
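
One way to close the window that the test exercises is to make the fallback lookup tolerate an empty map. The sketch below is not the patch that was merged for this issue, whose contents are not shown in this message; it is a minimal illustration under stated assumptions. The class SafeCallbackSelection and its select method are hypothetical names, and the generic type T stands in for OmegaCallback. It depends on the ConcurrentHashMap guarantee that once hasNext() returns true, the following next() returns an element even if that entry is removed concurrently.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper; T stands in for OmegaCallback.
class SafeCallbackSelection {

  // Returns the callback for instanceId, or any other registered callback,
  // or Optional.empty() if the map has been emptied in the meantime.
  static <T> Optional<T> select(Map<String, T> serviceCallbacks, String instanceId) {
    T callback = serviceCallbacks.get(instanceId);
    if (callback != null) {
      return Optional.of(callback);
    }
    // Guard next() with hasNext() on the same iterator instead of relying
    // on an earlier isEmpty() check that may already be stale.
    Iterator<T> candidates = serviceCallbacks.values().iterator();
    return candidates.hasNext() ? Optional.of(candidates.next()) : Optional.empty();
  }

  public static void main(String[] args) {
    Map<String, String> callbacks = new ConcurrentHashMap<>();
    System.out.println(select(callbacks, "instance-2").isPresent());
    callbacks.put("instance-1", "callback-1");
    System.out.println(select(callbacks, "instance-2").get());
  }
}
```

With this shape the caller decides what an empty result means, for example raising the same "No such omega callback found" error that the original method uses for an empty map.
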
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)