You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2020/05/30 01:10:31 UTC

[servicecomb-java-chassis] branch master updated: [SCB-1947] define Filter and filter chain

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/master by this push:
     new 184a88f  [SCB-1947] define Filter and filter chain
184a88f is described below

commit 184a88fbe1acbdcfb6cb6d3f7535320190262de7
Author: wujimin <wu...@huawei.com>
AuthorDate: Wed May 27 21:35:26 2020 +0800

    [SCB-1947] define Filter and filter chain
---
 .../org/apache/servicecomb/core/filter/Filter.java |  47 +++++++
 .../apache/servicecomb/core/filter/FilterMeta.java |  39 ++++++
 .../apache/servicecomb/core/filter/FilterNode.java |  73 ++++++++++
 .../core/filter/impl/ScheduleFilter.java           |  65 +++++++++
 .../servicecomb/core/filter/FilterChainTest.java   | 152 +++++++++++++++++++++
 .../servicecomb/core/filter/SimpleRetryFilter.java |  89 ++++++++++++
 .../foundation/common/utils/AsyncUtils.java        |  50 +++++++
 7 files changed, 515 insertions(+)

diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/Filter.java b/core/src/main/java/org/apache/servicecomb/core/filter/Filter.java
new file mode 100644
index 0000000..017f3f8
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/Filter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.filter;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.SCBEngine;
+import org.apache.servicecomb.swagger.invocation.Response;
+
+public interface Filter {
+  default boolean enabled() {
+    return true;
+  }
+
+  default void init(SCBEngine engine) {
+
+  }
+
+  /**
+   *
+   * @param invocation invocation
+   * @param nextNode next filter node
+   * @return response future<br>
+   *         although Response is able to express failure data,<br>
+   *         inside the filter chain a Response only expresses success data;<br>
+   *         all failure data can only be expressed by an exception<br>
+   *         <br>
+   *         special case for producer:<br>
+   *           if the response is a failure, then after the response is encoded, response.result will be exception.errorData, not an exception
+   */
+  CompletableFuture<Response> onFilter(Invocation invocation, FilterNode nextNode);
+}
diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/FilterMeta.java b/core/src/main/java/org/apache/servicecomb/core/filter/FilterMeta.java
new file mode 100644
index 0000000..680ba45
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/FilterMeta.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.core.filter;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+@Inherited
+public @interface FilterMeta {
+  String name();
+
+  /**
+   *
+   * @return true to use same instance for all filter chains
+   */
+  boolean shareable() default true;
+}
diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/FilterNode.java b/core/src/main/java/org/apache/servicecomb/core/filter/FilterNode.java
new file mode 100644
index 0000000..8a483b2
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/FilterNode.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.filter;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.foundation.common.utils.AsyncUtils;
+import org.apache.servicecomb.swagger.invocation.Response;
+
+public class FilterNode {
+  public static FilterNode buildChain(Filter... filters) {
+    return buildChain(Arrays.asList(filters));
+  }
+
+  public static FilterNode buildChain(List<Filter> filters) {
+    List<FilterNode> filterNodes = filters.stream()
+        .map(FilterNode::new)
+        .collect(Collectors.toList());
+
+    for (int idx = 0; idx < filterNodes.size() - 1; idx++) {
+      filterNodes.get(idx).setNextNode(filterNodes.get(idx + 1));
+    }
+
+    return filterNodes.get(0);
+  }
+
+  private final Filter filter;
+
+  private FilterNode nextNode;
+
+  public FilterNode(Filter filter) {
+    this.filter = filter;
+  }
+
+  private void setNextNode(FilterNode nextNode) {
+    this.nextNode = nextNode;
+  }
+
+  public CompletableFuture<Response> onFilter(Invocation invocation) {
+    if (!filter.enabled()) {
+      return nextNode.onFilter(invocation);
+    }
+
+    return AsyncUtils.tryCatch(() -> filter.onFilter(invocation, nextNode))
+        .thenApply(this::rethrowExceptionInResponse);
+  }
+
+  private Response rethrowExceptionInResponse(Response response) {
+    if (response.isFailed() && response.getResult() instanceof Throwable) {
+      AsyncUtils.rethrow(response.getResult());
+    }
+
+    return response;
+  }
+}
diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/impl/ScheduleFilter.java b/core/src/main/java/org/apache/servicecomb/core/filter/impl/ScheduleFilter.java
new file mode 100644
index 0000000..f64a053
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/impl/ScheduleFilter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.filter.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.exception.Exceptions;
+import org.apache.servicecomb.core.filter.Filter;
+import org.apache.servicecomb.core.filter.FilterMeta;
+import org.apache.servicecomb.core.filter.FilterNode;
+import org.apache.servicecomb.core.invocation.InvocationStageTrace;
+import org.apache.servicecomb.swagger.invocation.Response;
+
+@FilterMeta(name = "schedule")
+public class ScheduleFilter implements Filter {
+  @Override
+  public CompletableFuture<Response> onFilter(Invocation invocation, FilterNode next) {
+    invocation.getInvocationStageTrace().startSchedule();
+    Executor executor = invocation.getOperationMeta().getExecutor();
+    return CompletableFuture.completedFuture(null)
+        .thenComposeAsync(response -> runInExecutor(invocation, next), executor);
+  }
+
+  private CompletableFuture<Response> runInExecutor(Invocation invocation, FilterNode next) {
+    invocation.onExecuteStart();
+    InvocationStageTrace trace = invocation.getInvocationStageTrace();
+    trace.startServerFiltersRequest();
+    trace.startHandlersRequest();
+
+    checkInQueueTimeout(invocation);
+
+    return next.onFilter(invocation)
+        .whenComplete((response, throwable) -> whenComplete(invocation));
+  }
+
+  private void checkInQueueTimeout(Invocation invocation) {
+    long nanoTimeout = invocation.getOperationMeta().getConfig()
+        .getNanoRequestWaitInPoolTimeout(invocation.getTransport().getName());
+
+    if (System.nanoTime() - invocation.getInvocationStageTrace().getStart() > nanoTimeout) {
+      throw Exceptions.genericProducer("Request in the queue timed out.");
+    }
+  }
+
+  private void whenComplete(Invocation invocation) {
+    invocation.getInvocationStageTrace().finishHandlersResponse();
+    invocation.getInvocationStageTrace().finishServerFiltersResponse();
+  }
+}
diff --git a/core/src/test/java/org/apache/servicecomb/core/filter/FilterChainTest.java b/core/src/test/java/org/apache/servicecomb/core/filter/FilterChainTest.java
new file mode 100644
index 0000000..a7d75c1
--- /dev/null
+++ b/core/src/test/java/org/apache/servicecomb/core/filter/FilterChainTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.core.filter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.definition.OperationConfig;
+import org.apache.servicecomb.core.definition.OperationMeta;
+import org.apache.servicecomb.core.filter.impl.ScheduleFilter;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import mockit.Expectations;
+import mockit.Mocked;
+
+public class FilterChainTest {
+  static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
+
+  static String THREAD_NAME;
+
+  @Mocked
+  Invocation invocation;
+
+  @Mocked
+  OperationMeta operationMeta;
+
+  @Mocked
+  OperationConfig operationConfig;
+
+  List<String> msg = new Vector<>();
+
+  Filter recordThreadFilter = (invocation, nextNode) -> {
+    msg.add(Thread.currentThread().getName());
+    if (nextNode == null) {
+      return CompletableFuture.completedFuture(Response.ok(null));
+    }
+
+    return nextNode.onFilter(invocation);
+  };
+
+  Filter scheduler = new ScheduleFilter();
+
+  Filter exceptionFilter = (invocation, nextNode) -> {
+    throw new IllegalStateException("e1");
+  };
+
+  @BeforeClass
+  public static void beforeClass() {
+    try {
+      THREAD_NAME = EXECUTOR.submit(() -> Thread.currentThread().getName()).get();
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private void mockInvocation() {
+    new Expectations() {
+      {
+        invocation.getOperationMeta();
+        result = operationMeta;
+
+        operationMeta.getExecutor();
+        result = EXECUTOR;
+
+        operationConfig.getNanoRequestWaitInPoolTimeout(anyString);
+        result = Long.MAX_VALUE;
+      }
+    };
+  }
+
+  @Test
+  public void should_switch_thread_after_schedule() throws ExecutionException, InterruptedException {
+    mockInvocation();
+
+    FilterNode.buildChain(recordThreadFilter, scheduler, recordThreadFilter)
+        .onFilter(invocation)
+        .get();
+
+    assertThat(msg).containsExactly("main", THREAD_NAME);
+  }
+
+  @Test
+  public void should_stop_chain_when_first_filter_throw_exception() {
+    ExecutionException executionException = (ExecutionException) catchThrowable(
+        () -> FilterNode.buildChain(exceptionFilter, recordThreadFilter)
+            .onFilter(invocation)
+            .get());
+
+    assertThat(msg).isEmpty();
+    assertThat(executionException.getCause())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("e1");
+  }
+
+  @Test
+  public void should_stop_chain_when_middle_filter_throw_exception() {
+    ExecutionException executionException = (ExecutionException) catchThrowable(
+        () -> FilterNode.buildChain(recordThreadFilter, exceptionFilter, recordThreadFilter)
+            .onFilter(invocation)
+            .get());
+
+    assertThat(msg).containsExactly("main");
+    assertThat(executionException.getCause())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("e1");
+  }
+
+  @Test
+  public void should_support_retry_logic() {
+    Filter exceptionFilter = (invocation, nextNode) -> {
+      throw new CompletionException(new IOException("net error"));
+    };
+    SimpleRetryFilter retryFilter = new SimpleRetryFilter().setMaxRetry(3);
+
+    ExecutionException executionException = (ExecutionException) catchThrowable(
+        () -> FilterNode.buildChain(retryFilter, recordThreadFilter, exceptionFilter)
+            .onFilter(invocation)
+            .get());
+
+    assertThat(msg).containsExactly("main", "main", "main");
+    assertThat(executionException.getCause())
+        .isInstanceOf(IOException.class)
+        .hasMessage("net error");
+  }
+}
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/servicecomb/core/filter/SimpleRetryFilter.java b/core/src/test/java/org/apache/servicecomb/core/filter/SimpleRetryFilter.java
new file mode 100644
index 0000000..061356d
--- /dev/null
+++ b/core/src/test/java/org/apache/servicecomb/core/filter/SimpleRetryFilter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.filter;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.exception.Exceptions;
+import org.apache.servicecomb.swagger.invocation.Response;
+
+@FilterMeta(name = "simple-retry")
+public class SimpleRetryFilter implements Filter {
+  protected int maxRetry = 3;
+
+  public SimpleRetryFilter setMaxRetry(int maxRetry) {
+    this.maxRetry = maxRetry;
+    return this;
+  }
+
+  @Override
+  public CompletableFuture<Response> onFilter(Invocation invocation, FilterNode nextNode) {
+    return new RetrySession(invocation, nextNode).run();
+  }
+
+  protected Throwable unwrapException(Throwable throwable) {
+    return Exceptions.unwrapIncludeInvocationException(throwable);
+  }
+
+  protected boolean isRetryException(Throwable throwable) {
+    return !(throwable instanceof IOException);
+  }
+
+  class RetrySession {
+    Invocation invocation;
+
+    FilterNode nextNode;
+
+    int retryCount;
+
+    CompletableFuture<Response> future = new CompletableFuture<>();
+
+    RetrySession(Invocation invocation, FilterNode nextNode) {
+      this.invocation = invocation;
+      this.nextNode = nextNode;
+    }
+
+    CompletableFuture<Response> run() {
+      nextNode.onFilter(invocation)
+          .whenComplete(this::whenNextComplete);
+      return future;
+    }
+
+    private void whenNextComplete(Response response, Throwable throwable) {
+      if (throwable == null) {
+        future.complete(response);
+        return;
+      }
+
+      Throwable unwrapped = unwrapException(throwable);
+      if (isRetryException(unwrapped)) {
+        future.completeExceptionally(throwable);
+        return;
+      }
+
+      retryCount++;
+      if (retryCount >= maxRetry) {
+        future.completeExceptionally(throwable);
+        return;
+      }
+
+      run();
+    }
+  }
+}
diff --git a/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/utils/AsyncUtils.java b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/utils/AsyncUtils.java
new file mode 100644
index 0000000..3bbf7c8
--- /dev/null
+++ b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/utils/AsyncUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.common.utils;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+public final class AsyncUtils {
+  private AsyncUtils() {
+  }
+
+  public static <T> CompletableFuture<T> tryCatch(Supplier<CompletableFuture<T>> supplier) {
+    try {
+      return supplier.get();
+    } catch (Throwable e) {
+      return completeExceptionally(e);
+    }
+  }
+
+  public static <T> CompletableFuture<T> completeExceptionally(Throwable throwable) {
+    CompletableFuture<T> future = new CompletableFuture<>();
+    future.completeExceptionally(throwable);
+    return future;
+  }
+
+  /**
+   * Throws {@code exception} as an unchecked exception, without wrapping it.
+   *
+   * @param exception exception which will be rethrown
+   * @throws T {@code exception} as unchecked exception
+   */
+  @SuppressWarnings("unchecked")
+  public static <T extends Throwable> void rethrow(Throwable exception) throws T {
+    throw (T) exception;
+  }
+}