You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/08/17 12:17:48 UTC
[incubator-servicecomb-saga] 01/03: SCB-817 Added TCC events in
Omega part (WIP)
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch SCB-665
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit d8c635b572d4357f041b6f8b47a83fb358179100
Author: Willem Jiang <ji...@huawei.com>
AuthorDate: Sun Aug 12 11:16:22 2018 +0800
SCB-817 Added TCC events in Omega part (WIP)
---
.../saga/omega/context/annotations/TccStart.java | 15 +++++
.../saga/omega/transaction/annotations/TCC.java | 64 ++++++++++++++++++++++
.../saga/omega/transaction/tcc/TccAspect.java | 48 ++++++++++++++++
.../saga/omega/transaction/tcc/TccInterceptor.java | 4 ++
.../tcc/TccStartAnnotationProcessor.java | 47 ++++++++++++++++
.../saga/omega/transaction/tcc/TccStartAspect.java | 60 ++++++++++++++++++++
.../omega/transaction/tcc/events/CancelEvent.java | 20 +++++++
.../omega/transaction/tcc/events/ConfirmEvent.java | 20 +++++++
.../transaction/tcc/events/ParticipateEvent.java | 21 +++++++
.../tcc/events/TransactionEndEvent.java | 4 ++
.../tcc/events/TransationStartEvent.java | 4 ++
11 files changed, 307 insertions(+)
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/TccStart.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/TccStart.java
new file mode 100644
index 0000000..c9154f9
--- /dev/null
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/TccStart.java
@@ -0,0 +1,15 @@
+package org.apache.servicecomb.saga.omega.context.annotations;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+/**
+ * Indicates the annotated method will start a TCC .
+ */
+@Retention(RUNTIME)
+@Target(METHOD)
+public @interface TccStart {
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/TCC.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/TCC.java
new file mode 100644
index 0000000..a095978
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/TCC.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.saga.omega.transaction.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD})
+/**
+ * Indicates the annotated method will start a sub-transaction. <br>
+ * A <code>@TCC</code> method should satisfy below requirements:
+ * <ol>
+ * <li>all parameters are serialized</li>
+ * <li>is idempotent</li>
+ * <li>the object instance which @TCC method resides in should be stateless</li>
+ * </ol>
+ */
+public @interface TCC {
+ /**
+ * Confirm method name.<br>
+ * A confirm method should satisfy below requirements:
+ * <ol>
+ * <li>has same parameter list as @TCC method's</li>
+ * <li>all parameters are serialized</li>
+ * <li>is idempotent</li>
+ * <li>be in the same class as @TCC method is in</li>
+ * </ol>
+ *
+ * @return
+ */
+ String confirmMethod() default "";
+
+ /**
+ * Cancel method name.<br>
+ * A cancel method should satisfy below requirements:
+ * <ol>
+ * <li>has same parameter list as @TCC method's</li>
+ * <li>all parameters are serialized</li>
+ * <li>is idempotent</li>
+ * <li>be in the same class as @TCC method is in</li>
+ * </ol>
+ *
+ * @return
+ */
+ String cancelMethod() default "";
+
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccAspect.java
new file mode 100644
index 0000000..173cb78
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccAspect.java
@@ -0,0 +1,48 @@
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.CompensableInterceptor;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicy;
+import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicyFactory;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Aspect
+public class TccAspect {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final OmegaContext context;
+
+ private final CompensableInterceptor interceptor;
+
+ public TccAspect(MessageSender sender, OmegaContext context) {
+ this.context = context;
+ this.interceptor = new CompensableInterceptor(context, sender);
+ }
+
+ @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
+ Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable {
+ Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+ String localTxId = context.localTxId();
+ context.newLocalTxId();
+ LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
+
+ int retries = compensable.retries();
+ RecoveryPolicy recoveryPolicy = RecoveryPolicyFactory.getRecoveryPolicy(retries);
+ try {
+ return recoveryPolicy.apply(joinPoint, compensable, interceptor, context, localTxId, retries);
+ } finally {
+ context.setLocalTxId(localTxId);
+ LOG.debug("Restored context back to {}", context);
+ }
+ }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccInterceptor.java
new file mode 100644
index 0000000..5ada269
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccInterceptor.java
@@ -0,0 +1,4 @@
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+public class TccInterceptor {
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
new file mode 100644
index 0000000..3e708cd
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
@@ -0,0 +1,47 @@
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+import javax.transaction.TransactionalException;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.EventAwareInterceptor;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.SagaEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.SagaStartedEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
+
+public class TccStartAnnotationProcessor implements EventAwareInterceptor {
+
+ private final OmegaContext omegaContext;
+ private final MessageSender sender;
+
+ TccStartAnnotationProcessor(OmegaContext omegaContext, MessageSender sender) {
+ this.omegaContext = omegaContext;
+ this.sender = sender;
+ }
+
+ @Override
+ public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
+ int retries, Object... message) {
+ try {
+ return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
+ } catch (OmegaException e) {
+ throw new TransactionalException(e.getMessage(), e.getCause());
+ }
+ }
+
+ @Override
+ public void postIntercept(String parentTxId, String compensationMethod) {
+ AlphaResponse response = sender.send(new SagaEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+ if (response.aborted()) {
+ throw new OmegaException("transaction " + parentTxId + " is aborted");
+ }
+ }
+
+ @Override
+ public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
+ String globalTxId = omegaContext.globalTxId();
+ sender.send(new TxAbortedEvent(globalTxId, omegaContext.localTxId(), null, compensationMethod, throwable));
+ }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
new file mode 100644
index 0000000..5b380db
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
@@ -0,0 +1,60 @@
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Aspect
+public class TccStartAspect {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final TccStartAnnotationProcessor tccStartAnnotationProcessor;
+
+ private final OmegaContext context;
+
+ public TccStartAspect(MessageSender sender, OmegaContext context) {
+ this.context = context;
+ this.tccStartAnnotationProcessor = new TccStartAnnotationProcessor(context, sender);
+ }
+
+ @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.TccStart * *(..)) && @annotation(sagaStart)")
+ Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
+ initializeOmegaContext();
+ Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+
+ tccStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout(), "", 0);
+ LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
+
+ try {
+ Object result = joinPoint.proceed();
+
+ tccStartAnnotationProcessor.postIntercept(context.globalTxId(), method.toString());
+ LOG.debug("Transaction with context {} has finished.", context);
+
+ return result;
+ } catch (Throwable throwable) {
+ // We don't need to handle the OmegaException here
+ if (!(throwable instanceof OmegaException)) {
+ tccStartAnnotationProcessor.onError(context.globalTxId(), method.toString(), throwable);
+ LOG.error("Transaction {} failed.", context.globalTxId());
+ }
+ throw throwable;
+ } finally {
+ context.clear();
+ }
+ }
+
+ private void initializeOmegaContext() {
+ context.setLocalTxId(context.newGlobalTxId());
+ }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CancelEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CancelEvent.java
new file mode 100644
index 0000000..7659812
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CancelEvent.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.saga.omega.transaction.tcc.events;
+
+public class CancelEvent {
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ConfirmEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ConfirmEvent.java
new file mode 100644
index 0000000..f01675b
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ConfirmEvent.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.saga.omega.transaction.tcc.events;
+
+public class ConfirmEvent {
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipateEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipateEvent.java
new file mode 100644
index 0000000..83e910b
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipateEvent.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.saga.omega.transaction.tcc.events;
+
+
+public class ParticipateEvent {
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransactionEndEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransactionEndEvent.java
new file mode 100644
index 0000000..9613602
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransactionEndEvent.java
@@ -0,0 +1,4 @@
+package org.apache.servicecomb.saga.omega.transaction.tcc.events;
+
+public class TransactionEndEvent {
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransationStartEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransationStartEvent.java
new file mode 100644
index 0000000..54858c6
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TransationStartEvent.java
@@ -0,0 +1,4 @@
+package org.apache.servicecomb.saga.omega.transaction.tcc.events;
+
+public class TransationStartEvent {
+}