You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/04/28 09:13:49 UTC
[incubator-servicecomb-saga] 05/07: SCB-224 alpha:support retries use command
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 9d2a0b84166403690685e17f9f3ba176204fdfbe
Author: zhang2014 <co...@gmail.com>
AuthorDate: Tue Jan 23 23:42:35 2018 +0800
SCB-224 alpha:support retries use command
---
.../servicecomb/saga/alpha/core/Command.java | 4 +-
.../saga/alpha/core/CommandRepository.java | 2 +-
.../servicecomb/saga/alpha/core/OmegaCallback.java | 3 +-
.../saga/alpha/core/TxConsistentService.java | 54 ++----------
.../saga/alpha/server/SpringCommandRepository.java | 14 ++++
.../saga/alpha/server/TxEventEnvelope.java | 96 ----------------------
.../alpha/server/TxEventEnvelopeRepository.java | 1 +
.../src/main/resources/schema-postgresql.sql | 2 +
.../saga/alpha/server/AlphaIntegrationTest.java | 49 +++++++++++
alpha/alpha-server/src/test/resources/schema.sql | 2 +
.../saga/omega/context/CompensationContext.java | 15 ++--
.../saga/omega/context/OmegaContext.java | 2 -
.../omega/transaction/CompensableInterceptor.java | 2 -
13 files changed, 83 insertions(+), 163 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 0f016d3..4edb928 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -115,11 +115,11 @@ public class Command {
return localTxId;
}
- String parentTxId() {
+ public String parentTxId() {
return parentTxId;
}
- String compensationMethod() {
+ public String compensationMethod() {
return compensationMethod;
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 2bbea77..25288ed 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -21,7 +21,7 @@ import java.util.List;
public interface CommandRepository {
- void saveCompensationCommands(String globalTxId);
+ void saveCompensationCommands(TxEvent abortEvent);
void markCommandAsDone(String globalTxId, String localTxId);
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
index c7dfbbb..f60a44d 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -20,5 +20,6 @@ package org.apache.servicecomb.saga.alpha.core;
public interface OmegaCallback {
void compensate(TxEvent event);
- default void disconnect() {}
+ default void disconnect() {
+ }
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index b519c1b..968e5b7 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,17 +17,17 @@
package org.apache.servicecomb.saga.alpha.core;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
-import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TxConsistentService {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -61,50 +61,6 @@ public class TxConsistentService {
return true;
}
-// private void compensate(TxEvent event) {
-// List<TxEvent> events = eventRepository.findTransactionsToCompensate(event.globalTxId());
-//
-// Optional<TxEvent> startedEvent = events.stream().filter(e -> e.containChildren(event)).findFirst();
-//
-// startedEvent.ifPresent(compensateEvent -> {
-// Integer[] retiesAndTimes = eventsToRetries.compute(event.parentTxId(), (k, v) ->
-// v == null ? new Integer[] {compensateEvent.retries(), 0} : new Integer[] {v[0], v[1] + 1});
-// List<TxEvent> compensationEvents =
-// retiesAndTimes[0] >= retiesAndTimes[1] ? events : Collections.singletonList(
-// new TxEvent(
-// event.serviceName(), event.instanceId(), event.creationTime(), event.globalTxId(),
-// event.localTxId(), event.parentTxId(), event.type(), event.retriesMethod(), event.payloads()
-// ));
-//
-// compensateImpl(event.globalTxId(), compensationEvents);
-// });
-// }
-//
-// private void compensateImpl(String globalTxId, List<TxEvent> events) {
-// events.removeIf(this::isCompensationScheduled);
-//
-// Set<String> localTxIds = eventsToCompensate.computeIfAbsent(globalTxId, k -> new HashSet<>());
-// events.forEach(e -> localTxIds.add(e.localTxId()));
-//
-// events.forEach(omegaCallback::compensate);
-// }
-
- // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
- // unless we ask user to specify a name for each participant in the global TX in @Compensable
-// private void updateCompensateStatus(TxEvent event) {
-// Set<String> events = eventsToCompensate.get(event.globalTxId());
-// if (events != null) {
-// events.remove(event.localTxId());
-// if (events.isEmpty()) {
-// eventsToCompensate.remove(event.globalTxId());
-// Integer[] retiesAndTimes = eventsToRetries.get(event.parentTxId());
-// if (retiesAndTimes == null || retiesAndTimes[0] >= retiesAndTimes[1]) {
-// markGlobalTxEnd(event);
-// eventsToRetries.remove(event.parentTxId());
-// }
-// }
-// }
-// }
private boolean isGlobalTxAborted(TxEvent event) {
return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 086f88e..ed7b4f1 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -91,4 +91,18 @@ public class SpringCommandRepository implements CommandRepository {
return commands;
}
+
+ private long retriedTimes(String globalTxId, String retriesMethod, String localTxId) {
+ return commandRepository.findByGlobalTxIdAndStatus(globalTxId, DONE.name()).stream()
+ .filter(c -> Objects.equals(c.compensationMethod(), retriesMethod)
+ && Objects.equals(c.localTxId(), localTxId)).count();
+ }
+
+ private List<TxEvent> createRetriesTxEvent(long abortEventId, TxEvent txEvent) {
+ return Collections.singletonList(new TxEvent(
+ abortEventId, txEvent.serviceName(), txEvent.instanceId(), txEvent.creationTime(),
+ txEvent.globalTxId(), txEvent.localTxId(), txEvent.parentTxId(),
+ txEvent.type(), txEvent.retriesMethod(), txEvent.payloads()
+ ));
+ }
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelope.java
deleted file mode 100644
index 7d93462..0000000
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelope.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.alpha.server;
-
-import java.util.Date;
-
-import javax.persistence.Embedded;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.Id;
-
-import org.apache.servicecomb.saga.alpha.core.TxEvent;
-
-@Entity class TxEventEnvelope {
- @Id
- @GeneratedValue
- private long surrogateId;
-
- @Embedded
- private TxEvent event;
-
- private TxEventEnvelope() {
- }
-
- TxEventEnvelope(TxEvent event) {
- this.event = event;
- }
-
- public TxEventEnvelope(
- String serviceName, String instanceId, String globalTxId,
- String localTxId, String parentTxId, String type, String compensationMethod,
- String retriesMethod, int retries, byte[] payloads) {
- this.event = new TxEvent(
- serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type,
- compensationMethod, retriesMethod, retries, payloads);
- }
-
- String serviceName() {
- return event.serviceName();
- }
-
- String instanceId() {
- return event.instanceId();
- }
-
- public long creationTime() {
- return event.creationTime().getTime();
- }
-
- String globalTxId() {
- return event.globalTxId();
- }
-
- String localTxId() {
- return event.localTxId();
- }
-
- String parentTxId() {
- return event.parentTxId();
- }
-
- String type() {
- return event.type();
- }
-
- String compensationMethod() {
- return event.compensationMethod();
- }
-
- byte[] payloads() {
- return event.payloads();
- }
-
- int retries() {
- return event.retries();
- }
-
- TxEvent event() {
- return event;
- }
-}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 470caa5..3a7edb3 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -73,6 +73,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
+ " FROM TxEvent t2 "
+ " WHERE t2.globalTxId = ?1 "
+ " AND t2.localTxId = t.localTxId "
+ + " AND t2.compensationMethod != t.retriesMethod "
+ " AND t2.type = 'TxCompensatedEvent') "
+ "ORDER BY t.surrogateId ASC")
List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index 41815ee..674e051 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -26,6 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
type varchar(50) NOT NULL,
compensationMethod varchar(256) NOT NULL,
expiryTime timestamp(6) NOT NULL,
+ retriesMethod varchar(256) NOT NULL,
+ retries int NOT NULL,
payloads bytea
);
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 8b2672c..26aa17a 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -438,6 +438,55 @@ public class AlphaIntegrationTest {
return false;
}
+ @Test
+ public void retiesAndCompensateOnFailure() throws Exception {
+ asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+
+ String localTxId1 = UUID.randomUUID().toString();
+
+ blockingStub.onTxEvent(GrpcTxEvent.newBuilder()
+ .setServiceName(serviceName)
+ .setInstanceId(instanceId)
+ .setTimestamp(System.currentTimeMillis())
+ .setGlobalTxId(globalTxId)
+ .setLocalTxId(localTxId1)
+ .setParentTxId(parentTxId)
+ .setType(TxStartedEvent.name())
+ .setCompensationMethod("Compensation Method")
+ .setPayloads(ByteString.copyFrom(payload.getBytes()))
+ .setRetries(3).setRetriesMethod("Retries Method")
+ .build());
+ blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, localTxId1));
+
+ await().atMost(3, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
+
+ for (int i = 0; i < 3; i++) {
+ blockingStub.onTxEvent(
+ eventOf(TxAbortedEvent, localTxId, localTxId1, payload.getBytes(), getClass().getCanonicalName()));
+
+ await().atMost(3, SECONDS).until(() -> receivedCommands.size() == 1);
+
+ GrpcCompensateCommand command = receivedCommands.poll();
+ assertThat(command.getGlobalTxId(), is(globalTxId));
+ assertThat(command.getLocalTxId(), is(localTxId1));
+ assertThat(command.getParentTxId(), is(parentTxId));
+ assertThat(command.getCompensateMethod(), is("Retries Method"));
+ assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
+ }
+
+ blockingStub.onTxEvent(
+ eventOf(TxAbortedEvent, localTxId, localTxId1, payload.getBytes(), getClass().getCanonicalName()));
+
+ await().atMost(3, SECONDS).until(() -> receivedCommands.size() == 1);
+
+ GrpcCompensateCommand command = receivedCommands.poll();
+ assertThat(command.getGlobalTxId(), is(globalTxId));
+ assertThat(command.getLocalTxId(), is(localTxId1));
+ assertThat(command.getParentTxId(), is(parentTxId));
+ assertThat(command.getCompensateMethod(), is("Compensation Method"));
+ assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
+ }
+
private GrpcAck onCompensation(GrpcCompensateCommand command) {
return blockingStub.onTxEvent(
eventOf(TxCompensatedEvent,
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index a10a4e0..ca46625 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -26,6 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
type varchar(50) NOT NULL,
compensationMethod varchar(256) NOT NULL,
expiryTime TIMESTAMP NOT NULL,
+ retriesMethod varchar(256) NOT NULL,
+ retries int NOT NULL,
payloads varbinary(10240)
);
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
index 8d9eb7e..48a67f7 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
@@ -29,19 +29,15 @@ import org.slf4j.LoggerFactory;
public class CompensationContext {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final Map<String, TransactionContextInternal> contexts = new ConcurrentHashMap<>();
-
- public CompensationContext() {
- }
+ private final Map<String, CompensationContextInternal> contexts = new ConcurrentHashMap<>();
public void addCompensationContext(Method compensationMethod, Object target) {
compensationMethod.setAccessible(true);
- contexts.put(compensationMethod.toString(),
- new TransactionContextInternal(target, compensationMethod));
+ contexts.put(compensationMethod.toString(), new CompensationContextInternal(target, compensationMethod));
}
public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
- TransactionContextInternal contextInternal = contexts.get(compensationMethod);
+ CompensationContextInternal contextInternal = contexts.get(compensationMethod);
try {
contextInternal.compensationMethod.invoke(contextInternal.target, payloads);
@@ -54,12 +50,11 @@ public class CompensationContext {
}
}
- private static final class TransactionContextInternal {
+ private static final class CompensationContextInternal {
private final Object target;
-
private final Method compensationMethod;
- private TransactionContextInternal(Object target, Method compensationMethod) {
+ private CompensationContextInternal(Object target, Method compensationMethod) {
this.target = target;
this.compensationMethod = compensationMethod;
}
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
index ac9f02c..daa8e7c 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
@@ -17,8 +17,6 @@
package org.apache.servicecomb.saga.omega.context;
-import java.util.Map;
-
public class OmegaContext {
public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id";
public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id";
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index d3e2bd1..988d8d7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -21,9 +21,7 @@ import org.apache.servicecomb.saga.omega.context.OmegaContext;
import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
class CompensableInterceptor implements EventAwareInterceptor {
-
private final OmegaContext context;
-
private final MessageSender sender;
CompensableInterceptor(OmegaContext context, MessageSender sender) {
--
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.