You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/08/17 12:17:48 UTC

[incubator-servicecomb-saga] 01/03: SCB-817 Added TCC events in Omega part (WIP)

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch SCB-665
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit d8c635b572d4357f041b6f8b47a83fb358179100
Author: Willem Jiang <ji...@huawei.com>
AuthorDate: Sun Aug 12 11:16:22 2018 +0800

    SCB-817 Added TCC events in Omega part (WIP)
---
 .../saga/omega/context/annotations/TccStart.java   | 15 +++++
 .../saga/omega/transaction/annotations/TCC.java    | 64 ++++++++++++++++++++++
 .../saga/omega/transaction/tcc/TccAspect.java      | 48 ++++++++++++++++
 .../saga/omega/transaction/tcc/TccInterceptor.java |  4 ++
 .../tcc/TccStartAnnotationProcessor.java           | 47 ++++++++++++++++
 .../saga/omega/transaction/tcc/TccStartAspect.java | 60 ++++++++++++++++++++
 .../omega/transaction/tcc/events/CancelEvent.java  | 20 +++++++
 .../omega/transaction/tcc/events/ConfirmEvent.java | 20 +++++++
 .../transaction/tcc/events/ParticipateEvent.java   | 21 +++++++
 .../tcc/events/TransactionEndEvent.java            |  4 ++
 .../tcc/events/TransationStartEvent.java           |  4 ++
 11 files changed, 307 insertions(+)

diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/TccStart.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/TccStart.java
new file mode 100644
index 0000000..c9154f9
--- /dev/null
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/TccStart.java
@@ -0,0 +1,15 @@
+package org.apache.servicecomb.saga.omega.context.annotations;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+/**
+ * Indicates the annotated method will start a TCC transaction.
+ */
+@Retention(RUNTIME)
+@Target(METHOD)
+public @interface TccStart {
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/TCC.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/TCC.java
new file mode 100644
index 0000000..a095978
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/TCC.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.saga.omega.transaction.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Indicates the annotated method will start a sub-transaction. <br>
+ * A <code>@TCC</code> method should satisfy the requirements below:
+ * <ol>
+ *   <li>all parameters are serializable</li>
+ *   <li>it is idempotent</li>
+ *   <li>the object instance in which the @TCC method resides should be stateless</li>
+ * </ol>
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD})
+public @interface TCC {
+  /**
+   * Confirm method name.<br>
+   * A confirm method should satisfy the requirements below:
+   * <ol>
+   *   <li>has the same parameter list as the @TCC method</li>
+   *   <li>all parameters are serializable</li>
+   *   <li>is idempotent</li>
+   *   <li>is in the same class as the @TCC method</li>
+   * </ol>
+   *
+   * @return the confirm method name; defaults to an empty string
+   */
+  String confirmMethod() default "";
+
+  /**
+   * Cancel method name.<br>
+   * A cancel method should satisfy the requirements below:
+   * <ol>
+   *   <li>has the same parameter list as the @TCC method</li>
+   *   <li>all parameters are serializable</li>
+   *   <li>is idempotent</li>
+   *   <li>is in the same class as the @TCC method</li>
+   * </ol>
+   *
+   * @return the cancel method name; defaults to an empty string
+   */
+  String cancelMethod() default "";
+
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccAspect.java
new file mode 100644
index 0000000..173cb78
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccAspect.java
@@ -0,0 +1,48 @@
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.CompensableInterceptor;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicy;
+import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicyFactory;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Aspect
+public class TccAspect {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OmegaContext context;
+
+  private final CompensableInterceptor interceptor;
+
+  public TccAspect(MessageSender sender, OmegaContext context) {
+    this.context = context;
+    this.interceptor = new CompensableInterceptor(context, sender);
+  }
+  // NOTE(review): this advice still targets @Compensable rather than @TCC; confirm before use.
+  @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
+  Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable {
+    Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+    String localTxId = context.localTxId();
+    context.newLocalTxId();
+    LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
+    // Choose a recovery policy from the retries value; the saved local tx id is restored in finally.
+    int retries = compensable.retries();
+    RecoveryPolicy recoveryPolicy = RecoveryPolicyFactory.getRecoveryPolicy(retries);
+    try {
+      return recoveryPolicy.apply(joinPoint, compensable, interceptor, context, localTxId, retries);
+    } finally {
+      context.setLocalTxId(localTxId);
+      LOG.debug("Restored context back to {}", context);
+    }
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccInterceptor.java
new file mode 100644
index 0000000..5ada269
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccInterceptor.java
@@ -0,0 +1,4 @@
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+public class TccInterceptor { // Empty placeholder; no members are defined yet (commit is marked WIP).
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
new file mode 100644
index 0000000..3e708cd
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
@@ -0,0 +1,47 @@
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+import javax.transaction.TransactionalException;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.EventAwareInterceptor;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.SagaEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.SagaStartedEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
+
+public class TccStartAnnotationProcessor implements EventAwareInterceptor {
+  // NOTE(review): sends Saga events (SagaStartedEvent/SagaEndedEvent/TxAbortedEvent), not TCC-specific ones.
+  private final OmegaContext omegaContext;
+  private final MessageSender sender;
+
+  TccStartAnnotationProcessor(OmegaContext omegaContext, MessageSender sender) {
+    this.omegaContext = omegaContext;
+    this.sender = sender;
+  }
+  // NOTE(review): wraps e.getCause(), so the OmegaException itself is dropped from the cause chain.
+  @Override
+  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
+      int retries, Object... message) {
+    try {
+      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
+    } catch (OmegaException e) {
+      throw new TransactionalException(e.getMessage(), e.getCause());
+    }
+  }
+  // Sends the end event and throws OmegaException when response.aborted() is true.
+  @Override
+  public void postIntercept(String parentTxId, String compensationMethod) {
+    AlphaResponse response = sender.send(new SagaEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+    if (response.aborted()) {
+      throw new OmegaException("transaction " + parentTxId + " is aborted");
+    }
+  }
+  // Reports the failure by sending a TxAbortedEvent that carries the throwable.
+  @Override
+  public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
+    String globalTxId = omegaContext.globalTxId();
+    sender.send(new TxAbortedEvent(globalTxId, omegaContext.localTxId(), null, compensationMethod, throwable));
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
new file mode 100644
index 0000000..5b380db
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
@@ -0,0 +1,60 @@
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Aspect
+public class TccStartAspect {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final TccStartAnnotationProcessor tccStartAnnotationProcessor;
+
+  private final OmegaContext context;
+
+  public TccStartAspect(MessageSender sender, OmegaContext context) {
+    this.context = context;
+    this.tccStartAnnotationProcessor = new TccStartAnnotationProcessor(context, sender);
+  }
+  // NOTE(review): matches @TccStart methods but binds a SagaStart annotation, so both seem required; confirm.
+  @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.TccStart * *(..)) && @annotation(sagaStart)")
+  Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
+    initializeOmegaContext();
+    Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+
+    tccStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout(), "", 0);
+    LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
+
+    try {
+      Object result = joinPoint.proceed();
+
+      tccStartAnnotationProcessor.postIntercept(context.globalTxId(), method.toString());
+      LOG.debug("Transaction with context {} has finished.", context);
+
+      return result;
+    } catch (Throwable throwable) {
+      // An OmegaException is rethrown as-is without sending an error event.
+      if (!(throwable instanceof OmegaException)) {
+        tccStartAnnotationProcessor.onError(context.globalTxId(), method.toString(), throwable);
+        LOG.error("Transaction {} failed.", context.globalTxId());
+      }
+      throw throwable;
+    } finally {
+      context.clear();
+    }
+  }
+  // Sets the local tx id to the value returned by newGlobalTxId().
+  private void initializeOmegaContext() {
+    context.setLocalTxId(context.newGlobalTxId());
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CancelEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CancelEvent.java
new file mode 100644
index 0000000..7659812
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CancelEvent.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.saga.omega.transaction.tcc.events;
+
+public class CancelEvent { // Empty placeholder; no members are defined yet (commit is marked WIP).
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ConfirmEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ConfirmEvent.java
new file mode 100644
index 0000000..f01675b
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ConfirmEvent.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.saga.omega.transaction.tcc.events;
+
+public class ConfirmEvent { // Empty placeholder; no members are defined yet (commit is marked WIP).
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipateEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipateEvent.java
new file mode 100644
index 0000000..83e910b
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipateEvent.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.saga.omega.transaction.tcc.events;
+
+/** Empty placeholder; no members are defined yet (commit is marked WIP). */
+public class ParticipateEvent {
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransactionEndEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransactionEndEvent.java
new file mode 100644
index 0000000..9613602
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransactionEndEvent.java
@@ -0,0 +1,4 @@
+package org.apache.servicecomb.saga.omega.transaction.tcc.events;
+
+public class TransactionEndEvent { // Empty placeholder; no members are defined yet (commit is marked WIP).
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransationStartEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransationStartEvent.java
new file mode 100644
index 0000000..54858c6
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransationStartEvent.java
@@ -0,0 +1,4 @@
+package org.apache.servicecomb.saga.omega.transaction.tcc.events;
+
+public class TransationStartEvent { // NOTE(review): name looks like a typo for "TransactionStartEvent"; empty placeholder.
+}