You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2020/01/14 11:33:53 UTC

[servicecomb-pack] 04/08: SCB-1696 Optimize state machine compensation retry strategy using reverseRetries and retryDelayInMilliseconds

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 5baeda1e082c16c19947927c1a26d7f57b40403f
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Mon Jan 13 02:12:59 2020 +0800

    SCB-1696 Optimize state machine compensation retry strategy using reverseRetries and retryDelayInMilliseconds
---
 .../pack/alpha/core/fsm/SuspendedType.java         |   2 +-
 .../servicecomb/pack/alpha/core/fsm/TxState.java   |   4 +-
 ...kEvent.java => TxCompensateAckFailedEvent.java} |  22 ++---
 ...Event.java => TxCompensateAckSucceedEvent.java} |  23 +----
 .../servicecomb/pack/alpha/fsm/SagaActor.java      |  50 ++++++----
 .../pack/alpha/fsm/domain/UpdateTxEventDomain.java |  13 ++-
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  | 101 ++++++++++++++++-----
 .../pack/alpha/fsm/SagaEventSender.java            |  77 +++++++++++-----
 .../pack/alpha/fsm/SagaIntegrationTest.java        |  24 ++---
 .../alpha/server/fsm/GrpcSagaEventService.java     |  11 ++-
 .../alpha/server/fsm/AlphaIntegrationFsmTest.java  |  36 ++++----
 .../grpc/saga/GrpcSagaClientMessageSender.java     |   2 +
 .../pack/omega/transaction/CallbackContext.java    |  10 +-
 .../transaction/CompensationMessageHandler.java    |   6 +-
 .../transaction/TxCompensateAckFailedEvent.java    |   4 +-
 .../transaction/TxCompensateAckSucceedEvent.java   |   4 +-
 .../transaction/tcc/CoordinateMessageHandler.java  |   2 +-
 .../CompensationMessageHandlerTest.java            |  66 ++++++++++++--
 18 files changed, 304 insertions(+), 153 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/SuspendedType.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/SuspendedType.java
index a3189d2..f2ed57a 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/SuspendedType.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/SuspendedType.java
@@ -18,5 +18,5 @@
 package org.apache.servicecomb.pack.alpha.core.fsm;
 
 public enum SuspendedType {
-  NONE, TIMEOUT, UNPREDICTABLE
+  NONE, TIMEOUT, COMPENSATE_FAILED, UNPREDICTABLE
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/TxState.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/TxState.java
index 747cfc1..f7848fb 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/TxState.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/TxState.java
@@ -22,5 +22,7 @@ public enum TxState {
   FAILED,
   COMMITTED,
   COMPENSATION_SENT, // The compensation method has been called to wait for TxCompensatedEvent
-  COMPENSATED
+  COMPENSATED, // Just for compatibility with historical data  deserialization in ES
+  COMPENSATED_SUCCEED,
+  COMPENSATED_FAILED,
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckFailedEvent.java
similarity index 80%
copy from alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckEvent.java
copy to alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckFailedEvent.java
index 1fae858..a033d21 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckFailedEvent.java
@@ -20,16 +20,16 @@ package org.apache.servicecomb.pack.alpha.core.fsm.event;
 import java.util.Date;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.TxEvent;
 
-public class TxCompensateAckEvent extends TxEvent {
+public class TxCompensateAckFailedEvent extends TxEvent {
 
-  private boolean succeed;
+  private byte[] payloads;
 
-  public boolean isSucceed() {
-    return succeed;
+  public byte[] getPayloads() {
+    return payloads;
   }
 
-  public void setSucceed(boolean succeed) {
-    this.succeed = succeed;
+  public void setPayloads(byte[] payloads) {
+    this.payloads = payloads;
   }
 
   public static Builder builder() {
@@ -38,10 +38,10 @@ public class TxCompensateAckEvent extends TxEvent {
 
   public static final class Builder {
 
-    private TxCompensateAckEvent txCompensatedEvent;
+    private TxCompensateAckFailedEvent txCompensatedEvent;
 
     private Builder() {
-      txCompensatedEvent = new TxCompensateAckEvent();
+      txCompensatedEvent = new TxCompensateAckFailedEvent();
     }
 
     public Builder serviceName(String serviceName) {
@@ -74,12 +74,12 @@ public class TxCompensateAckEvent extends TxEvent {
       return this;
     }
 
-    public Builder succeed(boolean succeed){
-      txCompensatedEvent.setSucceed(succeed);
+    public Builder payloads(byte[] payloads){
+      txCompensatedEvent.setPayloads(payloads);
       return this;
     }
 
-    public TxCompensateAckEvent build() {
+    public TxCompensateAckFailedEvent build() {
       return txCompensatedEvent;
     }
   }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckSucceedEvent.java
similarity index 79%
rename from alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckEvent.java
rename to alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckSucceedEvent.java
index 1fae858..d1b7c75 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckSucceedEvent.java
@@ -20,17 +20,7 @@ package org.apache.servicecomb.pack.alpha.core.fsm.event;
 import java.util.Date;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.TxEvent;
 
-public class TxCompensateAckEvent extends TxEvent {
-
-  private boolean succeed;
-
-  public boolean isSucceed() {
-    return succeed;
-  }
-
-  public void setSucceed(boolean succeed) {
-    this.succeed = succeed;
-  }
+public class TxCompensateAckSucceedEvent extends TxEvent {
 
   public static Builder builder() {
     return new Builder();
@@ -38,10 +28,10 @@ public class TxCompensateAckEvent extends TxEvent {
 
   public static final class Builder {
 
-    private TxCompensateAckEvent txCompensatedEvent;
+    private TxCompensateAckSucceedEvent txCompensatedEvent;
 
     private Builder() {
-      txCompensatedEvent = new TxCompensateAckEvent();
+      txCompensatedEvent = new TxCompensateAckSucceedEvent();
     }
 
     public Builder serviceName(String serviceName) {
@@ -74,12 +64,7 @@ public class TxCompensateAckEvent extends TxEvent {
       return this;
     }
 
-    public Builder succeed(boolean succeed){
-      txCompensatedEvent.setSucceed(succeed);
-      return this;
-    }
-
-    public TxCompensateAckEvent build() {
+    public TxCompensateAckSucceedEvent build() {
       return txCompensatedEvent;
     }
   }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 7c6eee6..a7c74e2 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -29,6 +29,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.servicecomb.pack.alpha.core.AlphaException;
 import org.apache.servicecomb.pack.alpha.core.fsm.SuspendedType;
 import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
 import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain;
 import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent;
@@ -40,7 +42,6 @@ import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaTimeoutEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensatedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.ComponsitedCheckEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
@@ -230,7 +231,14 @@ public class SagaActor extends
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent);
             }
-        ).event(TxCompensatedEvent.class, SagaData.class,
+        ).event(TxCompensateAckSucceedEvent.class, SagaData.class,
+            (event, data) -> {
+              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
+              return stay().applying(domainEvent).andThen(exec(_data -> {
+                self().tell(ComponsitedCheckEvent.builder().build(), self());
+              }));
+            }
+        ).event(TxCompensateAckFailedEvent.class, SagaData.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return stay().applying(domainEvent).andThen(exec(_data -> {
@@ -242,10 +250,13 @@ public class SagaActor extends
               if (data.getTxEntities().hasCompensationSentTx() || !data.isTerminated()) {
                 return stay();
               } else {
-                SagaEndedDomain domainEvent = new SagaEndedDomain(event,
-                    SagaActorState.COMPENSATED);
-                return goTo(SagaActorState.COMPENSATED)
-                    .applying(domainEvent);
+                if(data.getSuspendedType() == SuspendedType.COMPENSATE_FAILED) {
+                  SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.COMPENSATE_FAILED);
+                  return goTo(SagaActorState.SUSPENDED).applying(domainEvent);
+                } else {
+                  SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMPENSATED);
+                  return goTo(SagaActorState.COMPENSATED).applying(domainEvent);
+                }
               }
             }
         ).event(SagaAbortedEvent.class, SagaData.class,
@@ -450,6 +461,9 @@ public class SagaActor extends
               .compensationMethod(domainEvent.getCompensationMethod())
               .payloads(domainEvent.getPayloads())
               .state(domainEvent.getState())
+              .reverseRetries(domainEvent.getReverseRetries())
+              .reverseTimeout(domainEvent.getReverseTimeout())
+              .retryDelayInMilliseconds(domainEvent.getRetryDelayInMilliseconds())
               .beginTime(domainEvent.getEvent().getCreateTime())
               .build();
           data.getTxEntities().put(txEntity.getLocalTxId(), txEntity);
@@ -471,11 +485,15 @@ public class SagaActor extends
               compensation(v, data);
             }
           });
-        } else if (domainEvent.getState() == TxState.COMPENSATED) {
+        } else if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED || domainEvent.getState() == TxState.COMPENSATED_FAILED) {
           // decrement the compensation running counter by one
           data.getCompensationRunningCounter().decrementAndGet();
           txEntity.setState(domainEvent.getState());
-          LOG.info("compensation is completed {}", txEntity.getLocalTxId());
+          if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED) {
+            LOG.info("compensation is succeed {}", txEntity.getLocalTxId());
+          } else {
+            LOG.info("compensation is failed {}", txEntity.getLocalTxId());
+          }
         }
       } else if (event instanceof SagaEndedDomain) {
         SagaEndedDomain domainEvent = (SagaEndedDomain) event;
@@ -548,22 +566,20 @@ public class SagaActor extends
       if (txEntity.getReverseRetries() > 0) {
         // which means the retry number
         if (txEntity.getRetriesCounter().incrementAndGet() < txEntity.getReverseRetries()) {
+          LOG.info("Retry compensate {}/{} after {} ms", txEntity.getRetriesCounter().get() + 1, txEntity.getReverseRetries(),
+              txEntity.getRetryDelayInMilliseconds());
           try {
             Thread.sleep(txEntity.getRetryDelayInMilliseconds());
           } catch (InterruptedException e) {
             LOG.error(e.getMessage(), e);
           }
           compensation(txEntity, data);
+        } else {
+          data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
         }
-      } else if (txEntity.getReverseRetries() == -1) {
-        // which means retry it until succeed
-        try {
-          Thread.sleep(txEntity.getRetryDelayInMilliseconds());
-        } catch (InterruptedException e) {
-          LOG.error(e.getMessage(), e);
-        }
-        compensation(txEntity, data);
+      } else {
+        data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
       }
     }
   }
-}
+}
\ No newline at end of file
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
index f74d3e6..79d5dc0 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
@@ -19,7 +19,8 @@ package org.apache.servicecomb.pack.alpha.fsm.domain;
 
 import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensatedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
 
@@ -45,13 +46,19 @@ public class UpdateTxEventDomain implements DomainEvent {
     this.state = TxState.FAILED;
   }
 
-  public UpdateTxEventDomain(TxCompensatedEvent event) {
+  public UpdateTxEventDomain(TxCompensateAckSucceedEvent event) {
     this.event = event;
     this.parentTxId = event.getParentTxId();
     this.localTxId = event.getLocalTxId();
-    this.state = TxState.COMPENSATED;
+    this.state = TxState.COMPENSATED_SUCCEED;
   }
 
+  public UpdateTxEventDomain(TxCompensateAckFailedEvent event) {
+    this.event = event;
+    this.parentTxId = event.getParentTxId();
+    this.localTxId = event.getLocalTxId();
+    this.state = TxState.COMPENSATED_FAILED;
+  }
 
   public String getParentTxId() {
     return parentTxId;
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 59a7d42..9dcfbbe 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -328,7 +328,7 @@ public class SagaActorTest {
    * 3. TxEndedEvent-11
    * 4. TxStartedEvent-12
    * 5. TxAbortedEvent-12
-   * 6. TxCompensatedEvent-11
+   * 6. TxCompensateAckSucceedEvent-11
    * 7. SagaAbortedEvent-1
    */
   @Test
@@ -375,7 +375,7 @@ public class SagaActorTest {
       SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntities().size(), 2);
-      assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED_SUCCEED);
       assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.FAILED);
       assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
 
@@ -385,6 +385,59 @@ public class SagaActorTest {
     }};
   }
 
+  @Test
+  public void middleTxAbortedAndRetryCompensationEvents() {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+      List<BaseEvent> eventList = SagaEventSender.middleTxAbortedAndRetryCompensationEvents(globalTxId, localTxId_1, localTxId_2);
+      eventList.stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDLE, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDLE, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(terminated.getActor(), saga);
+
+      SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntities().size(), 2);
+      assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED_SUCCEED);
+      assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.FAILED);
+
+      assertThat(eventList, is(sagaData.getEvents()));
+
+      system.stop(saga);
+    }};
+  }
+
   /**
    * 1. SagaStartedEvent-1
    * 2. TxStartedEvent-11
@@ -393,8 +446,8 @@ public class SagaActorTest {
    * 5. TxEndedEvent-12
    * 6. TxStartedEvent-13
    * 7. TxAbortedEvent-13
-   * 8. TxCompensatedEvent-11
-   * 9. TxCompensatedEvent-12
+   * 8. TxCompensateAckSucceedEvent-11
+   * 9. TxCompensateAckSucceedEvent-12
    * 10. SagaAbortedEvent-1
    */
   @Test
@@ -448,8 +501,8 @@ public class SagaActorTest {
       SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntities().size(), 3);
-      assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED_SUCCEED);
+      assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED_SUCCEED);
       assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(), TxState.FAILED);
       assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
       assertThat(eventList, is(sagaData.getEvents()));
@@ -466,8 +519,8 @@ public class SagaActorTest {
    * 5. TxEndedEvent-12
    * 6. TxStartedEvent-13
    * 7. TxAbortedEvent-13
-   * 8. TxCompensatedEvent-11
-   * 9. TxCompensatedEvent-12
+   * 8. TxCompensateAckSucceedEvent-11
+   * 9. TxCompensateAckSucceedEvent-12
    * 10. SagaAbortedEvent-1
    */
   @Test
@@ -521,8 +574,8 @@ public class SagaActorTest {
       SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntities().size(), 3);
-      assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED_SUCCEED);
+      assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED_SUCCEED);
       assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(), TxState.FAILED);
       assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
 
@@ -540,8 +593,8 @@ public class SagaActorTest {
    * 5. TxEndedEvent-12
    * 6. TxStartedEvent-13
    * 7. TxEndedEvent-13
-   * 8. TxCompensatedEvent-12
-   * 9. TxCompensatedEvent-13
+   * 8. TxCompensateAckSucceedEvent-12
+   * 9. TxCompensateAckSucceedEvent-13
    * 10. SagaAbortedEvent-1
    */
   @Test
@@ -584,8 +637,8 @@ public class SagaActorTest {
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntities().size(), 3);
       assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.FAILED);
-      assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED_SUCCEED);
+      assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(), TxState.COMPENSATED_SUCCEED);
       assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
       assertThat(eventList, is(sagaData.getEvents()));
 
@@ -603,9 +656,9 @@ public class SagaActorTest {
    * 6. TxStartedEvent-13
    * 7. TxEndedEvent-13
    * 8. SagaAbortedEvent-1
-   * 9. TxCompensatedEvent-11
-   * 8. TxCompensatedEvent-12
-   * 9. TxCompensatedEvent-13
+   * 9. TxCompensateAckSucceedEvent-11
+   * 8. TxCompensateAckSucceedEvent-12
+   * 9. TxCompensateAckSucceedEvent-13
    */
   @Test
   public void sagaAbortedEventAfterAllTxEndedTest() {
@@ -661,9 +714,9 @@ public class SagaActorTest {
       SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntities().size(), 3);
-      assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED_SUCCEED);
+      assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED_SUCCEED);
+      assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(), TxState.COMPENSATED_SUCCEED);
       assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
 
       assertThat(eventList, is(sagaData.getEvents()));
@@ -937,8 +990,8 @@ public class SagaActorTest {
    * 3. TxEndedEvent-11
    * 5. TxEndedEvent-12
    * 7. TxAbortedEvent-13
-   * 8. TxCompensatedEvent-11
-   * 9. TxCompensatedEvent-12
+   * 8. TxCompensateAckSucceedEvent-11
+   * 9. TxCompensateAckSucceedEvent-12
    * 10. SagaAbortedEvent-1
    */
   @Test
@@ -982,8 +1035,8 @@ public class SagaActorTest {
       SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntities().size(), 3);
-      assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED_SUCCEED);
+      assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED_SUCCEED);
       assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(), TxState.FAILED);
       assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
       assertThat(eventList, is(sagaData.getEvents()));
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
index b110a84..1f0d792 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
@@ -24,7 +24,9 @@ import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaTimeoutEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensatedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
@@ -79,7 +81,7 @@ public class SagaEventSender {
    * 3. TxEndedEvent-11
    * 4. TxStartedEvent-12
    * 5. TxAbortedEvent-12
-   * 6. TxCompensatedEvent-11
+   * 6. TxCompensateAckSucceedEvent-11
    * 7. SagaAbortedEvent-1
    */
   public static List<BaseEvent> middleTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2){
@@ -89,7 +91,7 @@ public class SagaEventSender {
     sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
     sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
-    sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
     sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
     return sagaEvents;
   }
@@ -102,8 +104,8 @@ public class SagaEventSender {
    * 5. TxEndedEvent-12
    * 6. TxStartedEvent-13
    * 7. TxAbortedEvent-13
-   * 8. TxCompensatedEvent-11
-   * 9. TxCompensatedEvent-12
+   * 8. TxCompensateAckSucceedEvent-11
+   * 9. TxCompensateAckSucceedEvent-12
    * 10. SagaAbortedEvent-1
    */
   public static List<BaseEvent> lastTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
@@ -115,8 +117,33 @@ public class SagaEventSender {
     sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
-    sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-    sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
+    return sagaEvents;
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxAbortedEvent-12
+   * 6. TxCompensateAckFailedEvent-11
+   * 7. TxCompensateAckFailedEvent-11
+   * 8. TxCompensateAckFailedEvent-11
+   * 9. SagaAbortedEvent-1
+   */
+  public static List<BaseEvent> middleTxAbortedAndRetryCompensationEvents(String globalTxId, String localTxId_1, String localTxId_2){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
+    sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).reverseRetries(3).build());
+    sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxCompensateAckFailedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxCompensateAckFailedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
     sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
     return sagaEvents;
   }
@@ -130,8 +157,8 @@ public class SagaEventSender {
    * 6. TxStartedEvent-13
    * 7. TxAbortedEvent-13
    * 8. SagaAbortedEvent-1
-   * 9. TxCompensatedEvent-11
-   * 10. TxCompensatedEvent-12
+   * 9. TxCompensateAckSucceedEvent-11
+   * 10. TxCompensateAckSucceedEvent-12
    */
   public static List<BaseEvent> sagaAbortedEventBeforeTxComponsitedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
     List<BaseEvent> sagaEvents = new ArrayList<>();
@@ -143,8 +170,8 @@ public class SagaEventSender {
     sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
-    sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-    sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     return sagaEvents;
   }
 
@@ -156,8 +183,8 @@ public class SagaEventSender {
    * 5. TxEndedEvent-12
    * 6. TxStartedEvent-13
    * 7. TxEndedEvent-13
-   * 8. TxCompensatedEvent-12
-   * 9. TxCompensatedEvent-13
+   * 8. TxCompensateAckSucceedEvent-12
+   * 9. TxCompensateAckSucceedEvent-13
    * 10. SagaAbortedEvent-1
    */
   public static List<BaseEvent> receivedRemainingEventAfterFirstTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
@@ -169,8 +196,8 @@ public class SagaEventSender {
     sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
-    sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
-    sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
     return sagaEvents;
   }
@@ -184,9 +211,9 @@ public class SagaEventSender {
    * 6. TxStartedEvent-13
    * 7. TxEndedEvent-13
    * 8. SagaAbortedEvent-1
-   * 9. TxCompensatedEvent-11
-   * 8. TxCompensatedEvent-12
-   * 9. TxCompensatedEvent-13
+   * 9. TxCompensateAckSucceedEvent-11
+   * 8. TxCompensateAckSucceedEvent-12
+   * 9. TxCompensateAckSucceedEvent-13
    */
   public static List<BaseEvent> sagaAbortedEventAfterAllTxEndedsEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
     List<BaseEvent> sagaEvents = new ArrayList<>();
@@ -198,9 +225,9 @@ public class SagaEventSender {
     sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
-    sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-    sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
-    sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     return sagaEvents;
   }
 
@@ -302,8 +329,8 @@ public class SagaEventSender {
    * 3. TxEndedEvent-11
    * 5. TxEndedEvent-12
    * 7. TxAbortedEvent-13
-   * 8. TxCompensatedEvent-11
-   * 9. TxCompensatedEvent-12
+   * 8. TxCompensateAckSucceedEvent-11
+   * 9. TxCompensateAckSucceedEvent-12
    * 10. SagaAbortedEvent-1
    */
   public static List<BaseEvent> lastTxAbortedEventWithTxConcurrentEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
@@ -315,8 +342,8 @@ public class SagaEventSender {
     sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
     sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
-    sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-    sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
     return sagaEvents;
   }
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index bfe4037..f7b927e 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -137,7 +137,7 @@ public class SagaIntegrationTest {
     assertNotNull(sagaData.getBeginTime());
     assertNotNull(sagaData.getEndTime());
     assertEquals(sagaData.getTxEntities().size(),2);
-    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
     assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.FAILED);
   }
 
@@ -158,8 +158,8 @@ public class SagaIntegrationTest {
     assertNotNull(sagaData.getBeginTime());
     assertNotNull(sagaData.getEndTime());
     assertEquals(sagaData.getTxEntities().size(),3);
-    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
     assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
   }
 
@@ -180,8 +180,8 @@ public class SagaIntegrationTest {
     assertNotNull(sagaData.getBeginTime());
     assertNotNull(sagaData.getEndTime());
     assertEquals(sagaData.getTxEntities().size(),3);
-    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
     assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
   }
 
@@ -203,8 +203,8 @@ public class SagaIntegrationTest {
     assertNotNull(sagaData.getEndTime());
     assertEquals(sagaData.getTxEntities().size(),3);
     assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.FAILED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED_SUCCEED);
   }
 
   @Test
@@ -224,9 +224,9 @@ public class SagaIntegrationTest {
     assertNotNull(sagaData.getBeginTime());
     assertNotNull(sagaData.getEndTime());
     assertEquals(sagaData.getTxEntities().size(),3);
-    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED_SUCCEED);
   }
 
   @Test
@@ -335,8 +335,8 @@ public class SagaIntegrationTest {
     assertNotNull(sagaData.getBeginTime());
     assertNotNull(sagaData.getEndTime());
     assertEquals(sagaData.getTxEntities().size(),3);
-    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
     assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
   }
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 49b5e4e..e4d25b9 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -26,7 +26,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
 import org.apache.servicecomb.pack.alpha.core.fsm.CompensateAckType;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel;
 import org.apache.servicecomb.pack.common.EventType;
@@ -141,6 +142,7 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
           .forwardTimeout(message.getForwardTimeout())
           .reverseRetries(message.getReverseRetries())
           .reverseTimeout(message.getReverseTimeout())
+          .retryDelayInMilliseconds(message.getRetryDelayInMilliseconds())
           .createTime(new Date())
           .payloads(message.getPayloads().toByteArray()).build();
     } else if (message.getType().equals(EventType.TxEndedEvent.name())) {
@@ -168,8 +170,7 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
           .createTime(new Date())
           .localTxId(message.getLocalTxId()).build();
     } else if (message.getType().equals(EventType.TxCompensateAckSucceedEvent.name())) {
-      event = TxCompensateAckEvent.builder()
-          .succeed(true)
+      event = TxCompensateAckSucceedEvent.builder()
           .serviceName(message.getServiceName())
           .instanceId(message.getInstanceId())
           .globalTxId(message.getGlobalTxId())
@@ -179,8 +180,8 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
       omegaCallbacks.get(message.getServiceName()).get(message.getInstanceId())
           .getAck(CompensateAckType.Succeed);
     } else if (message.getType().equals(EventType.TxCompensateAckFailedEvent.name())) {
-      event = TxCompensateAckEvent.builder()
-          .succeed(false)
+      event = TxCompensateAckFailedEvent.builder()
+          .payloads(message.getPayloads().toByteArray())
           .serviceName(message.getServiceName())
           .instanceId(message.getInstanceId())
           .globalTxId(message.getGlobalTxId())
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index 3db7943..7dd2600 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -179,7 +179,7 @@ public class AlphaIntegrationFsmTest {
     assertNotNull(sagaData.getBeginTime());
     assertNotNull(sagaData.getEndTime());
     assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
-    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
     assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.FAILED);
   }
 
@@ -204,8 +204,8 @@ public class AlphaIntegrationFsmTest {
     assertNotNull(sagaData.getBeginTime());
     assertNotNull(sagaData.getEndTime());
     assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
-    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
     assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
     assertArrayEquals(sagaData.getTxEntities().get(localTxId_3).getThrowablePayLoads(),NullPointerException.class.getName().getBytes());
   }
@@ -232,8 +232,8 @@ public class AlphaIntegrationFsmTest {
     assertNotNull(sagaData.getEndTime());
     assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
     assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.FAILED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED_SUCCEED);
   }
 
   @Test
@@ -265,8 +265,8 @@ public class AlphaIntegrationFsmTest {
     assertNotNull(sagaData.getEndTime());
     assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
     assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.FAILED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED_SUCCEED);
   }
 
   @Test
@@ -290,9 +290,9 @@ public class AlphaIntegrationFsmTest {
     assertNotNull(sagaData.getBeginTime());
     assertNotNull(sagaData.getEndTime());
     assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
-    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED_SUCCEED);
   }
 
   @Test
@@ -412,8 +412,8 @@ public class AlphaIntegrationFsmTest {
     assertNotNull(sagaData.getBeginTime());
     assertNotNull(sagaData.getEndTime());
     assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
-    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
     assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
   }
 
@@ -438,8 +438,8 @@ public class AlphaIntegrationFsmTest {
     assertNotNull(sagaData.getBeginTime());
     assertNotNull(sagaData.getEndTime());
     assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
-    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
     assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
   }
 
@@ -478,8 +478,8 @@ public class AlphaIntegrationFsmTest {
     assertNotNull(sagaData.getBeginTime());
     assertNotNull(sagaData.getEndTime());
     assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
-    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
     assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
     assertArrayEquals(sagaData.getTxEntities().get(localTxId_3).getThrowablePayLoads(),NullPointerException.class.getName().getBytes());
   }
@@ -505,8 +505,8 @@ public class AlphaIntegrationFsmTest {
     assertNotNull(sagaData.getBeginTime());
     assertNotNull(sagaData.getEndTime());
     assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
-    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
-    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+    assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
     assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
     assertArrayEquals(sagaData.getTxEntities().get(localTxId_3).getThrowablePayLoads(),NullPointerException.class.getName().getBytes());
   }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
index d628832..20d0b49 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
@@ -111,10 +111,12 @@ public class GrpcSagaClientMessageSender implements SagaMessageSender {
         .setType(event.type().name())
         .setTimeout(event.timeout())
         .setForwardTimeout(event.forwardTimeout())
+        .setReverseTimeout(event.reverseTimeout())
         .setCompensationMethod(event.compensationMethod())
         .setRetryMethod(event.retryMethod() == null ? "" : event.retryMethod())
         .setForwardRetries(event.forwardRetries())
         .setReverseRetries(event.reverseRetries())
+        .setRetryDelayInMilliseconds(event.retryDelayInMilliseconds())
         .setPayloads(payloads);
 
     return builder.build();
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
index 94a7111..6a3bcb1 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
@@ -43,7 +43,7 @@ public class CallbackContext {
     contexts.put(key, new CallbackContextInternal(target, compensationMethod));
   }
 
-  public void apply(String globalTxId, String localTxId, String callbackMethod, Object... payloads) {
+  public void apply(String globalTxId, String localTxId, String parentTxId, String callbackMethod, Object... payloads) {
     CallbackContextInternal contextInternal = contexts.get(callbackMethod);
     String oldGlobalTxId = omegaContext.globalTxId();
     String oldLocalTxId = omegaContext.localTxId();
@@ -54,14 +54,14 @@ public class CallbackContext {
       if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
         sender.send(
             new TxCompensateAckSucceedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
-                omegaContext.globalTxId()));
+                parentTxId, callbackMethod));
       }
       LOG.info("Callback transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
     } catch (IllegalAccessException | InvocationTargetException e) {
       if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
         sender.send(
             new TxCompensateAckFailedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
-                omegaContext.globalTxId()));
+                parentTxId, callbackMethod, e));
       }
       LOG.error(
           "Pre-checking for callback method " + contextInternal.callbackMethod.toString()
@@ -73,6 +73,10 @@ public class CallbackContext {
     }
   }
 
+  public OmegaContext getOmegaContext() {
+    return omegaContext;
+  }
+
   private static final class CallbackContextInternal {
     private final Object target;
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandler.java
index 8d5d0f8..93d2ebd 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandler.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandler.java
@@ -30,7 +30,9 @@ public class CompensationMessageHandler implements MessageHandler {
   @Override
   public void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod,
       Object... payloads) {
-    context.apply(globalTxId, localTxId, compensationMethod, payloads);
-    sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
+    context.apply(globalTxId, localTxId, parentTxId, compensationMethod, payloads);
+    if (!context.getOmegaContext().getAlphaMetas().isAkkaEnabled()) {
+      sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
+    }
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java
index db009c2..0a41369 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java
@@ -20,7 +20,7 @@ package org.apache.servicecomb.pack.omega.transaction;
 import org.apache.servicecomb.pack.common.EventType;
 
 public class TxCompensateAckFailedEvent extends TxEvent {
-  public TxCompensateAckFailedEvent(String globalTxId, String localTxId, String parentTxId) {
-    super(EventType.TxCompensateAckFailedEvent, globalTxId, localTxId, parentTxId, "", 0, "", 0,0 ,0 ,0, 0);
+  public TxCompensateAckFailedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
+    super(EventType.TxCompensateAckFailedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0,0 ,0 ,0, 0, stackTrace(throwable));
   }
 }
\ No newline at end of file
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java
index 32870f7..0f9f972 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java
@@ -20,7 +20,7 @@ package org.apache.servicecomb.pack.omega.transaction;
 import org.apache.servicecomb.pack.common.EventType;
 
 public class TxCompensateAckSucceedEvent extends TxEvent {
-  public TxCompensateAckSucceedEvent(String globalTxId, String localTxId, String parentTxId) {
-    super(EventType.TxCompensateAckSucceedEvent, globalTxId, localTxId, parentTxId, "", 0, "", 0, 0, 0, 0, 0);
+  public TxCompensateAckSucceedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
+    super(EventType.TxCompensateAckSucceedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0, 0, 0, 0, 0);
   }
 }
\ No newline at end of file
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/CoordinateMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/CoordinateMessageHandler.java
index ab188cd..a24a018 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/CoordinateMessageHandler.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/CoordinateMessageHandler.java
@@ -45,7 +45,7 @@ public class CoordinateMessageHandler implements TccMessageHandler {
   public void onReceive(String globalTxId, String localTxId, String parentTxId, String methodName) {
     // TODO need to catch the exception and send the failed message
     // The parameter need to be updated here
-    callbackContext.apply(globalTxId, localTxId, methodName, parametersContext.getParameters(localTxId));
+    callbackContext.apply(globalTxId, localTxId, parentTxId, methodName, parametersContext.getParameters(localTxId));
     tccMessageSender.coordinate(new CoordinatedEvent(globalTxId, localTxId, parentTxId, methodName, TransactionStatus.Succeed));
     // Need to remove the parameter
     parametersContext.removeParameter(localTxId);
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandlerTest.java
index 8b6353e..d46e1b8 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandlerTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandlerTest.java
@@ -18,16 +18,23 @@
 package org.apache.servicecomb.pack.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.servicecomb.pack.common.EventType;
 import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
+import org.apache.servicecomb.pack.omega.context.AlphaMetas;
+import org.apache.servicecomb.pack.omega.context.IdGenerator;
+import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.UniqueIdGenerator;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -73,10 +80,6 @@ public class CompensationMessageHandlerTest {
   private final String compensationMethod = getClass().getCanonicalName();
   private final String payload = uniquify("blah");
 
-  private final CallbackContext context = mock(CallbackContext.class);
-
-  private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
-
   @Before
   public void setUp() {
     events.clear();
@@ -84,10 +87,13 @@ public class CompensationMessageHandlerTest {
 
   @Test
   public void sendsCompensatedEventOnCompensationCompleted() {
+    final CallbackContext context = mock(CallbackContext.class);
+    final CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
+    IdGenerator<String> idGenerator = new UniqueIdGenerator();
+    OmegaContext omegaContext = new OmegaContext(idGenerator,AlphaMetas.builder().akkaEnabled(false).build());
+    when(context.getOmegaContext()).thenReturn(omegaContext);
     handler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, payload);
-
     assertThat(events.size(), is(1));
-
     TxEvent event = events.get(0);
     assertThat(event.globalTxId(), is(globalTxId));
     assertThat(event.localTxId(), is(localTxId));
@@ -95,7 +101,53 @@ public class CompensationMessageHandlerTest {
     assertThat(event.type(), is(EventType.TxCompensatedEvent));
     assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
     assertThat(event.payloads().length, is(0));
+    verify(context).apply(globalTxId, localTxId, parentTxId, compensationMethod, payload);
+  }
+
+  @Test
+  public void sendsCompensateAckSucceedEventOnCompensationCompletedWithFSM() throws NoSuchMethodException {
+    IdGenerator<String> idGenerator = new UniqueIdGenerator();
+    OmegaContext omegaContext = new OmegaContext(idGenerator,AlphaMetas.builder().akkaEnabled(true).build());
+    CallbackContext context = new CallbackContext(omegaContext, sender);
+    Method mockMethod = this.getClass().getMethod("mockCompensationSucceedMethod",String.class);
+    context.addCallbackContext(compensationMethod, mockMethod, this);
+    CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
+    handler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, payload);
+    assertThat(events.size(), is(1));
+    TxEvent event = events.get(0);
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(localTxId));
+    assertThat(event.parentTxId(), is(parentTxId));
+    assertThat(event.type(), is(EventType.TxCompensateAckSucceedEvent));
+    assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
+    assertThat(event.payloads().length, is(0));
+  }
+
+  @Test
+  public void sendsCompensateAckFailedEventOnCompensationFailedWithFSM() throws NoSuchMethodException {
+    IdGenerator<String> idGenerator = new UniqueIdGenerator();
+    OmegaContext omegaContext = new OmegaContext(idGenerator,AlphaMetas.builder().akkaEnabled(true).build());
+    CallbackContext context = new CallbackContext(omegaContext, sender);
+    Method mockMethod = this.getClass().getMethod("mockCompensationFailedMethod",String.class);
+    context.addCallbackContext(compensationMethod, mockMethod, this);
+    CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
+    handler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, payload);
+    assertThat(events.size(), is(1));
+    TxEvent event = events.get(0);
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(localTxId));
+    assertThat(event.parentTxId(), is(parentTxId));
+    assertThat(event.type(), is(EventType.TxCompensateAckFailedEvent));
+    assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
+    assertThat(event.payloads().length, greaterThan(0));
+  }
+
+  public void mockCompensationSucceedMethod(String payloads){
+    // mock compensation method
+  }
 
-    verify(context).apply(globalTxId, localTxId, compensationMethod, payload);
+  public void mockCompensationFailedMethod(String payloads){
+    // mock compensation method
+    throw new RuntimeException("mock compensation failed");
   }
 }