You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2020/05/30 01:10:31 UTC
[servicecomb-java-chassis] branch master updated: [SCB-1947] define
Filter and filter chain
This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/master by this push:
new 184a88f [SCB-1947] define Filter and filter chain
184a88f is described below
commit 184a88fbe1acbdcfb6cb6d3f7535320190262de7
Author: wujimin <wu...@huawei.com>
AuthorDate: Wed May 27 21:35:26 2020 +0800
[SCB-1947] define Filter and filter chain
---
.../org/apache/servicecomb/core/filter/Filter.java | 47 +++++++
.../apache/servicecomb/core/filter/FilterMeta.java | 39 ++++++
.../apache/servicecomb/core/filter/FilterNode.java | 73 ++++++++++
.../core/filter/impl/ScheduleFilter.java | 65 +++++++++
.../servicecomb/core/filter/FilterChainTest.java | 152 +++++++++++++++++++++
.../servicecomb/core/filter/SimpleRetryFilter.java | 89 ++++++++++++
.../foundation/common/utils/AsyncUtils.java | 50 +++++++
7 files changed, 515 insertions(+)
diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/Filter.java b/core/src/main/java/org/apache/servicecomb/core/filter/Filter.java
new file mode 100644
index 0000000..017f3f8
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/Filter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.filter;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.SCBEngine;
+import org.apache.servicecomb.swagger.invocation.Response;
+
+public interface Filter {
+ default boolean enabled() {
+ return true;
+ }
+
+ default void init(SCBEngine engine) {
+
+ }
+
+ /**
+ *
+ * @param invocation invocation
+ * @param nextNode node filter node
+ * @return response future<br>
+ * even Response can express fail data<br>
+ * but Response only express success data in filter chain<br>
+ * all fail data can only express by exception<br>
+ * <br>
+ * special for producer:<br>
+ * if response is failure, then after encode response, response.result will be exception.errorData, not a exception
+ */
+ CompletableFuture<Response> onFilter(Invocation invocation, FilterNode nextNode);
+}
diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/FilterMeta.java b/core/src/main/java/org/apache/servicecomb/core/filter/FilterMeta.java
new file mode 100644
index 0000000..680ba45
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/FilterMeta.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.core.filter;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+@Inherited
+public @interface FilterMeta {
+ String name();
+
+ /**
+ *
+ * @return true to use same instance for all filter chains
+ */
+ boolean shareable() default true;
+}
diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/FilterNode.java b/core/src/main/java/org/apache/servicecomb/core/filter/FilterNode.java
new file mode 100644
index 0000000..8a483b2
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/FilterNode.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.filter;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.foundation.common.utils.AsyncUtils;
+import org.apache.servicecomb.swagger.invocation.Response;
+
+public class FilterNode {
+ public static FilterNode buildChain(Filter... filters) {
+ return buildChain(Arrays.asList(filters));
+ }
+
+ public static FilterNode buildChain(List<Filter> filters) {
+ List<FilterNode> filterNodes = filters.stream()
+ .map(FilterNode::new)
+ .collect(Collectors.toList());
+
+ for (int idx = 0; idx < filterNodes.size() - 1; idx++) {
+ filterNodes.get(idx).setNextNode(filterNodes.get(idx + 1));
+ }
+
+ return filterNodes.get(0);
+ }
+
+ private final Filter filter;
+
+ private FilterNode nextNode;
+
+ public FilterNode(Filter filter) {
+ this.filter = filter;
+ }
+
+ private void setNextNode(FilterNode nextNode) {
+ this.nextNode = nextNode;
+ }
+
+ public CompletableFuture<Response> onFilter(Invocation invocation) {
+ if (!filter.enabled()) {
+ return nextNode.onFilter(invocation);
+ }
+
+ return AsyncUtils.tryCatch(() -> filter.onFilter(invocation, nextNode))
+ .thenApply(this::rethrowExceptionInResponse);
+ }
+
+ private Response rethrowExceptionInResponse(Response response) {
+ if (response.isFailed() && response.getResult() instanceof Throwable) {
+ AsyncUtils.rethrow(response.getResult());
+ }
+
+ return response;
+ }
+}
diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/impl/ScheduleFilter.java b/core/src/main/java/org/apache/servicecomb/core/filter/impl/ScheduleFilter.java
new file mode 100644
index 0000000..f64a053
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/impl/ScheduleFilter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.filter.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.exception.Exceptions;
+import org.apache.servicecomb.core.filter.Filter;
+import org.apache.servicecomb.core.filter.FilterMeta;
+import org.apache.servicecomb.core.filter.FilterNode;
+import org.apache.servicecomb.core.invocation.InvocationStageTrace;
+import org.apache.servicecomb.swagger.invocation.Response;
+
+@FilterMeta(name = "schedule")
+public class ScheduleFilter implements Filter {
+ @Override
+ public CompletableFuture<Response> onFilter(Invocation invocation, FilterNode next) {
+ invocation.getInvocationStageTrace().startSchedule();
+ Executor executor = invocation.getOperationMeta().getExecutor();
+ return CompletableFuture.completedFuture(null)
+ .thenComposeAsync(response -> runInExecutor(invocation, next), executor);
+ }
+
+ private CompletableFuture<Response> runInExecutor(Invocation invocation, FilterNode next) {
+ invocation.onExecuteStart();
+ InvocationStageTrace trace = invocation.getInvocationStageTrace();
+ trace.startServerFiltersRequest();
+ trace.startHandlersRequest();
+
+ checkInQueueTimeout(invocation);
+
+ return next.onFilter(invocation)
+ .whenComplete((response, throwable) -> whenComplete(invocation));
+ }
+
+ private void checkInQueueTimeout(Invocation invocation) {
+ long nanoTimeout = invocation.getOperationMeta().getConfig()
+ .getNanoRequestWaitInPoolTimeout(invocation.getTransport().getName());
+
+ if (System.nanoTime() - invocation.getInvocationStageTrace().getStart() > nanoTimeout) {
+ throw Exceptions.genericProducer("Request in the queue timed out.");
+ }
+ }
+
+ private void whenComplete(Invocation invocation) {
+ invocation.getInvocationStageTrace().finishHandlersResponse();
+ invocation.getInvocationStageTrace().finishServerFiltersResponse();
+ }
+}
diff --git a/core/src/test/java/org/apache/servicecomb/core/filter/FilterChainTest.java b/core/src/test/java/org/apache/servicecomb/core/filter/FilterChainTest.java
new file mode 100644
index 0000000..a7d75c1
--- /dev/null
+++ b/core/src/test/java/org/apache/servicecomb/core/filter/FilterChainTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.core.filter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.definition.OperationConfig;
+import org.apache.servicecomb.core.definition.OperationMeta;
+import org.apache.servicecomb.core.filter.impl.ScheduleFilter;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import mockit.Expectations;
+import mockit.Mocked;
+
+public class FilterChainTest {
+ static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
+
+ static String THREAD_NAME;
+
+ @Mocked
+ Invocation invocation;
+
+ @Mocked
+ OperationMeta operationMeta;
+
+ @Mocked
+ OperationConfig operationConfig;
+
+ List<String> msg = new Vector<>();
+
+ Filter recordThreadFilter = (invocation, nextNode) -> {
+ msg.add(Thread.currentThread().getName());
+ if (nextNode == null) {
+ return CompletableFuture.completedFuture(Response.ok(null));
+ }
+
+ return nextNode.onFilter(invocation);
+ };
+
+ Filter scheduler = new ScheduleFilter();
+
+ Filter exceptionFilter = (invocation, nextNode) -> {
+ throw new IllegalStateException("e1");
+ };
+
+ @BeforeClass
+ public static void beforeClass() {
+ try {
+ THREAD_NAME = EXECUTOR.submit(() -> Thread.currentThread().getName()).get();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private void mockInvocation() {
+ new Expectations() {
+ {
+ invocation.getOperationMeta();
+ result = operationMeta;
+
+ operationMeta.getExecutor();
+ result = EXECUTOR;
+
+ operationConfig.getNanoRequestWaitInPoolTimeout(anyString);
+ result = Long.MAX_VALUE;
+ }
+ };
+ }
+
+ @Test
+ public void should_switch_thread_after_schedule() throws ExecutionException, InterruptedException {
+ mockInvocation();
+
+ FilterNode.buildChain(recordThreadFilter, scheduler, recordThreadFilter)
+ .onFilter(invocation)
+ .get();
+
+ assertThat(msg).containsExactly("main", THREAD_NAME);
+ }
+
+ @Test
+ public void should_stop_chain_when_first_filter_throw_exception() {
+ ExecutionException executionException = (ExecutionException) catchThrowable(
+ () -> FilterNode.buildChain(exceptionFilter, recordThreadFilter)
+ .onFilter(invocation)
+ .get());
+
+ assertThat(msg).isEmpty();
+ assertThat(executionException.getCause())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("e1");
+ }
+
+ @Test
+ public void should_stop_chain_when_middle_filter_throw_exception() {
+ ExecutionException executionException = (ExecutionException) catchThrowable(
+ () -> FilterNode.buildChain(recordThreadFilter, exceptionFilter, recordThreadFilter)
+ .onFilter(invocation)
+ .get());
+
+ assertThat(msg).containsExactly("main");
+ assertThat(executionException.getCause())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("e1");
+ }
+
+ @Test
+ public void should_support_retry_logic() {
+ Filter exceptionFilter = (invocation, nextNode) -> {
+ throw new CompletionException(new IOException("net error"));
+ };
+ SimpleRetryFilter retryFilter = new SimpleRetryFilter().setMaxRetry(3);
+
+ ExecutionException executionException = (ExecutionException) catchThrowable(
+ () -> FilterNode.buildChain(retryFilter, recordThreadFilter, exceptionFilter)
+ .onFilter(invocation)
+ .get());
+
+ assertThat(msg).containsExactly("main", "main", "main");
+ assertThat(executionException.getCause())
+ .isInstanceOf(IOException.class)
+ .hasMessage("net error");
+ }
+}
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/servicecomb/core/filter/SimpleRetryFilter.java b/core/src/test/java/org/apache/servicecomb/core/filter/SimpleRetryFilter.java
new file mode 100644
index 0000000..061356d
--- /dev/null
+++ b/core/src/test/java/org/apache/servicecomb/core/filter/SimpleRetryFilter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.filter;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.exception.Exceptions;
+import org.apache.servicecomb.swagger.invocation.Response;
+
+@FilterMeta(name = "simple-retry")
+public class SimpleRetryFilter implements Filter {
+ protected int maxRetry = 3;
+
+ public SimpleRetryFilter setMaxRetry(int maxRetry) {
+ this.maxRetry = maxRetry;
+ return this;
+ }
+
+ @Override
+ public CompletableFuture<Response> onFilter(Invocation invocation, FilterNode nextNode) {
+ return new RetrySession(invocation, nextNode).run();
+ }
+
+ protected Throwable unwrapException(Throwable throwable) {
+ return Exceptions.unwrapIncludeInvocationException(throwable);
+ }
+
+ protected boolean isRetryException(Throwable throwable) {
+ return !(throwable instanceof IOException);
+ }
+
+ class RetrySession {
+ Invocation invocation;
+
+ FilterNode nextNode;
+
+ int retryCount;
+
+ CompletableFuture<Response> future = new CompletableFuture<>();
+
+ RetrySession(Invocation invocation, FilterNode nextNode) {
+ this.invocation = invocation;
+ this.nextNode = nextNode;
+ }
+
+ CompletableFuture<Response> run() {
+ nextNode.onFilter(invocation)
+ .whenComplete(this::whenNextComplete);
+ return future;
+ }
+
+ private void whenNextComplete(Response response, Throwable throwable) {
+ if (throwable == null) {
+ future.complete(response);
+ return;
+ }
+
+ Throwable unwrapped = unwrapException(throwable);
+ if (isRetryException(unwrapped)) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+
+ retryCount++;
+ if (retryCount >= maxRetry) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+
+ run();
+ }
+ }
+}
diff --git a/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/utils/AsyncUtils.java b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/utils/AsyncUtils.java
new file mode 100644
index 0000000..3bbf7c8
--- /dev/null
+++ b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/utils/AsyncUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.common.utils;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+public final class AsyncUtils {
+ private AsyncUtils() {
+ }
+
+ public static <T> CompletableFuture<T> tryCatch(Supplier<CompletableFuture<T>> supplier) {
+ try {
+ return supplier.get();
+ } catch (Throwable e) {
+ return completeExceptionally(e);
+ }
+ }
+
+ public static <T> CompletableFuture<T> completeExceptionally(Throwable throwable) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ future.completeExceptionally(throwable);
+ return future;
+ }
+
+ /**
+ * throws {@code exception} as unchecked exception, without wrapping exception.
+ *
+ * @param exception exception which will be rethrow
+ * @throws T {@code exception} as unchecked exception
+ */
+ @SuppressWarnings("unchecked")
+ public static <T extends Throwable> void rethrow(Throwable exception) throws T {
+ throw (T) exception;
+ }
+}