You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@servicecomb.apache.org by "Willem Jiang (JIRA)" <ji...@apache.org> on 2018/12/17 06:11:00 UTC

[jira] [Updated] (SCB-1081) CompositeOmegaCallback's compensate(TxEvent event) method has concurrency issues

     [ https://issues.apache.org/jira/browse/SCB-1081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Willem Jiang updated SCB-1081:
------------------------------
    Fix Version/s: pack-0.3.0

> CompositeOmegaCallback's compensate(TxEvent event) method has concurrency issues
> --------------------------------------------------------------------------------
>
>                 Key: SCB-1081
>                 URL: https://issues.apache.org/jira/browse/SCB-1081
>             Project: Apache ServiceComb
>          Issue Type: Bug
>          Components: Saga
>            Reporter: takumiCX
>            Assignee: Willem Jiang
>            Priority: Major
>             Fix For: pack-0.3.0
>
>
> CompositeOmegaCallback类的public void compensate(TxEvent event)方法可能会有并发异常
>  
> {code:java}
> public void compensate(TxEvent event) {
>  Map<String, OmegaCallback> serviceCallbacks = callbacks.getOrDefault(event.serviceName(), emptyMap());
> if (serviceCallbacks.isEmpty()) {
>  throw new AlphaException("No such omega callback found for service " + event.serviceName());
>  }
> OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
>  if (omegaCallback == null) {
>  LOG.info("Cannot find the service with the instanceId {}, call the other instance.", event.instanceId());
>  omegaCallback = serviceCallbacks.values().iterator().next();
>  }
> try {
>  omegaCallback.compensate(event);
>  } catch (Exception e) {
>  serviceCallbacks.values().remove(omegaCallback);
>  throw e;
>  }
>  }
> {code}
> 线程可能通过了 if (omegaCallback == null) 的判断条件但是在omegaCallback = serviceCallbacks.values().iterator().next()之前失去了cpu执行权,由于其他线程对serviceCallbacks这个map的操作,目前看alpha端有两种情况:一种是接收到omega端的onDisconnected请求将对应omega端实例从map中移除;一种是执行pendingTask的线程重新进行补偿时失败执行下面这部分代码catch (Exception e) {
>  serviceCallbacks.values().remove(omegaCallback);
>  throw e;
>  }时也会移除map中对应的omega端实例。
>  
> 这部分代码由于是并发异常,发生的可能性本来就非常小,所以比较难以发现和复现。我是对源码在特定位置做了些修改然后复现出来的
> {code:java}
> if (omegaCallback == null) {
>  LOG.info("Cannot find the service with the instanceId {}, call the other instance.", event.instanceId());
>  try {
>  TimeUnit.SECONDS.sleep(2);
>  } catch (InterruptedException e) {
>  e.printStackTrace();
>  }
>  omegaCallback = serviceCallbacks.values().iterator().next();
> }{code}
>  
> {code:java}
> try {
>  throw new RuntimeException();
> // omegaCallback.compensate(event);
>  } catch (Exception e) {{code}
>  
> 下面是测试代码
> {code:java}
> @Test
> public void compensateWithConcurrency() throws InterruptedException {
>  ConcurrentHashMap<String, OmegaCallback> serviceCallbacks = new ConcurrentHashMap();
>  serviceCallbacks.put(instanceId1One,callback1One);
>  callbacks.put(serviceName1,serviceCallbacks);
>  new Thread(new Runnable() {
>  @Override
>  public void run() {
>  compositeOmegaCallback.compensate(eventOf(serviceName1,instanceId1Two,TxStartedEvent));
>  }
>  }).start();
>  TimeUnit.SECONDS.sleep(1);
>  new Thread(new Runnable() {
>  @Override
>  public void run() {
>  compositeOmegaCallback.compensate(eventOf(serviceName1,instanceId1One,TxStartedEvent));
>  }
>  }).start();
>  TimeUnit.SECONDS.sleep(3);
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)