You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/08/20 01:30:50 UTC

[incubator-servicecomb-saga] branch SCB-853 created (now 249e005)

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a change to branch SCB-853
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


      at 249e005  SCB-853 Saga-core noew supports JDK7

This branch includes the following new commits:

     new 249e005  SCB-853 Saga-core noew supports JDK7

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-servicecomb-saga] 01/01: SCB-853 Saga-core noew supports JDK7

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch SCB-853
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 249e00574a2f10ec7b570a94d9432fcf9959be37
Author: Willem Jiang <ji...@huawei.com>
AuthorDate: Mon Aug 20 09:30:14 2018 +0800

    SCB-853 Saga-core noew supports JDK7
---
 saga-core/pom.xml                                  | 10 ++++
 .../servicecomb/saga/core/BackwardRecovery.java    |  5 ++
 .../apache/servicecomb/saga/core/Compensation.java | 32 +++++++++--
 .../saga/core/CompositeSagaResponse.java           | 31 ++++++++---
 .../apache/servicecomb/saga/core/Descriptive.java  |  4 +-
 .../org/apache/servicecomb/saga/core/Fallback.java | 17 +++++-
 .../servicecomb/saga/core/ForwardRecovery.java     |  5 ++
 .../servicecomb/saga/core/GraphBasedSaga.java      | 15 +++---
 .../saga/core/LoggingRecoveryPolicy.java           |  5 ++
 .../servicecomb/saga/core/NoOpSagaRequest.java     |  6 +++
 .../apache/servicecomb/saga/core/Operation.java    |  8 +--
 .../servicecomb/saga/core/RestOperation.java       | 14 +++--
 .../apache/servicecomb/saga/core/SagaContext.java  |  4 +-
 .../servicecomb/saga/core/SagaContextImpl.java     | 35 +++++++-----
 .../apache/servicecomb/saga/core/SagaEvent.java    |  5 ++
 .../apache/servicecomb/saga/core/SagaRequest.java  |  6 +--
 .../servicecomb/saga/core/SagaTaskFactory.java     | 10 ++--
 .../apache/servicecomb/saga/core/Transaction.java  | 18 +++++++
 .../servicecomb/saga/core/TransactionConsumer.java |  5 ++
 .../saga/core/dag/ByLevelTraveller.java            |  5 +-
 .../saga/core/dag/GraphBasedSagaFactory.java       |  3 +-
 .../servicecomb/saga/core/dag/GraphBuilder.java    |  9 ++--
 .../saga/core/dag/GraphCycleDetectorImpl.java      | 26 +++++----
 .../org/apache/servicecomb/saga/core/dag/Node.java |  4 +-
 .../servicecomb/saga/core/CompensationImpl.java    |  5 ++
 .../saga/core/CompositeSagaResponseTest.java       |  8 +++
 .../servicecomb/saga/core/ForwardRecoveryTest.java |  7 ++-
 .../servicecomb/saga/core/RestOperationTest.java   | 18 ++++---
 .../servicecomb/saga/core/RetrySagaLogTest.java    | 19 +++++--
 .../saga/core/SagaExecutionComponentTestBase.java  | 44 +++++++++++----
 .../servicecomb/saga/core/SagaIntegrationTest.java | 63 ++++++++++++++--------
 .../dag/DirectedAcyclicGraphTraversalTest.java     |  4 +-
 .../saga/core/dag/GraphBuilderTest.java            | 40 ++++++++------
 .../saga/core/dag/GraphCycleDetectorTest.java      |  3 +-
 .../servicecomb/saga/format/JacksonFallback.java   | 11 ++++
 35 files changed, 366 insertions(+), 138 deletions(-)

diff --git a/saga-core/pom.xml b/saga-core/pom.xml
index fd1b47c..e460b1a 100644
--- a/saga-core/pom.xml
+++ b/saga-core/pom.xml
@@ -95,6 +95,16 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.1</version>
+        <configuration>
+          <encoding>UTF-8</encoding>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java
index e46de2f..799dbbf 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/BackwardRecovery.java
@@ -46,4 +46,9 @@ public class BackwardRecovery implements RecoveryPolicy {
       throw e;
     }
   }
+
+  @Override
+  public String description() {
+    return getClass().getSimpleName();
+  }
 }
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java
index f642011..a6a352b 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Compensation.java
@@ -20,14 +20,40 @@ package org.apache.servicecomb.saga.core;
 public interface Compensation extends Operation {
 
   Compensation SAGA_START_COMPENSATION = new Compensation() {
+    @Override
+    public SagaResponse send(String address) {
+      return SUCCESSFUL_SAGA_RESPONSE;
+    }
+
+    @Override
+    public SagaResponse send(String address, SagaResponse response) {
+      return send(address);
+    }
+
+    @Override
+    public int retries() {
+      return DEFAULT_RETRIES;
+    }
   };
 
   Compensation SAGA_END_COMPENSATION = new Compensation() {
+    @Override
+    public SagaResponse send(String address) {
+      return SUCCESSFUL_SAGA_RESPONSE;
+    }
+
+    @Override
+    public SagaResponse send(String address, SagaResponse response) {
+      return send(address);
+    }
+
+    @Override
+    public int retries() {
+      return DEFAULT_RETRIES;
+    }
   };
 
   int DEFAULT_RETRIES = 3;
 
-  default int retries() {
-    return DEFAULT_RETRIES;
-  }
+  int retries();
 }
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java
index 02ab48c..84753b9 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/CompositeSagaResponse.java
@@ -18,7 +18,6 @@
 package org.apache.servicecomb.saga.core;
 
 import java.util.Collection;
-import java.util.Optional;
 
 public class CompositeSagaResponse implements SagaResponse {
   private final Collection<SagaResponse> responses;
@@ -29,19 +28,35 @@ public class CompositeSagaResponse implements SagaResponse {
 
   @Override
   public boolean succeeded() {
-    return responses.stream().allMatch(SagaResponse::succeeded);
+    if (responses.size() > 0) {
+      boolean result = true;
+      for (SagaResponse response : responses) {
+        result = result && response.succeeded();
+      }
+      return result;
+    } else {
+      return false;
+    }
   }
 
   @Override
   public String body() {
-    Optional<String> reduce = responses.stream()
-        .map(SagaResponse::body)
-        .reduce((a, b) -> a + ", " + b)
-        .map(combined -> "[" + combined + "]");
-
-    return reduce.orElse("{}");
+    StringBuffer result = new StringBuffer();
+    if (responses.size() == 0) {
+      result.append("{}");
+    } else {
+      result.append("[");
+      for (SagaResponse response : responses) {
+        result.append(response.body());
+        result.append(", ");
+      }
+      result.delete(result.length()-2, result.length());
+      result.append("]");
+    }
+    return result.toString();
   }
 
+
   public Collection<SagaResponse> responses() {
     return responses;
   }
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java
index bd367af..e8b4f4e 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Descriptive.java
@@ -19,7 +19,5 @@ package org.apache.servicecomb.saga.core;
 
 interface Descriptive {
 
-  default String description() {
-    return getClass().getSimpleName();
-  }
+  String description();
 }
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java
index fca68a0..8c1068b 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Fallback.java
@@ -19,7 +19,22 @@ package org.apache.servicecomb.saga.core;
 
 public interface Fallback extends Operation {
 
-  Fallback NOP_FALLBACK = () -> TYPE_NOP;
+  Fallback NOP_FALLBACK = new Fallback() {
+    @Override
+    public String type() {
+      return TYPE_NOP;
+    }
+
+    @Override
+    public SagaResponse send(String address) {
+      return SUCCESSFUL_SAGA_RESPONSE;
+    }
+
+    @Override
+    public SagaResponse send(String address, SagaResponse response) {
+      return send(address);
+    }
+  };
 
   String type();
 }
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java
index c783990..75e16ee 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/ForwardRecovery.java
@@ -50,4 +50,9 @@ public class ForwardRecovery implements RecoveryPolicy {
       throw new TransactionFailedException(ignored);
     }
   }
+
+  @Override
+  public String description() {
+    return getClass().getSimpleName();
+  }
 }
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java
index f4544e3..69347fc 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/GraphBasedSaga.java
@@ -65,15 +65,15 @@ public class GraphBasedSaga implements Saga {
     this.tasks = tasks;
 
     this.transactionTaskRunner = new TaskRunner(
-        traveller(sagaTaskGraph, new FromRootTraversalDirection<>()),
+        traveller(sagaTaskGraph, new FromRootTraversalDirection<SagaRequest>()),
         new TransactionTaskConsumer(
             tasks,
             sagaContext,
-            new ExecutorCompletionService<>(executor)));
+            new ExecutorCompletionService<Operation>(executor)));
 
     this.sagaContext = sagaContext;
     this.compensationTaskRunner = new TaskRunner(
-        traveller(sagaTaskGraph, new FromLeafTraversalDirection<>()),
+        traveller(sagaTaskGraph, new FromLeafTraversalDirection<SagaRequest>()),
         new CompensationTaskConsumer(tasks, sagaContext));
 
     currentTaskRunner = transactionTaskRunner;
@@ -92,9 +92,12 @@ public class GraphBasedSaga implements Saga {
         log.error("Failed to run operation", e);
         currentTaskRunner = compensationTaskRunner;
 
-        sagaContext.handleHangingTransactions(request -> {
-          tasks.get(request.task()).commit(request, sagaContext.responseOf(request.parents()));
-          tasks.get(request.task()).compensate(request);
+        sagaContext.handleHangingTransactions(new TransactionConsumer<SagaRequest>() {
+          @Override
+          public void accept(SagaRequest request) {
+            tasks.get(request.task()).commit(request, sagaContext.responseOf(request.parents()));
+            tasks.get(request.task()).compensate(request);
+          }
         });
       }
     } while (currentTaskRunner.hasNext());
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java
index 9173eca..593aa91 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/LoggingRecoveryPolicy.java
@@ -43,4 +43,9 @@ public class LoggingRecoveryPolicy implements RecoveryPolicy {
     log.info("Completed request id={} for service {}", request.id(), request.serviceName());
     return response;
   }
+
+  @Override
+  public String description() {
+    return getClass().getSimpleName();
+  }
 }
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java
index 56ad05e..93bbda9 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/NoOpSagaRequest.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.core;
 
 import static org.apache.servicecomb.saga.core.Compensation.SAGA_END_COMPENSATION;
 import static org.apache.servicecomb.saga.core.Compensation.SAGA_START_COMPENSATION;
+import static org.apache.servicecomb.saga.core.Fallback.NOP_FALLBACK;
 import static org.apache.servicecomb.saga.core.Operation.TYPE_NOP;
 import static org.apache.servicecomb.saga.core.SagaTask.SAGA_END_TASK;
 import static org.apache.servicecomb.saga.core.SagaTask.SAGA_START_TASK;
@@ -65,6 +66,11 @@ public class NoOpSagaRequest implements SagaRequest {
   }
 
   @Override
+  public Fallback fallback() {
+    return NOP_FALLBACK;
+  }
+
+  @Override
   public String serviceName() {
     return "Saga";
   }
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java
index 13b6f8a..99e760b 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Operation.java
@@ -23,11 +23,7 @@ public interface Operation {
   String TYPE_REST = "rest";
   SagaResponse SUCCESSFUL_SAGA_RESPONSE = new SuccessfulSagaResponse("success");
 
-  default SagaResponse send(String address) {
-    return SUCCESSFUL_SAGA_RESPONSE;
-  }
+  SagaResponse send(String address);
 
-  default SagaResponse send(String address, SagaResponse response) {
-    return send(address);
-  }
+  SagaResponse send(String address, SagaResponse response);
 }
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java
index 5cce22d..153eb98 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/RestOperation.java
@@ -17,8 +17,6 @@
 
 package org.apache.servicecomb.saga.core;
 
-import static java.util.Collections.emptyMap;
-
 import org.apache.servicecomb.saga.core.application.interpreter.RestRequestChecker;
 import java.util.Map;
 
@@ -33,7 +31,7 @@ public class RestOperation implements Operation {
 
     this.path = path;
     this.method = method;
-    this.params = params == null? emptyMap() : params;
+    this.params = params == null? java.util.Collections.<String, Map<String, String>>emptyMap() : params;
   }
 
   public String path() {
@@ -56,4 +54,14 @@ public class RestOperation implements Operation {
         ", params=" + params +
         '}';
   }
+
+  @Override
+  public SagaResponse send(String address) {
+    return SUCCESSFUL_SAGA_RESPONSE;
+  }
+
+  @Override
+  public SagaResponse send(String address, SagaResponse response) {
+    return send(address);
+  }
 }
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java
index 309be44..2f76031 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContext.java
@@ -17,8 +17,6 @@
 
 package org.apache.servicecomb.saga.core;
 
-import java.util.function.Consumer;
-
 public interface SagaContext extends EventContext {
   boolean isCompensationStarted();
 
@@ -26,7 +24,7 @@ public interface SagaContext extends EventContext {
 
   boolean isCompensationCompleted(SagaRequest request);
 
-  void handleHangingTransactions(Consumer<SagaRequest> consumer);
+  void handleHangingTransactions(TransactionConsumer<SagaRequest> consumer);
 
   SagaResponse responseOf(String requestId);
 
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java
index 610ae27..bc4253c 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaContextImpl.java
@@ -17,16 +17,16 @@
 
 package org.apache.servicecomb.saga.core;
 
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
+
 
 import org.apache.servicecomb.saga.core.application.interpreter.FromJsonFormat;
 
@@ -86,7 +86,7 @@ public class SagaContextImpl implements SagaContext {
   }
 
   @Override
-  public void handleHangingTransactions(Consumer<SagaRequest> consumer)  {
+  public void handleHangingTransactions(TransactionConsumer<SagaRequest> consumer)  {
     for (Iterator<SagaRequest> iterator = hangingTransactions.values().iterator(); iterator.hasNext(); ) {
       consumer.accept(iterator.next());
     }
@@ -94,13 +94,19 @@ public class SagaContextImpl implements SagaContext {
 
   @Override
   public SagaResponse responseOf(String requestId) {
-    return completedTransactions.getOrDefault(requestId, SagaResponse.NONE_RESPONSE);
+    SagaResponse response = completedTransactions.get(requestId);
+    if (response == null) {
+      response = SagaResponse.NONE_RESPONSE;
+    }
+    return response;
   }
 
   private List<SagaResponse> responsesOf(String[] parentRequestIds) {
-    return Arrays.stream(parentRequestIds)
-        .map(this::responseOf)
-        .collect(Collectors.toList());
+    List<SagaResponse> result = new ArrayList<>();
+    for(String parentRequestId: parentRequestIds) {
+      result.add(responseOf(parentRequestId));
+    }
+    return result;
   }
 
   @Override
@@ -125,9 +131,14 @@ public class SagaContextImpl implements SagaContext {
   }
 
   private Set<String> chosenChildrenOf(String[] parentRequestIds) {
-    return Arrays.stream(parentRequestIds)
-        .map(this::responseOf)
-        .flatMap(sagaResponse -> childrenExtractor.fromJson(sagaResponse.body()).stream())
-        .collect(Collectors.toSet());
+    Set<String> result = new HashSet<>();
+    for(String parentRequestId: parentRequestIds) {
+      SagaResponse response = responseOf(parentRequestId);
+      Set<String> jsons = childrenExtractor.fromJson(response.body());
+      for (String json : jsons) {
+        result.add(json);
+      }
+    }
+    return result;
   }
 }
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java
index ceb0ebe..ffe31d0 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaEvent.java
@@ -36,4 +36,9 @@ public abstract class SagaEvent implements Descriptive {
   public String json(ToJsonFormat toJsonFormat) {
     return "{}";
   }
+
+  @Override
+  public String description() {
+    return getClass().getSimpleName();
+  }
 }
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java
index 337abdd..4578e42 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaRequest.java
@@ -17,8 +17,6 @@
 
 package org.apache.servicecomb.saga.core;
 
-import static org.apache.servicecomb.saga.core.Fallback.NOP_FALLBACK;
-
 public interface SagaRequest {
 
   String PARAM_FORM = "form";
@@ -30,9 +28,7 @@ public interface SagaRequest {
 
   Compensation compensation();
 
-  default Fallback fallback() {
-    return NOP_FALLBACK;
-  }
+  Fallback fallback();
 
   String serviceName();
 
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java
index 0fd2450..b6d9585 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/SagaTaskFactory.java
@@ -36,12 +36,12 @@ public class SagaTaskFactory {
     retrySagaLog = new RetrySagaLog(persistentStore, retryDelay);
   }
 
-  public Map<String, SagaTask> sagaTasks(String sagaId,
-      String requestJson,
-      RecoveryPolicy recoveryPolicy,
-      EventStore sagaLog) {
+  public Map<String, SagaTask> sagaTasks(final String sagaId,
+      final String requestJson,
+      final RecoveryPolicy recoveryPolicy,
+      final EventStore sagaLog) {
 
-    SagaLog compositeSagaLog = compositeSagaLog(sagaLog, persistentStore);
+    final SagaLog compositeSagaLog = compositeSagaLog(sagaLog, persistentStore);
 
     return new HashMap<String, SagaTask>() {{
       put(SagaTask.SAGA_START_TASK, new SagaStartTask(sagaId, requestJson, compositeSagaLog));
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java
index fafb845..389f8fa 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/Transaction.java
@@ -20,8 +20,26 @@ package org.apache.servicecomb.saga.core;
 public interface Transaction extends Operation {
 
   Transaction SAGA_START_TRANSACTION = new Transaction() {
+    @Override
+    public SagaResponse send(String address) {
+      return SUCCESSFUL_SAGA_RESPONSE;
+    }
+
+    @Override
+    public SagaResponse send(String address, SagaResponse response) {
+      return send(address);
+    }
   };
 
   Transaction SAGA_END_TRANSACTION = new Transaction() {
+    @Override
+    public SagaResponse send(String address) {
+      return SUCCESSFUL_SAGA_RESPONSE;
+    }
+
+    @Override
+    public SagaResponse send(String address, SagaResponse response) {
+      return send(address);
+    }
   };
 }
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/TransactionConsumer.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/TransactionConsumer.java
new file mode 100644
index 0000000..b5bb098
--- /dev/null
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/TransactionConsumer.java
@@ -0,0 +1,5 @@
+package org.apache.servicecomb.saga.core;
+
+public interface TransactionConsumer<T> {
+  void accept(T request);
+}
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java
index 22bc874..2489117 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/ByLevelTraveller.java
@@ -59,7 +59,10 @@ public class ByLevelTraveller<T> implements Traveller<T> {
       nodes.add(node);
 
       for (Node<T> child : traversalDirection.children(node)) {
-        nodeParents.computeIfAbsent(child.id(), id -> new HashSet<>(traversalDirection.parents(child)));
+        // This is not thread safe
+        if (nodeParents.get(child.id()) == null) {
+          nodeParents.put(child.id(), new HashSet<>(traversalDirection.parents(child)));
+        }
         nodeParents.get(child.id()).remove(node);
 
         if (nodeParents.get(child.id()).isEmpty()) {
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java
index 2048383..55c71c1 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBasedSagaFactory.java
@@ -27,6 +27,7 @@ import org.apache.servicecomb.saga.core.GraphBasedSaga;
 import org.apache.servicecomb.saga.core.Saga;
 import org.apache.servicecomb.saga.core.SagaContext;
 import org.apache.servicecomb.saga.core.SagaContextImpl;
+import org.apache.servicecomb.saga.core.SagaRequest;
 import org.apache.servicecomb.saga.core.application.SagaFactory;
 import org.apache.servicecomb.saga.infrastructure.ContextAwareEventStore;
 import org.apache.servicecomb.saga.core.PersistentStore;
@@ -49,7 +50,7 @@ public class GraphBasedSagaFactory implements SagaFactory {
     this.childrenExtractor = childrenExtractor;
     this.executorService = executorService;
     this.sagaTaskFactory = new SagaTaskFactory(retryDelay, persistentStore);
-    this.graphBuilder = new GraphBuilder(new GraphCycleDetectorImpl<>());
+    this.graphBuilder = new GraphBuilder(new GraphCycleDetectorImpl<SagaRequest>());
   }
 
   @Override
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java
index 578cbb9..c5cd850 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphBuilder.java
@@ -63,10 +63,11 @@ public class GraphBuilder {
       }
     }
 
-    requestNodes.values().stream()
-        .filter((node) -> node.children().isEmpty())
-        .forEach(node -> node.addChild(leaf));
-
+    for(Node<SagaRequest> node : requestNodes.values()) {
+      if (node.children().isEmpty()) {
+        node.addChild(leaf);
+      }
+    }
     return new SingleLeafDirectedAcyclicGraph<>(root, leaf);
   }
 
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java
index 6011a98..f2869b3 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java
@@ -45,26 +45,32 @@ public class GraphCycleDetectorImpl<T> implements GraphCycleDetector<T> {
     return unvisitedNodes(nodeParents);
   }
 
+  // This method is not thread safe
   private void traverse(Queue<Node<T>> orphanNodes, Map<Node<T>, Set<Node<T>>> nodeParents) {
     while (!orphanNodes.isEmpty()) {
       Node<T> node = orphanNodes.poll();
 
-      node.children().forEach(child -> {
-        nodeParents.computeIfAbsent(child, n -> new HashSet<>(child.parents()))
-            .remove(node);
-
+      for(Node<T> child : node.children()) {
+        Set<Node<T>> parent = nodeParents.get(child);
+        if (parent == null) {
+          parent = new HashSet<>(child.parents());
+          nodeParents.put(child, parent);
+        }
+        parent.remove(node);
         if (nodeParents.get(child).isEmpty()) {
           orphanNodes.add(child);
         }
-      });
+      }
     }
   }
 
   private Set<Node<T>> unvisitedNodes(Map<Node<T>, Set<Node<T>>> nodeParents) {
-    return nodeParents.entrySet()
-        .parallelStream()
-        .filter(parents -> !parents.getValue().isEmpty())
-        .map(Entry::getKey)
-        .collect(Collectors.toSet());
+    Set<Node<T>> result = new HashSet<>();
+    for (Map.Entry<Node<T>, Set<Node<T>>> entry : nodeParents.entrySet()) {
+      if (!entry.getValue().isEmpty()) {
+        result.add(entry.getKey());
+      }
+    }
+    return result;
   }
 }
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java
index 0b3be0a..3e5bcad 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/Node.java
@@ -56,7 +56,9 @@ public class Node<T> {
 
   public void addChildren(Collection<Node<T>> nodes) {
     children.addAll(nodes);
-    nodes.forEach(node -> node.parents.add(this));
+    for (Node<T> node : nodes) {
+      node.parents.add(this);
+    }
   }
 
   @Override
diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java
index c974bc4..0077802 100644
--- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java
+++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompensationImpl.java
@@ -24,4 +24,9 @@ public class CompensationImpl extends RestOperation implements Compensation {
   public CompensationImpl(String path, String method, Map<String, Map<String, String>> params) {
     super(path, method, params);
   }
+
+  @Override
+  public int retries() {
+    return DEFAULT_RETRIES;
+  }
 }
diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java
index be4b9f4..449340c 100644
--- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java
+++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/CompositeSagaResponseTest.java
@@ -22,6 +22,8 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -31,6 +33,7 @@ public class CompositeSagaResponseTest {
   private final SagaResponse response2 = Mockito.mock(SagaResponse.class);
 
   private final SagaResponse compositeSagaResponse = new CompositeSagaResponse(asList(response1, response2));
+  private final SagaResponse emptySagaResponse = new CompositeSagaResponse(new ArrayList<SagaResponse>());
 
   @Test
   public void succeededOnlyWhenAllAreSuccessful() throws Exception {
@@ -70,4 +73,9 @@ public class CompositeSagaResponseTest {
         + "}\n"
         + "]"));
   }
+
+  @Test
+  public void EmptyCompositeSagaResponse() {
+    assertThat(emptySagaResponse.body(), is("{}"));
+  }
 }
diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java
index a6e1dca..078f996 100644
--- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java
+++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/ForwardRecoveryTest.java
@@ -49,7 +49,12 @@ public class ForwardRecoveryTest {
   public void blowsUpWhenTaskIsNotCommittedWithFailRetryDelaySeconds() throws Exception {
     doThrow(Exception.class).when(transaction).send(serviceName, parentResponse);
 
-    Thread t = new Thread(() -> forwardRecovery.apply(sagaTask, sagaRequest, parentResponse));
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        forwardRecovery.apply(sagaTask, sagaRequest, parentResponse);
+      }
+    });
     t.start();
     Thread.sleep(400);
     t.interrupt();
diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java
index 7f1990d..900bb8b 100644
--- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java
+++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RestOperationTest.java
@@ -23,14 +23,20 @@ import static java.util.Collections.singletonMap;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
+import java.util.Collections;
+import java.util.Map;
+
 import org.junit.Test;
 
 public class RestOperationTest {
 
+  public static final Map<String, Map<String, String>> EMPTY_MAP_MAP = Collections.<String, Map<String, String>>emptyMap();
+  public static final Map<String, String> EMPTY_MAP =  Collections.<String, String>emptyMap();
+
   @Test
   public void blowsUpWhenGetMethodWithForm() {
     try {
-      new RestOperation("blah", "GET", singletonMap("form", emptyMap()));
+      new RestOperation("blah", "GET", singletonMap("form", EMPTY_MAP));
       expectFailing(IllegalArgumentException.class);
     } catch (IllegalArgumentException e) {
       assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a body"));
@@ -40,7 +46,7 @@ public class RestOperationTest {
   @Test
   public void blowsUpWhenGetMethodWithJson() {
     try {
-      new RestOperation("blah", "GET", singletonMap("json", emptyMap()));
+      new RestOperation("blah", "GET", singletonMap("json", EMPTY_MAP));
       expectFailing(IllegalArgumentException.class);
     } catch (IllegalArgumentException e) {
       assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a body"));
@@ -50,7 +56,7 @@ public class RestOperationTest {
   @Test
   public void blowsUpWhenDeleteMethodWithForm() {
     try {
-      new RestOperation("blah", "DELETE", singletonMap("form", emptyMap()));
+      new RestOperation("blah", "DELETE", singletonMap("form", EMPTY_MAP));
       expectFailing(IllegalArgumentException.class);
     } catch (IllegalArgumentException e) {
       assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a body"));
@@ -60,7 +66,7 @@ public class RestOperationTest {
   @Test
   public void blowsUpWhenDeleteMethodWithJson() {
     try {
-      new RestOperation("blah", "DELETE", singletonMap("json", emptyMap()));
+      new RestOperation("blah", "DELETE", singletonMap("json", EMPTY_MAP));
       expectFailing(IllegalArgumentException.class);
     } catch (IllegalArgumentException e) {
       assertThat(e.getMessage(), is("GET & DELETE request cannot enclose a body"));
@@ -70,7 +76,7 @@ public class RestOperationTest {
   @Test
   public void blowsUpWhenMethodIsNotSupported() {
     try {
-      new RestOperation("blah", "foo", emptyMap());
+      new RestOperation("blah", "foo", EMPTY_MAP_MAP );
       expectFailing(IllegalArgumentException.class);
     } catch (IllegalArgumentException e) {
       assertThat(e.getMessage(), is("Unsupported method foo"));
@@ -80,7 +86,7 @@ public class RestOperationTest {
   @Test
   public void blowsUpWhenMethodIsNull() {
     try {
-      new RestOperation("blah", null, emptyMap());
+      new RestOperation("blah", null, EMPTY_MAP_MAP);
       expectFailing(IllegalArgumentException.class);
     } catch (IllegalArgumentException e) {
       assertThat(e.getMessage(), is("Unsupported method null"));
diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java
index 6da8f16..9f9bdc8 100644
--- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java
+++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/RetrySagaLogTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -63,18 +64,26 @@ public class RetrySagaLogTest {
   public void exitOnInterruption() throws InterruptedException {
     ExecutorService executor = Executors.newSingleThreadExecutor();
 
-    Future<?> future = executor.submit(() -> {
-      doThrow(RuntimeException.class).when(persistentStore).offer(dummyEvent);
+    Future<?> future = executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        doThrow(RuntimeException.class).when(persistentStore).offer(dummyEvent);
 
-      retrySagaLog.offer(dummyEvent);
-      interrupted = true;
+        retrySagaLog.offer(dummyEvent);
+        interrupted = true;
+      }
     });
 
     Thread.sleep(500);
 
     assertThat(future.cancel(true), is(true));
 
-    await().atMost(2, TimeUnit.SECONDS).until(() -> interrupted);
+    await().atMost(2, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+      @Override
+      public Boolean call() {
+        return interrupted;
+      }
+    });
     executor.shutdown();
   }
 }
diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java
index b9e893f..d4f88c4 100644
--- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java
+++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaExecutionComponentTestBase.java
@@ -18,7 +18,6 @@
 package org.apache.servicecomb.saga.core;
 
 import static org.apache.servicecomb.saga.core.Operation.TYPE_REST;
-import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -30,9 +29,12 @@ import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.servicecomb.saga.core.application.SagaFactory;
 import org.hamcrest.Description;
@@ -48,6 +50,7 @@ import org.apache.servicecomb.saga.infrastructure.EmbeddedEventStore;
 
 @SuppressWarnings("unchecked")
 public abstract class SagaExecutionComponentTestBase {
+
   private static final String requestJson = "[\n"
       + "  {\n"
       + "    \"id\": \"request-1\",\n"
@@ -88,20 +91,22 @@ public abstract class SagaExecutionComponentTestBase {
       + "  \"requests\": " + anotherRequestJson + "\n"
       + "}";
 
+  public static final Map<String, Map<String, String>> EMPTY_MAP = Collections.<String, Map<String, String>>emptyMap();
+
   private final SagaRequest request1 = new SagaRequestImpl(
       "request-1",
       "aaa",
       TYPE_REST,
-      new TransactionImpl("/rest/as", "post", emptyMap()),
-      new CompensationImpl("/rest/as", "delete", emptyMap())
+      new TransactionImpl("/rest/as", "post", EMPTY_MAP),
+      new CompensationImpl("/rest/as", "delete", EMPTY_MAP)
   );
 
   private final SagaRequest request2 = new SagaRequestImpl(
       "request-2",
       "bbb",
       TYPE_REST,
-      new TransactionImpl("/rest/bs", "post", emptyMap()),
-      new CompensationImpl("/rest/bs", "delete", emptyMap())
+      new TransactionImpl("/rest/bs", "post", EMPTY_MAP),
+      new CompensationImpl("/rest/bs", "delete", EMPTY_MAP)
   );
 
   private final SagaDefinition definition1 = new SagaDefinition() {
@@ -174,10 +179,27 @@ public abstract class SagaExecutionComponentTestBase {
 
   @Test
   public void processRequestsInParallel() {
-    CompletableFuture.runAsync(() -> coordinator.run(sagaJson));
-    CompletableFuture.runAsync(() -> coordinator.run(anotherSagaJson));
+    ExecutorService executor = Executors.newFixedThreadPool(2);
 
-    waitAtMost(2, SECONDS).until(() -> eventStore.size() == 8);
+    executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        coordinator.run(sagaJson);
+      }
+    });
+    executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        coordinator.run(anotherSagaJson);
+      }
+    });
+
+    waitAtMost(2, SECONDS).until(new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        return eventStore.size() == 8;
+      }
+    });
 
     assertThat(eventStore, containsInAnyOrder(
         eventWith(NoOpSagaRequest.SAGA_START_REQUEST, SagaStartedEvent.class),
@@ -211,8 +233,8 @@ public abstract class SagaExecutionComponentTestBase {
   }
 
   private Matcher<SagaEvent> eventWith(
-      SagaRequest sagaRequest,
-      Class<?> type) {
+      final SagaRequest sagaRequest,
+      final Class<?> type) {
 
     return new TypeSafeMatcher<SagaEvent>() {
       @Override
diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java
index d005ee7..19f9802 100644
--- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java
+++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/SagaIntegrationTest.java
@@ -44,6 +44,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -54,6 +55,7 @@ import java.util.concurrent.CyclicBarrier;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import com.seanyinx.github.unit.scaffolding.Randomness;
@@ -116,7 +118,7 @@ public class SagaIntegrationTest {
   // root - node1 - node2 - leaf
   @Before
   public void setUp() throws Exception {
-    when(childrenExtractor.fromJson(anyString())).thenReturn(emptySet());
+    when(childrenExtractor.fromJson(anyString())).thenReturn(Collections.<String>emptySet());
     when(childrenExtractor.fromJson(NONE_RESPONSE.body())).thenReturn(setOf("none"));
 
     when(transaction1.send(request1.serviceName(), EMPTY_RESPONSE)).thenReturn(transactionResponse1);
@@ -169,20 +171,26 @@ public class SagaIntegrationTest {
     addExtraChildToNode1();
 
     // barrier to make sure the two transactions starts at the same time
-    CyclicBarrier barrier = new CyclicBarrier(2);
+    final CyclicBarrier barrier = new CyclicBarrier(2);
     when(transaction2.send(request2.serviceName(), transactionResponse1))
         .thenAnswer(
-            withAnswer(() -> {
-              barrier.await();
-              Thread.sleep(100);
-              throw exception;
+            withAnswer(new Callable<SagaResponse>() {
+              @Override
+              public SagaResponse call() throws Exception {
+                barrier.await();
+                Thread.sleep(100);
+                throw exception;
+              }
             }));
 
     when(transaction3.send(request3.serviceName(), transactionResponse1))
         .thenAnswer(
-            withAnswer(() -> {
-              barrier.await();
-              return transactionResponse3;
+            withAnswer(new Callable<SagaResponse>() {
+              @Override
+              public SagaResponse call() throws Exception {
+                barrier.await();
+                return transactionResponse3;
+              }
             }));
 
     saga.run();
@@ -305,21 +313,27 @@ public class SagaIntegrationTest {
     addExtraChildToNode1();
 
     // barrier to make sure the two transactions starts at the same time
-    CyclicBarrier barrier = new CyclicBarrier(2);
+    final CyclicBarrier barrier = new CyclicBarrier(2);
     when(transaction3.send(request3.serviceName(), transactionResponse1))
-        .thenAnswer(withAnswer(() -> {
-      barrier.await();
-      throw exception;
-    }));
+        .thenAnswer(withAnswer(new Callable<SagaResponse>() {
+          @Override
+          public SagaResponse call() throws Exception {
+            barrier.await();
+            throw exception;
+          }
+        }));
 
-    CountDownLatch latch = new CountDownLatch(1);
+    final CountDownLatch latch = new CountDownLatch(1);
 
     when(transaction2.send(request2.serviceName(), transactionResponse1))
-        .thenAnswer(withAnswer(() -> {
-      barrier.await();
-      latch.await();
-      return transactionResponse2;
-    })).thenReturn(transactionResponse2);
+        .thenAnswer(withAnswer(new Callable<SagaResponse>() {
+          @Override
+          public SagaResponse call() throws Exception {
+            barrier.await();
+            latch.await();
+            return transactionResponse2;
+          }
+        })).thenReturn(transactionResponse2);
 
     saga.run();
 
@@ -646,8 +660,13 @@ public class SagaIntegrationTest {
     verify(compensation2, never()).send(request2.serviceName());
   }
 
-  private Answer<SagaResponse> withAnswer(Callable<SagaResponse> callable) {
-    return invocationOnMock -> callable.call();
+  private Answer<SagaResponse> withAnswer(final Callable<SagaResponse> callable) {
+    return new Answer<SagaResponse>() {
+      @Override
+      public SagaResponse answer(InvocationOnMock invocation) throws Throwable {
+        return callable.call();
+      }
+    };
   }
 
   private EventEnvelope envelope(SagaEvent event) {
diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java
index beb9607..3c54fcf 100644
--- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java
+++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/DirectedAcyclicGraphTraversalTest.java
@@ -58,7 +58,7 @@ public class DirectedAcyclicGraphTraversalTest {
 
   @Test
   public void traverseGraphOneLevelPerStepFromRoot() {
-    Traveller<String> traveller = new ByLevelTraveller<>(dag, new FromRootTraversalDirection<>());
+    Traveller<String> traveller = new ByLevelTraveller<>(dag, new FromRootTraversalDirection<String>());
 
     Collection<Node<String>> nodes = traveller.nodes();
 
@@ -80,7 +80,7 @@ public class DirectedAcyclicGraphTraversalTest {
 
   @Test
   public void traverseGraphOneLevelPerStepFromLeaf() {
-    Traveller<String> traveller = new ByLevelTraveller<>(dag, new FromLeafTraversalDirection<>());
+    Traveller<String> traveller = new ByLevelTraveller<>(dag, new FromLeafTraversalDirection<String>());
 
     Collection<Node<String>> nodes = traveller.nodes();
 
diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java
index 8703c7a..9a6c0f8 100644
--- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java
+++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphBuilderTest.java
@@ -19,8 +19,6 @@ package org.apache.servicecomb.saga.core.dag;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static org.apache.servicecomb.saga.core.Operation.TYPE_REST;
-import static java.util.Collections.emptyMap;
-import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static org.hamcrest.CoreMatchers.startsWith;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
@@ -31,8 +29,12 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.stream.Collectors;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.servicecomb.saga.core.NoOpSagaRequest;
 import org.apache.servicecomb.saga.core.SagaException;
@@ -49,28 +51,30 @@ import org.apache.servicecomb.saga.core.TransactionImpl;
 @SuppressWarnings("unchecked")
 public class GraphBuilderTest {
 
+  public static final Map<String, Map<String, String>> EMPTY_MAP = Collections.<String, Map<String, String>>emptyMap();
+
   private final SagaRequest request1 = new SagaRequestImpl(
       "request-aaa",
       "aaa",
       TYPE_REST,
-      new TransactionImpl("/rest/as", "post", emptyMap()),
-      new CompensationImpl("/rest/as","delete", emptyMap())
+      new TransactionImpl("/rest/as", "post", EMPTY_MAP),
+      new CompensationImpl("/rest/as","delete", EMPTY_MAP)
   );
 
   private final SagaRequest request2 = new SagaRequestImpl(
       "request-bbb",
       "bbb",
       TYPE_REST,
-      new TransactionImpl("/rest/bs", "post", emptyMap()),
-      new CompensationImpl("/rest/bs","delete", emptyMap())
+      new TransactionImpl("/rest/bs", "post", EMPTY_MAP),
+      new CompensationImpl("/rest/bs","delete", EMPTY_MAP)
   );
 
   private final SagaRequest request3 = new SagaRequestImpl(
       "request-ccc",
       "ccc",
       TYPE_REST,
-      new TransactionImpl("/rest/cs", "post", emptyMap()),
-      new CompensationImpl("/rest/cs","delete", emptyMap()),
+      new TransactionImpl("/rest/cs", "post", EMPTY_MAP),
+      new CompensationImpl("/rest/cs","delete", EMPTY_MAP),
       null,
       new String[]{"request-aaa", "request-bbb"}
   );
@@ -80,8 +84,8 @@ public class GraphBuilderTest {
       "request-duplicate-id",
       "xxx",
       TYPE_REST,
-      new TransactionImpl("/rest/xs", "post", emptyMap()),
-      new CompensationImpl("/rest/xs","delete", emptyMap())
+      new TransactionImpl("/rest/xs", "post", EMPTY_MAP),
+      new CompensationImpl("/rest/xs","delete", EMPTY_MAP)
   );
   private final SagaRequest[] duplicateRequests = {duplicateRequest, duplicateRequest};
 
@@ -90,14 +94,14 @@ public class GraphBuilderTest {
 
   @Before
   public void setUp() throws Exception {
-    when(detector.cycleJoints(any())).thenReturn(emptySet());
+    when(detector.cycleJoints((SingleLeafDirectedAcyclicGraph<SagaRequest>)any())).thenReturn((Set<Node<SagaRequest>>) Collections.EMPTY_SET);
   }
 
   @Test
   public void buildsGraphOfParallelRequests() {
     SingleLeafDirectedAcyclicGraph<SagaRequest> tasks = graphBuilder.build(requests);
 
-    Traveller<SagaRequest> traveller = new ByLevelTraveller<>(tasks, new FromRootTraversalDirection<>());
+    Traveller<SagaRequest> traveller = new ByLevelTraveller<>(tasks, new FromRootTraversalDirection<SagaRequest>());
     Collection<Node<SagaRequest>> nodes = traveller.nodes();
 
     traveller.next();
@@ -130,7 +134,7 @@ public class GraphBuilderTest {
   @Test
   public void blowsUpWhenGraphContainsCycle() {
     reset(detector);
-    when(detector.cycleJoints(any())).thenReturn(singleton(new Node<>(0L, null)));
+    when(detector.cycleJoints((SingleLeafDirectedAcyclicGraph<SagaRequest>) any())).thenReturn(singleton(new Node<SagaRequest>(0L, null)));
 
     try {
       graphBuilder.build(requests);
@@ -141,8 +145,10 @@ public class GraphBuilderTest {
   }
 
   private Collection<SagaRequest> requestsOf(Collection<Node<SagaRequest>> nodes) {
-    return nodes.stream()
-        .map(Node::value)
-        .collect(Collectors.toList());
+    List<SagaRequest> result = new ArrayList<>();
+    for(Node<SagaRequest> node: nodes) {
+      result.add(node.value());
+    }
+    return result;
   }
 }
diff --git a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java
index 934c54b..190beeb 100644
--- a/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java
+++ b/saga-core/src/test/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorTest.java
@@ -52,7 +52,6 @@ public class GraphCycleDetectorTest {
     node1.addChild(node3);
 
     Set<Node<String>> nodes = detector.cycleJoints(graph);
-
     assertThat(nodes.isEmpty(), is(true));
   }
 
@@ -63,7 +62,7 @@ public class GraphCycleDetectorTest {
     node3.addChild(node1);
 
     Set<Node<String>> nodes = detector.cycleJoints(graph);
-
+   
     assertThat(nodes, contains(node1));
   }
 
diff --git a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java
index 443ba66..a70aafb 100644
--- a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java
+++ b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.format;
 
 import org.apache.servicecomb.saga.core.Fallback;
 import org.apache.servicecomb.saga.core.Operation;
+import org.apache.servicecomb.saga.core.SagaResponse;
 import org.apache.servicecomb.saga.transports.TransportFactory;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -60,5 +61,15 @@ public interface JacksonFallback extends Fallback, TransportAware {
     public Operation with(TransportFactory transport) {
       return this;
     }
+
+    @Override
+    public SagaResponse send(String address) {
+      return SUCCESSFUL_SAGA_RESPONSE;
+    }
+
+    @Override
+    public SagaResponse send(String address, SagaResponse response) {
+      return send(address);
+    }
   }
 }