You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2020/06/03 07:50:38 UTC

[servicecomb-java-chassis] branch master updated: [SCB-1980] add TransportFilters to select chain by transport in runtime

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/master by this push:
     new 47f43ff  [SCB-1980] add TransportFilters to select chain by transport in runtime
47f43ff is described below

commit 47f43ffb7458a7093a361618bf6ab68cb9d1c1cd
Author: wujimin <wu...@huawei.com>
AuthorDate: Wed Jun 3 13:01:01 2020 +0800

    [SCB-1980] add TransportFilters to select chain by transport in runtime
---
 .../apache/servicecomb/core/filter/FilterNode.java | 18 ++++-
 .../core/filter/impl/TransportFilters.java         | 49 ++++++++++++++
 .../servicecomb/core/filter/FilterChainTest.java   | 76 ++++++++++++++++++++--
 3 files changed, 138 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/FilterNode.java b/core/src/main/java/org/apache/servicecomb/core/filter/FilterNode.java
index 89a336b..cbed2f8 100644
--- a/core/src/main/java/org/apache/servicecomb/core/filter/FilterNode.java
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/FilterNode.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.filter.impl.TransportFilters;
 import org.apache.servicecomb.foundation.common.utils.AsyncUtils;
 import org.apache.servicecomb.swagger.invocation.Response;
 
@@ -43,12 +44,27 @@ public class FilterNode {
         .collect(Collectors.toList());
 
     for (int idx = 0; idx < filterNodes.size() - 1; idx++) {
-      filterNodes.get(idx).setNextNode(filterNodes.get(idx + 1));
+      FilterNode currentNode = filterNodes.get(idx);
+      FilterNode nextNode = filterNodes.get(idx + 1);
+      currentNode.setNextNode(nextNode);
+
+      if (currentNode.filter instanceof TransportFilters) {
+        mergeToChain((TransportFilters) currentNode.filter, nextNode);
+      }
     }
 
     return filterNodes.get(0);
   }
 
+  private static void mergeToChain(TransportFilters filter, FilterNode nextNode) {
+    for (FilterNode node : filter.getChainByTransport().values()) {
+      while (node.nextNode != null) {
+        node = node.nextNode;
+      }
+      node.nextNode = nextNode;
+    }
+  }
+
   private final Filter filter;
 
   private FilterNode nextNode;
diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/impl/TransportFilters.java b/core/src/main/java/org/apache/servicecomb/core/filter/impl/TransportFilters.java
new file mode 100644
index 0000000..2170af3
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/impl/TransportFilters.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.filter.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.filter.Filter;
+import org.apache.servicecomb.core.filter.FilterMeta;
+import org.apache.servicecomb.core.filter.FilterNode;
+import org.apache.servicecomb.swagger.invocation.Response;
+
+/**
+ * Internal use only, will not publish by {@link DefaultFilterProvider}
+ */
+@FilterMeta(name = "transport-filters")
+public class TransportFilters implements Filter {
+  private Map<String, FilterNode> chainByTransport = new HashMap<>();
+
+  public Map<String, FilterNode> getChainByTransport() {
+    return chainByTransport;
+  }
+
+  @Override
+  public CompletableFuture<Response> onFilter(Invocation invocation, FilterNode nextNode) {
+    FilterNode filterNode = chainByTransport.get(invocation.getTransport().getName());
+    if (filterNode == null) {
+      return nextNode.onFilter(invocation);
+    }
+    
+    return filterNode.onFilter(invocation);
+  }
+}
diff --git a/core/src/test/java/org/apache/servicecomb/core/filter/FilterChainTest.java b/core/src/test/java/org/apache/servicecomb/core/filter/FilterChainTest.java
index d184fa4..85b805c 100644
--- a/core/src/test/java/org/apache/servicecomb/core/filter/FilterChainTest.java
+++ b/core/src/test/java/org/apache/servicecomb/core/filter/FilterChainTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.servicecomb.core.filter;
 
+import static org.apache.servicecomb.core.Const.HIGHWAY;
+import static org.apache.servicecomb.core.Const.RESTFUL;
+import static org.apache.servicecomb.core.filter.FilterNode.buildChain;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.catchThrowable;
 
@@ -30,9 +33,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.Transport;
 import org.apache.servicecomb.core.definition.OperationConfig;
 import org.apache.servicecomb.core.definition.OperationMeta;
 import org.apache.servicecomb.core.filter.impl.ScheduleFilter;
+import org.apache.servicecomb.core.filter.impl.TransportFilters;
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -99,7 +104,7 @@ public class FilterChainTest {
   public void should_switch_thread_after_schedule() throws ExecutionException, InterruptedException {
     mockInvocation();
 
-    FilterNode.buildChain(recordThreadFilter, scheduler, recordThreadFilter)
+    buildChain(recordThreadFilter, scheduler, recordThreadFilter)
         .onFilter(invocation)
         .get();
 
@@ -109,7 +114,7 @@ public class FilterChainTest {
   @Test
   public void should_stop_chain_when_first_filter_throw_exception() {
     ExecutionException executionException = (ExecutionException) catchThrowable(
-        () -> FilterNode.buildChain(exceptionFilter, recordThreadFilter)
+        () -> buildChain(exceptionFilter, recordThreadFilter)
             .onFilter(invocation)
             .get());
 
@@ -122,7 +127,7 @@ public class FilterChainTest {
   @Test
   public void should_stop_chain_when_middle_filter_throw_exception() {
     ExecutionException executionException = (ExecutionException) catchThrowable(
-        () -> FilterNode.buildChain(recordThreadFilter, exceptionFilter, recordThreadFilter)
+        () -> buildChain(recordThreadFilter, exceptionFilter, recordThreadFilter)
             .onFilter(invocation)
             .get());
 
@@ -139,7 +144,7 @@ public class FilterChainTest {
     };
     SimpleRetryFilter retryFilter = new SimpleRetryFilter().setMaxRetry(3);
 
-    CompletableFuture<Response> future = FilterNode.buildChain(retryFilter, recordThreadFilter, exceptionFilter)
+    CompletableFuture<Response> future = buildChain(retryFilter, recordThreadFilter, exceptionFilter)
         .onFilter(invocation);
 
     assertThat(msg).containsExactly("main", "main", "main");
@@ -148,4 +153,67 @@ public class FilterChainTest {
         .isExactlyInstanceOf(IOException.class)
         .hasMessage("net error");
   }
+
+  @Test
+  public void should_build_chain_with_TransportFilters(@Mocked Transport transport)
+      throws ExecutionException, InterruptedException {
+    mockInvocation();
+    new Expectations() {
+      {
+        invocation.getTransport();
+        result = transport;
+      }
+    };
+    TransportFilters transportFilters = new TransportFilters();
+    transportFilters.getChainByTransport().put(RESTFUL, buildChain(recordThreadFilter));
+    transportFilters.getChainByTransport().put(HIGHWAY, buildChain(recordThreadFilter, scheduler, recordThreadFilter));
+
+    FilterNode chain = buildChain(transportFilters, recordThreadFilter);
+
+    checkRestChain(transport, chain);
+    checkHighwayChain(transport, chain);
+    checkUnknownTransportChain(transport, chain);
+  }
+
+  private void checkUnknownTransportChain(Transport transport, FilterNode chain)
+      throws ExecutionException, InterruptedException {
+    msg.clear();
+    new Expectations() {
+      {
+        transport.getName();
+        result = "abc";
+      }
+    };
+    chain.onFilter(invocation)
+        .get();
+    assertThat(msg).containsExactly("main");
+  }
+
+  private void checkRestChain(Transport transport, FilterNode chain)
+      throws InterruptedException, ExecutionException {
+    msg.clear();
+    new Expectations() {
+      {
+        transport.getName();
+        result = RESTFUL;
+      }
+    };
+    chain.onFilter(invocation)
+        .get();
+    assertThat(msg).containsExactly("main", "main");
+  }
+
+  private void checkHighwayChain(Transport transport, FilterNode chain)
+      throws InterruptedException, ExecutionException {
+    msg.clear();
+    new Expectations() {
+      {
+        transport.getName();
+        result = HIGHWAY;
+      }
+    };
+    chain.onFilter(invocation)
+        .get();
+    assertThat(msg).containsExactly("main", THREAD_NAME, THREAD_NAME);
+  }
 }
\ No newline at end of file