You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2020/06/01 09:38:32 UTC
[servicecomb-java-chassis] branch master updated: [SCB-1967] add
simple load balance
This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/master by this push:
new 9a0c45f [SCB-1967] add simple load balance
9a0c45f is described below
commit 9a0c45f580e2552a39c8b42a9209b5545cf654b2
Author: wujimin <wu...@huawei.com>
AuthorDate: Sat May 30 21:23:21 2020 +0800
[SCB-1967] add simple load balance
---
.../servicecomb/core/exception/ExceptionCodes.java | 1 +
.../impl/DefaultFilterProvider.java} | 18 +++-
.../core/filter/impl/ScheduleFilter.java | 4 +-
.../core/filter/impl/SimpleLoadBalanceFilter.java | 102 +++++++++++++++++++++
...g.apache.servicecomb.core.filter.FilterProvider | 18 ++++
.../filter/impl/SimpleLoadBalanceFilterTest.java | 76 +++++++++++++++
6 files changed, 213 insertions(+), 6 deletions(-)
diff --git a/core/src/main/java/org/apache/servicecomb/core/exception/ExceptionCodes.java b/core/src/main/java/org/apache/servicecomb/core/exception/ExceptionCodes.java
index e67b4ba..ad3f5e4 100644
--- a/core/src/main/java/org/apache/servicecomb/core/exception/ExceptionCodes.java
+++ b/core/src/main/java/org/apache/servicecomb/core/exception/ExceptionCodes.java
@@ -18,6 +18,7 @@ package org.apache.servicecomb.core.exception;
public interface ExceptionCodes {
String GENERIC_CLIENT = "SCB.0000";
+ String LB_ADDRESS_NOT_FOUND = "SCB.0001";
String NOT_DEFINED_ANY_SCHEMA = "SCB.0002";
String GENERIC_SERVER = "SCB.5000";
diff --git a/core/src/main/java/org/apache/servicecomb/core/exception/ExceptionCodes.java b/core/src/main/java/org/apache/servicecomb/core/filter/impl/DefaultFilterProvider.java
similarity index 64%
copy from core/src/main/java/org/apache/servicecomb/core/exception/ExceptionCodes.java
copy to core/src/main/java/org/apache/servicecomb/core/filter/impl/DefaultFilterProvider.java
index e67b4ba..e2a2054 100644
--- a/core/src/main/java/org/apache/servicecomb/core/exception/ExceptionCodes.java
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/impl/DefaultFilterProvider.java
@@ -14,11 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.servicecomb.core.exception;
+package org.apache.servicecomb.core.filter.impl;
-public interface ExceptionCodes {
- String GENERIC_CLIENT = "SCB.0000";
- String NOT_DEFINED_ANY_SCHEMA = "SCB.0002";
+import java.util.Arrays;
+import java.util.List;
- String GENERIC_SERVER = "SCB.5000";
+import org.apache.servicecomb.core.filter.Filter;
+import org.apache.servicecomb.core.filter.FilterProvider;
+
+/**
+ * {@link FilterProvider} implementation that supplies the filters defined in this package:
+ * {@link SimpleLoadBalanceFilter} and {@link ScheduleFilter}.
+ */
+public class DefaultFilterProvider implements FilterProvider {
+ /**
+ * @return a fixed-size list holding {@code SimpleLoadBalanceFilter.class} followed by
+ * {@code ScheduleFilter.class}
+ */
+ @Override
+ public List<Class<? extends Filter>> getFilters() {
+ return Arrays.asList(
+ SimpleLoadBalanceFilter.class,
+ ScheduleFilter.class);
+ }
}
diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/impl/ScheduleFilter.java b/core/src/main/java/org/apache/servicecomb/core/filter/impl/ScheduleFilter.java
index f64a053..a74bddf 100644
--- a/core/src/main/java/org/apache/servicecomb/core/filter/impl/ScheduleFilter.java
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/impl/ScheduleFilter.java
@@ -16,6 +16,8 @@
*/
package org.apache.servicecomb.core.filter.impl;
+import static org.apache.servicecomb.swagger.invocation.InvocationType.PRODUCER;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -27,7 +29,7 @@ import org.apache.servicecomb.core.filter.FilterNode;
import org.apache.servicecomb.core.invocation.InvocationStageTrace;
import org.apache.servicecomb.swagger.invocation.Response;
-@FilterMeta(name = "schedule")
+@FilterMeta(name = "schedule", invocationType = PRODUCER)
public class ScheduleFilter implements Filter {
@Override
public CompletableFuture<Response> onFilter(Invocation invocation, FilterNode next) {
diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/impl/SimpleLoadBalanceFilter.java b/core/src/main/java/org/apache/servicecomb/core/filter/impl/SimpleLoadBalanceFilter.java
new file mode 100644
index 0000000..557cf1b
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/impl/SimpleLoadBalanceFilter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.filter.impl;
+
+import static org.apache.servicecomb.core.exception.ExceptionCodes.LB_ADDRESS_NOT_FOUND;
+import static org.apache.servicecomb.swagger.invocation.InvocationType.CONSUMER;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.servicecomb.core.Endpoint;
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.exception.Exceptions;
+import org.apache.servicecomb.core.filter.Filter;
+import org.apache.servicecomb.core.filter.FilterMeta;
+import org.apache.servicecomb.core.filter.FilterNode;
+import org.apache.servicecomb.core.handler.impl.SimpleLoadBalanceHandler;
+import org.apache.servicecomb.core.registry.discovery.EndpointDiscoveryFilter;
+import org.apache.servicecomb.foundation.common.cache.VersionedCache;
+import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
+import org.apache.servicecomb.registry.discovery.DiscoveryContext;
+import org.apache.servicecomb.registry.discovery.DiscoveryFilter;
+import org.apache.servicecomb.registry.discovery.DiscoveryTree;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Built-in round-robin load balancer, intended for demo scenes.
+ *
+ * <p>Declared through {@link FilterMeta} as a non-shareable filter for {@code CONSUMER}
+ * invocations. When an invocation reaches this filter without an endpoint, one is chosen from
+ * the discovery result in round-robin order; a separate counter is kept per discovery group name.
+ */
+@FilterMeta(name = "simple-load-balance", invocationType = CONSUMER, shareable = false)
+public class SimpleLoadBalanceFilter implements Filter {
+  // Log under this filter's own category rather than SimpleLoadBalanceHandler's.
+  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleLoadBalanceFilter.class);
+
+  private final DiscoveryTree discoveryTree = new DiscoveryTree();
+
+  // key is grouping filter qualified name (the value of VersionedCache.name());
+  // the reference is never reassigned, so it is final instead of volatile
+  private final Map<String, AtomicInteger> indexMap = new ConcurrentHashMapEx<>();
+
+  /**
+   * Builds the discovery tree from the {@link DiscoveryFilter}s loaded through SPI plus an
+   * {@link EndpointDiscoveryFilter}, then sorts it.
+   */
+  public SimpleLoadBalanceFilter() {
+    discoveryTree.loadFromSPI(DiscoveryFilter.class);
+    discoveryTree.addFilter(new EndpointDiscoveryFilter());
+    discoveryTree.sort();
+  }
+
+  /**
+   * Ensures the invocation has an endpoint, then hands it to the next filter node.
+   *
+   * <p>An endpoint that is already set is left untouched. Otherwise
+   * {@link #selectEndpoint(Invocation)} is called; any exception it throws propagates out of this
+   * method directly instead of being wrapped in the returned future.
+   *
+   * @param invocation the invocation being processed
+   * @param nextNode the next node of the filter chain
+   * @return the future returned by {@code nextNode.onFilter(invocation)}
+   */
+  @Override
+  public CompletableFuture<Response> onFilter(Invocation invocation, FilterNode nextNode) {
+    if (invocation.getEndpoint() == null) {
+      invocation.setEndpoint(selectEndpoint(invocation));
+    }
+    return nextNode.onFilter(invocation);
+  }
+
+  /**
+   * Picks the next endpoint for the invocation's target microservice in round-robin order.
+   *
+   * @param invocation supplies the app id, microservice name and version rule used for discovery
+   * @return the selected endpoint
+   * @throws RuntimeException the exception created by
+   *     {@code Exceptions.consumer(LB_ADDRESS_NOT_FOUND, ...)} when discovery yields no endpoint
+   */
+  public Endpoint selectEndpoint(Invocation invocation) {
+    DiscoveryContext context = new DiscoveryContext();
+    context.setInputParameters(invocation);
+    VersionedCache endpointsVersionedCache = discoveryTree.discovery(context,
+        invocation.getAppId(),
+        invocation.getMicroserviceName(),
+        invocation.getMicroserviceVersionRule());
+    if (endpointsVersionedCache.isEmpty()) {
+      String msg = "No available address found.";
+      LOGGER.error("{} microserviceName={}, version={}, discoveryGroupName={}",
+          msg,
+          invocation.getMicroserviceName(),
+          invocation.getMicroserviceVersionRule(),
+          endpointsVersionedCache.name());
+      throw Exceptions.consumer(LB_ADDRESS_NOT_FOUND, msg);
+    }
+
+    List<Endpoint> endpoints = endpointsVersionedCache.data();
+    AtomicInteger index = indexMap.computeIfAbsent(endpointsVersionedCache.name(), name -> {
+      LOGGER.info("Create loadBalancer for {}.", name);
+      return new AtomicInteger();
+    });
+    LOGGER.debug("invocation {} use discoveryGroup {}.",
+        invocation.getMicroserviceQualifiedName(),
+        endpointsVersionedCache.name());
+
+    // The counter eventually wraps to negative values. Math.abs(Integer.MIN_VALUE) is still
+    // negative, so abs-then-% could yield a negative index; floorMod always stays in [0, size).
+    int idx = Math.floorMod(index.getAndIncrement(), endpoints.size());
+    return endpoints.get(idx);
+  }
+}
diff --git a/core/src/main/resources/META-INF/services/org.apache.servicecomb.core.filter.FilterProvider b/core/src/main/resources/META-INF/services/org.apache.servicecomb.core.filter.FilterProvider
new file mode 100644
index 0000000..51f9e53
--- /dev/null
+++ b/core/src/main/resources/META-INF/services/org.apache.servicecomb.core.filter.FilterProvider
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.servicecomb.core.filter.impl.DefaultFilterProvider
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/servicecomb/core/filter/impl/SimpleLoadBalanceFilterTest.java b/core/src/test/java/org/apache/servicecomb/core/filter/impl/SimpleLoadBalanceFilterTest.java
new file mode 100644
index 0000000..589df77
--- /dev/null
+++ b/core/src/test/java/org/apache/servicecomb/core/filter/impl/SimpleLoadBalanceFilterTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.core.filter.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.servicecomb.core.Endpoint;
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.filter.FilterNode;
+import org.apache.servicecomb.registry.discovery.DiscoveryContext;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.junit.Test;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+import mockit.Verifications;
+
+/**
+ * Unit test for {@link SimpleLoadBalanceFilter}, using JMockit mocks for the invocation,
+ * the endpoint, the discovery context and the next filter node.
+ */
+public class SimpleLoadBalanceFilterTest {
+ // filter under test, created with its real constructor
+ SimpleLoadBalanceFilter filter = new SimpleLoadBalanceFilter();
+
+ @Injectable
+ Invocation invocation;
+
+ @Injectable
+ Endpoint endpoint;
+
+ // mocked so the test can verify whether the filter touched a DiscoveryContext at all
+ @Mocked
+ DiscoveryContext discoveryContext;
+
+ @Injectable
+ FilterNode nextNode;
+
+ /**
+ * When the invocation already carries an endpoint, the filter must return the response
+ * produced by the next node and must not run endpoint selection.
+ */
+ @Test
+ public void should_invoke_next_directly_when_invocation_already_has_endpoint()
+ throws ExecutionException, InterruptedException {
+ Response response = Response.ok("ok");
+ new Expectations() {
+ {
+ // the invocation already has an endpoint
+ invocation.getEndpoint();
+ result = endpoint;
+
+ // the next node completes immediately with the prepared response
+ nextNode.onFilter(invocation);
+ result = CompletableFuture.completedFuture(response);
+ }
+ };
+
+ Response result = filter.onFilter(invocation, nextNode).get();
+
+ assertThat(result).isSameAs(response);
+ new Verifications() {
+ {
+ // SimpleLoadBalanceFilter.selectEndpoint calls setInputParameters on a new
+ // DiscoveryContext, so zero calls shows that endpoint selection was skipped
+ discoveryContext.setInputParameters(invocation);
+ times = 0;
+ }
+ };
+ }
+}
\ No newline at end of file