Posted to commits@servicecomb.apache.org by GitBox <gi...@apache.org> on 2019/12/07 09:35:46 UTC
[GitHub] [servicecomb-pack] coolbeevip commented on a change in pull request
#609: [SCB-1627] Omega side CallbackContext send the compensation result
event to Alpha
URL: https://github.com/apache/servicecomb-pack/pull/609#discussion_r355111783
##########
File path: alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
##########
@@ -19,32 +19,71 @@
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
+import java.lang.invoke.MethodHandles;
import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
import org.apache.servicecomb.pack.alpha.core.TxEvent;
+import org.apache.servicecomb.pack.alpha.core.exception.CompensateAckFailedException;
+import org.apache.servicecomb.pack.alpha.core.exception.CompensateConnectException;
+import org.apache.servicecomb.pack.alpha.core.fsm.CompensateAckType;
import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class GrpcOmegaCallback implements OmegaCallback {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final StreamObserver<GrpcCompensateCommand> observer;
+ private CompensateAckCountDownLatch compensateAckCountDownLatch;
GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
this.observer = observer;
}
@Override
public void compensate(TxEvent event) {
- GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
- .setGlobalTxId(event.globalTxId())
- .setLocalTxId(event.localTxId())
- .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
- .setCompensationMethod(event.compensationMethod())
- .setPayloads(ByteString.copyFrom(event.payloads()))
- .build();
- observer.onNext(command);
+ compensateAckCountDownLatch = new CompensateAckCountDownLatch(1);
+ try {
+ GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
+ .setGlobalTxId(event.globalTxId())
+ .setLocalTxId(event.localTxId())
+ .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
+ .setCompensationMethod(event.compensationMethod())
+ .setPayloads(ByteString.copyFrom(event.payloads()))
+ .build();
+ observer.onNext(command);
+ compensateAckCountDownLatch.await();
Review comment:
This is good advice. I think it can be solved in three steps:
Step 1: Use a default timeout of 30 seconds in this PR
```
compensateAckCountDownLatch.await(10, TimeUnit.SECONDS);
```
Step 2: Support a user-defined timeout in another PR: https://issues.apache.org/jira/browse/SCB-1646
Step 3: I agree with going async; many compensations cause many waiting threads.
Maybe we need to add a state to the state machine, switch to that state after calling compensation, and do the follow-up processing after receiving the compensation ack. That increases complexity.
I'm not sure this is better. With the compensation ack mode, I think the sub-transactions should be compensated one by one in reverse order.
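To make the trade-off between the steps above concrete, here is a minimal sketch, not the PR's actual code. It uses the plain `java.util.concurrent.CountDownLatch` rather than `CompensateAckCountDownLatch` (whose API is not shown here), and the names `CompensateAckSketch`, `AckResult`, `awaitAck` and `ackFuture` are hypothetical. The async variant assumes Java 9+ for `completeOnTimeout`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CompensateAckSketch {

  // Hypothetical result type for illustration; not part of servicecomb-pack.
  enum AckResult { ACKED, TIMED_OUT }

  // Steps 1 and 2: block for at most timeoutMillis instead of waiting forever.
  // The timeout is a parameter so it could later come from user configuration.
  static AckResult awaitAck(CountDownLatch latch, long timeoutMillis)
      throws InterruptedException {
    // await(timeout, unit) returns false if the count did not reach zero in time.
    boolean acked = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    return acked ? AckResult.ACKED : AckResult.TIMED_OUT;
  }

  // Step 3: no thread is parked while waiting. The caller completes ackSignal
  // when the ack arrives; otherwise the result falls back to TIMED_OUT.
  static CompletableFuture<AckResult> ackFuture(
      CompletableFuture<Void> ackSignal, long timeoutMillis) {
    return ackSignal
        .thenApply(ignored -> AckResult.ACKED)
        .completeOnTimeout(AckResult.TIMED_OUT, timeoutMillis, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws Exception {
    CountDownLatch acked = new CountDownLatch(1);
    acked.countDown(); // simulate the ack arriving from Omega
    System.out.println(awaitAck(acked, 100));

    CountDownLatch silent = new CountDownLatch(1); // no ack ever arrives
    System.out.println(awaitAck(silent, 100));

    CompletableFuture<Void> signal = new CompletableFuture<>();
    CompletableFuture<AckResult> result = ackFuture(signal, 1000);
    signal.complete(null); // simulate the ack arriving asynchronously
    System.out.println(result.get());
  }
}
```

In the blocking variant each pending compensation still holds one thread until the ack or the timeout, which is the concern raised in step 3. The future-based variant avoids that, but the caller then has to track the pending result, which is roughly the extra state the state machine would need.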