Posted to commits@servicecomb.apache.org by GitBox <gi...@apache.org> on 2019/12/07 09:35:46 UTC

[GitHub] [servicecomb-pack] coolbeevip commented on a change in pull request #609: [SCB-1627] Omega side CallbackContext send the compensation result event to Alpha

URL: https://github.com/apache/servicecomb-pack/pull/609#discussion_r355111783
 
 

 ##########
 File path: alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
 ##########
 @@ -19,32 +19,71 @@
 
 import com.google.protobuf.ByteString;
 import io.grpc.stub.StreamObserver;
+import java.lang.invoke.MethodHandles;
 import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
 import org.apache.servicecomb.pack.alpha.core.TxEvent;
+import org.apache.servicecomb.pack.alpha.core.exception.CompensateAckFailedException;
+import org.apache.servicecomb.pack.alpha.core.exception.CompensateConnectException;
+import org.apache.servicecomb.pack.alpha.core.fsm.CompensateAckType;
 import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class GrpcOmegaCallback implements OmegaCallback {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final StreamObserver<GrpcCompensateCommand> observer;
+  private CompensateAckCountDownLatch compensateAckCountDownLatch;
 
   GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
     this.observer = observer;
   }
 
   @Override
   public void compensate(TxEvent event) {
-    GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
-        .setGlobalTxId(event.globalTxId())
-        .setLocalTxId(event.localTxId())
-        .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
-        .setCompensationMethod(event.compensationMethod())
-        .setPayloads(ByteString.copyFrom(event.payloads()))
-        .build();
-    observer.onNext(command);
+    compensateAckCountDownLatch = new CompensateAckCountDownLatch(1);
+    try {
+      GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
+          .setGlobalTxId(event.globalTxId())
+          .setLocalTxId(event.localTxId())
+          .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
+          .setCompensationMethod(event.compensationMethod())
+          .setPayloads(ByteString.copyFrom(event.payloads()))
+          .build();
+      observer.onNext(command);
+      compensateAckCountDownLatch.await();
 
 Review comment:
   This is good advice. I think it can be solved in three steps:
   
   step 1: Use a default timeout of 30 seconds in this PR:
   ```
   compensateAckCountDownLatch.await(10, TimeUnit.SECONDS);
   ```
   step 2: Support a user-defined timeout in another PR: https://issues.apache.org/jira/browse/SCB-1646
   
   step 3: I agree with going async; many compensations cause many waiting threads.
   Maybe we need to add a state to the state machine: switch to this state after calling compensation, and continue processing once the compensation ack is received. This increases complexity.
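
To make step 1 concrete, here is a minimal sketch of a bounded wait. It uses the plain JDK `java.util.concurrent.CountDownLatch` as a stand-in for `CompensateAckCountDownLatch`; the class `TimedAckWait` and the helper `awaitAck` are hypothetical names for illustration, not code from this PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimedAckWait {

  // Hypothetical helper: returns true if the ack arrived before the timeout,
  // false if the timeout elapsed or the waiting thread was interrupted.
  static boolean awaitAck(CountDownLatch ackLatch, long timeout, TimeUnit unit) {
    try {
      // await(timeout, unit) returns false when the count has not reached
      // zero within the given time, instead of blocking forever.
      return ackLatch.await(timeout, unit);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe it.
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    CountDownLatch acked = new CountDownLatch(1);
    acked.countDown(); // simulate the ack arriving from Omega
    System.out.println(awaitAck(acked, 100, TimeUnit.MILLISECONDS));

    CountDownLatch neverAcked = new CountDownLatch(1);
    System.out.println(awaitAck(neverAcked, 100, TimeUnit.MILLISECONDS));
  }
}
```

The point is that the boolean result has to be checked: a `false` return is the only signal that the ack did not arrive, so the caller decides whether to raise something like `CompensateAckFailedException` or retry.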
   
   I'm not sure whether this is better. With the compensation ack mode, I think the sub-transactions should be compensated one by one, in reverse order.
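
As a rough illustration of the last two points together, the sketch below compensates sub-transactions one by one in reverse order without parking a thread per compensation. It is only an assumption about how this could look: `ReverseOrderCompensation`, `compensateAll`, and the `sendAndAwaitAck` function (which stands for "send one compensation command and complete the future when its ack arrives") are hypothetical and not part of Alpha.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class ReverseOrderCompensation {

  // Chains compensations so that each one starts only after the previous
  // ack has completed. Returns the ids in the order they were compensated.
  static CompletableFuture<List<String>> compensateAll(
      List<String> localTxIds,
      Function<String, CompletableFuture<Void>> sendAndAwaitAck) {
    // Copy and reverse so the last sub-transaction is compensated first.
    List<String> reversed = new ArrayList<>(localTxIds);
    Collections.reverse(reversed);

    List<String> compensated = Collections.synchronizedList(new ArrayList<String>());
    CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
    for (String txId : reversed) {
      chain = chain
          .thenCompose(ignored -> sendAndAwaitAck.apply(txId))
          .thenRun(() -> compensated.add(txId));
    }
    return chain.thenApply(ignored -> compensated);
  }

  public static void main(String[] args) {
    // Stand-in for the real call: every ack "arrives" immediately.
    List<String> order = compensateAll(
        Arrays.asList("tx-1", "tx-2", "tx-3"),
        txId -> CompletableFuture.<Void>completedFuture(null)).join();
    System.out.println(order); // prints [tx-3, tx-2, tx-1]
  }
}
```

If one ack future completes exceptionally, the remaining compensations in the chain are skipped and the returned future fails, so the caller would still need a policy for retrying or suspending the transaction.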

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services