You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2020/06/01 09:38:32 UTC

[servicecomb-java-chassis] branch master updated: [SCB-1967] add simple load balance

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a0c45f  [SCB-1967] add simple load balance
9a0c45f is described below

commit 9a0c45f580e2552a39c8b42a9209b5545cf654b2
Author: wujimin <wu...@huawei.com>
AuthorDate: Sat May 30 21:23:21 2020 +0800

    [SCB-1967] add simple load balance
---
 .../servicecomb/core/exception/ExceptionCodes.java |   1 +
 .../impl/DefaultFilterProvider.java}               |  18 +++-
 .../core/filter/impl/ScheduleFilter.java           |   4 +-
 .../core/filter/impl/SimpleLoadBalanceFilter.java  | 102 +++++++++++++++++++++
 ...g.apache.servicecomb.core.filter.FilterProvider |  18 ++++
 .../filter/impl/SimpleLoadBalanceFilterTest.java   |  76 +++++++++++++++
 6 files changed, 213 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/servicecomb/core/exception/ExceptionCodes.java b/core/src/main/java/org/apache/servicecomb/core/exception/ExceptionCodes.java
index e67b4ba..ad3f5e4 100644
--- a/core/src/main/java/org/apache/servicecomb/core/exception/ExceptionCodes.java
+++ b/core/src/main/java/org/apache/servicecomb/core/exception/ExceptionCodes.java
@@ -18,6 +18,7 @@ package org.apache.servicecomb.core.exception;
 
 public interface ExceptionCodes {
   String GENERIC_CLIENT = "SCB.0000";
+  String LB_ADDRESS_NOT_FOUND = "SCB.0001";
   String NOT_DEFINED_ANY_SCHEMA = "SCB.0002";
 
   String GENERIC_SERVER = "SCB.5000";
diff --git a/core/src/main/java/org/apache/servicecomb/core/exception/ExceptionCodes.java b/core/src/main/java/org/apache/servicecomb/core/filter/impl/DefaultFilterProvider.java
similarity index 64%
copy from core/src/main/java/org/apache/servicecomb/core/exception/ExceptionCodes.java
copy to core/src/main/java/org/apache/servicecomb/core/filter/impl/DefaultFilterProvider.java
index e67b4ba..e2a2054 100644
--- a/core/src/main/java/org/apache/servicecomb/core/exception/ExceptionCodes.java
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/impl/DefaultFilterProvider.java
@@ -14,11 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.servicecomb.core.exception;
+package org.apache.servicecomb.core.filter.impl;
 
-public interface ExceptionCodes {
-  String GENERIC_CLIENT = "SCB.0000";
-  String NOT_DEFINED_ANY_SCHEMA = "SCB.0002";
+import java.util.Arrays;
+import java.util.List;
 
-  String GENERIC_SERVER = "SCB.5000";
+import org.apache.servicecomb.core.filter.Filter;
+import org.apache.servicecomb.core.filter.FilterProvider;
+
+public class DefaultFilterProvider implements FilterProvider {
+  @Override
+  public List<Class<? extends Filter>> getFilters() {
+    return Arrays.asList(
+        SimpleLoadBalanceFilter.class,
+        ScheduleFilter.class);
+  }
 }
diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/impl/ScheduleFilter.java b/core/src/main/java/org/apache/servicecomb/core/filter/impl/ScheduleFilter.java
index f64a053..a74bddf 100644
--- a/core/src/main/java/org/apache/servicecomb/core/filter/impl/ScheduleFilter.java
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/impl/ScheduleFilter.java
@@ -16,6 +16,8 @@
  */
 package org.apache.servicecomb.core.filter.impl;
 
+import static org.apache.servicecomb.swagger.invocation.InvocationType.PRODUCER;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -27,7 +29,7 @@ import org.apache.servicecomb.core.filter.FilterNode;
 import org.apache.servicecomb.core.invocation.InvocationStageTrace;
 import org.apache.servicecomb.swagger.invocation.Response;
 
-@FilterMeta(name = "schedule")
+@FilterMeta(name = "schedule", invocationType = PRODUCER)
 public class ScheduleFilter implements Filter {
   @Override
   public CompletableFuture<Response> onFilter(Invocation invocation, FilterNode next) {
diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/impl/SimpleLoadBalanceFilter.java b/core/src/main/java/org/apache/servicecomb/core/filter/impl/SimpleLoadBalanceFilter.java
new file mode 100644
index 0000000..557cf1b
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/impl/SimpleLoadBalanceFilter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.filter.impl;
+
+import static org.apache.servicecomb.core.exception.ExceptionCodes.LB_ADDRESS_NOT_FOUND;
+import static org.apache.servicecomb.swagger.invocation.InvocationType.CONSUMER;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.servicecomb.core.Endpoint;
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.exception.Exceptions;
+import org.apache.servicecomb.core.filter.Filter;
+import org.apache.servicecomb.core.filter.FilterMeta;
+import org.apache.servicecomb.core.filter.FilterNode;
+import org.apache.servicecomb.core.handler.impl.SimpleLoadBalanceHandler;
+import org.apache.servicecomb.core.registry.discovery.EndpointDiscoveryFilter;
+import org.apache.servicecomb.foundation.common.cache.VersionedCache;
+import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
+import org.apache.servicecomb.registry.discovery.DiscoveryContext;
+import org.apache.servicecomb.registry.discovery.DiscoveryFilter;
+import org.apache.servicecomb.registry.discovery.DiscoveryTree;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * build-in round robin LB, for demo scenes
+ */
+@FilterMeta(name = "simple-load-balance", invocationType = CONSUMER, shareable = false)
+public class SimpleLoadBalanceFilter implements Filter {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleLoadBalanceHandler.class);
+
+  private DiscoveryTree discoveryTree = new DiscoveryTree();
+
+  // key is grouping filter qualified name
+  private volatile Map<String, AtomicInteger> indexMap = new ConcurrentHashMapEx<>();
+
+  public SimpleLoadBalanceFilter() {
+    discoveryTree.loadFromSPI(DiscoveryFilter.class);
+    discoveryTree.addFilter(new EndpointDiscoveryFilter());
+    discoveryTree.sort();
+  }
+
+  @Override
+  public CompletableFuture<Response> onFilter(Invocation invocation, FilterNode nextNode) {
+    if (invocation.getEndpoint() != null) {
+      return nextNode.onFilter(invocation);
+    }
+
+    invocation.setEndpoint(selectEndpoint(invocation));
+    return nextNode.onFilter(invocation);
+  }
+
+  public Endpoint selectEndpoint(Invocation invocation) {
+    DiscoveryContext context = new DiscoveryContext();
+    context.setInputParameters(invocation);
+    VersionedCache endpointsVersionedCache = discoveryTree.discovery(context,
+        invocation.getAppId(),
+        invocation.getMicroserviceName(),
+        invocation.getMicroserviceVersionRule());
+    if (endpointsVersionedCache.isEmpty()) {
+      String msg = "No available address found.";
+      LOGGER.error("{} microserviceName={}, version={}, discoveryGroupName={}",
+          msg,
+          invocation.getMicroserviceName(),
+          invocation.getMicroserviceVersionRule(),
+          endpointsVersionedCache.name());
+      throw Exceptions.consumer(LB_ADDRESS_NOT_FOUND, msg);
+    }
+
+    List<Endpoint> endpoints = endpointsVersionedCache.data();
+    AtomicInteger index = indexMap.computeIfAbsent(endpointsVersionedCache.name(), name -> {
+      LOGGER.info("Create loadBalancer for {}.", name);
+      return new AtomicInteger();
+    });
+    LOGGER.debug("invocation {} use discoveryGroup {}.",
+        invocation.getMicroserviceQualifiedName(),
+        endpointsVersionedCache.name());
+
+    int idx = Math.abs(index.getAndIncrement());
+    idx = idx % endpoints.size();
+    return endpoints.get(idx);
+  }
+}
diff --git a/core/src/main/resources/META-INF/services/org.apache.servicecomb.core.filter.FilterProvider b/core/src/main/resources/META-INF/services/org.apache.servicecomb.core.filter.FilterProvider
new file mode 100644
index 0000000..51f9e53
--- /dev/null
+++ b/core/src/main/resources/META-INF/services/org.apache.servicecomb.core.filter.FilterProvider
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.servicecomb.core.filter.impl.DefaultFilterProvider
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/servicecomb/core/filter/impl/SimpleLoadBalanceFilterTest.java b/core/src/test/java/org/apache/servicecomb/core/filter/impl/SimpleLoadBalanceFilterTest.java
new file mode 100644
index 0000000..589df77
--- /dev/null
+++ b/core/src/test/java/org/apache/servicecomb/core/filter/impl/SimpleLoadBalanceFilterTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.core.filter.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.servicecomb.core.Endpoint;
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.filter.FilterNode;
+import org.apache.servicecomb.registry.discovery.DiscoveryContext;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.junit.Test;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+import mockit.Verifications;
+
+public class SimpleLoadBalanceFilterTest {
+  SimpleLoadBalanceFilter filter = new SimpleLoadBalanceFilter();
+
+  @Injectable
+  Invocation invocation;
+
+  @Injectable
+  Endpoint endpoint;
+
+  @Mocked
+  DiscoveryContext discoveryContext;
+
+  @Injectable
+  FilterNode nextNode;
+
+  @Test
+  public void should_invoke_next_directly_when_invocation_already_has_endpoint()
+      throws ExecutionException, InterruptedException {
+    Response response = Response.ok("ok");
+    new Expectations() {
+      {
+        invocation.getEndpoint();
+        result = endpoint;
+
+        nextNode.onFilter(invocation);
+        result = CompletableFuture.completedFuture(response);
+      }
+    };
+
+    Response result = filter.onFilter(invocation, nextNode).get();
+
+    assertThat(result).isSameAs(response);
+    new Verifications() {
+      {
+        discoveryContext.setInputParameters(invocation);
+        times = 0;
+      }
+    };
+  }
+}
\ No newline at end of file