You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by wu...@apache.org on 2018/01/09 02:06:45 UTC

[incubator-servicecomb-java-chassis] 01/03: [SCB-165] Service registry not given proper instances

This is an automated email from the ASF dual-hosted git repository.

wujimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit f980a116f409755c5737d63130a47754afde8432
Author: bao.liu <ba...@huawei.com>
AuthorDate: Thu Jan 4 16:37:01 2018 +0800

    [SCB-165] Service registry not given proper instances
---
 .../definition/loader/DynamicSchemaLoader.java     |  4 ++++
 .../servicecomb/foundation/common/net/IpPort.java  | 22 +++++++++++++++++++++
 .../foundation/common/net/TestIpPort.java          |  4 ++++
 .../serviceregistry/client/IpPortManager.java      | 16 ++++++++-------
 .../client/http/ServiceRegistryClientImpl.java     | 23 +++++++++++++++++++---
 .../consumer/MicroserviceVersions.java             |  3 +++
 .../registry/AbstractServiceRegistry.java          |  2 +-
 .../registry/RemoteServiceRegistry.java            |  9 +++++++--
 8 files changed, 70 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/io/servicecomb/core/definition/loader/DynamicSchemaLoader.java b/core/src/main/java/io/servicecomb/core/definition/loader/DynamicSchemaLoader.java
index 332b35a..08a3c8d 100644
--- a/core/src/main/java/io/servicecomb/core/definition/loader/DynamicSchemaLoader.java
+++ b/core/src/main/java/io/servicecomb/core/definition/loader/DynamicSchemaLoader.java
@@ -39,7 +39,11 @@ import io.servicecomb.serviceregistry.RegistryUtils;
  *   需要支持在不同的产品中部署为不同的微服务名
  *   微服务名是由环境变量等等方式注入的
  *   此时可以在BootListener中进行注册(必须在producer初始化之前注册契约)
+ *
+ * @deprecated This class is no longer needed: when calling a provider, its schemas are downloaded from the service center,
+ * and a provider registers its schemas with the service center when it starts up.
  */
+@Deprecated
 public class DynamicSchemaLoader {
   private static final Logger LOGGER = LoggerFactory.getLogger(DynamicSchemaLoader.class);
 
diff --git a/foundations/foundation-common/src/main/java/io/servicecomb/foundation/common/net/IpPort.java b/foundations/foundation-common/src/main/java/io/servicecomb/foundation/common/net/IpPort.java
index 601b063..15dee9d 100644
--- a/foundations/foundation-common/src/main/java/io/servicecomb/foundation/common/net/IpPort.java
+++ b/foundations/foundation-common/src/main/java/io/servicecomb/foundation/common/net/IpPort.java
@@ -53,6 +53,28 @@ public class IpPort {
     this.port = port;
   }
 
+  // Equal when the other object is an IpPort with the same host (or IP) and port.
+  @Override
+  public boolean equals(Object o) {
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    IpPort other = (IpPort) o;
+    return this == o || (port == other.port && hostOrIp.equals(other.hostOrIp));
+  }
+
+  // Kept consistent with equals so equal instances share a hash code, as
+  // hash-based collections require.
+  @Override
+  public int hashCode() {
+    return 31 * hostOrIp.hashCode() + port;
+  }
+
+  @Override
+  public String toString() { // rendered as "host:port", e.g. "localhost:3333"
+    return new StringBuilder().append(hostOrIp).append(':').append(port).toString();
+  }
+
   public InetSocketAddress getSocketAddress() {
     if (socketAddress == null) {
       synchronized (lock) {
diff --git a/foundations/foundation-common/src/test/java/io/servicecomb/foundation/common/net/TestIpPort.java b/foundations/foundation-common/src/test/java/io/servicecomb/foundation/common/net/TestIpPort.java
index 21dcda7..a6dbdcf 100644
--- a/foundations/foundation-common/src/test/java/io/servicecomb/foundation/common/net/TestIpPort.java
+++ b/foundations/foundation-common/src/test/java/io/servicecomb/foundation/common/net/TestIpPort.java
@@ -30,5 +30,9 @@ public class TestIpPort {
     Assert.assertEquals(inst1.getHostOrIp(), inst2.getHostOrIp());
     Assert.assertEquals(inst1.getPort(), inst2.getPort());
     Assert.assertEquals(inst1.getSocketAddress().getHostName(), inst2.getSocketAddress().getHostName());
+    Assert.assertEquals(inst1, inst1);
+    Assert.assertEquals(inst1, inst2);
+    Assert.assertEquals(inst1.toString(), "localhost:3333");
+    Assert.assertNotEquals(inst1, new Object());
   }
 }
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
index f6a681a..bd34471 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
@@ -19,6 +19,7 @@ package io.servicecomb.serviceregistry.client;
 
 import static io.servicecomb.serviceregistry.api.Const.REGISTRY_APP_ID;
 import static io.servicecomb.serviceregistry.api.Const.REGISTRY_SERVICE_NAME;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -46,10 +47,10 @@ public class IpPortManager {
 
   private ArrayList<IpPort> defaultIpPort;
 
-  private InstanceCache instanceCache = null;
-
   private AtomicInteger currentAvailableIndex;
 
+  private boolean autoDiscoveryInited = false;
+
   public IpPortManager(ServiceRegistryConfig serviceRegistryConfig, InstanceCacheManager instanceCacheManager) {
     this.serviceRegistryConfig = serviceRegistryConfig;
     this.instanceCacheManager = instanceCacheManager;
@@ -65,10 +66,11 @@ public class IpPortManager {
 
   // we have to do this operation after the first time setup has already done
   public void initAutoDiscovery() {
-    if (this.serviceRegistryConfig.isRegistryAutoDiscovery()) {
-      instanceCache = instanceCacheManager.getOrCreate(REGISTRY_APP_ID,
+    if (!autoDiscoveryInited && this.serviceRegistryConfig.isRegistryAutoDiscovery()) {
+      instanceCacheManager.getOrCreate(REGISTRY_APP_ID,
           REGISTRY_SERVICE_NAME,
           DefinitionConst.VERSION_RULE_LATEST);
+      autoDiscoveryInited = true;
     }
   }
 
@@ -100,10 +102,10 @@ public class IpPortManager {
   }
 
   private List<CacheEndpoint> getDiscoveredIpPort() {
-    if (instanceCache == null) {
-      return new ArrayList<>(0);
+    if (!autoDiscoveryInited || !this.serviceRegistryConfig.isRegistryAutoDiscovery()) {
+      return null;
     }
-    instanceCache = instanceCacheManager.getOrCreate(REGISTRY_APP_ID,
+    InstanceCache instanceCache = instanceCacheManager.getOrCreate(REGISTRY_APP_ID,
         REGISTRY_SERVICE_NAME,
         DefinitionConst.VERSION_RULE_LATEST);
     return instanceCache.getOrCreateTransportMap().get(defaultTransport);
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
index 53ffe5c..46bddd1 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
@@ -63,7 +63,7 @@ import io.vertx.core.http.HttpClientResponse;
 
 public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistryClientImpl.class);
-
+  private static final int MAX_ITERATE = 3;
   private IpPortManager ipPortManager;
 
   // key是本进程的微服务id和服务管理中心的id
@@ -80,11 +80,27 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
 
   private void retry(RequestContext requestContext, Handler<RestResponse> responseHandler) {
     LOGGER.warn("invoke service [{}] failed, retry.", requestContext.getUri());
-    requestContext.setIpPort(ipPortManager.getAvailableAddress(true));
+    requestContext.setIpPort(getRetryAddress(requestContext.getIpPort()));
     requestContext.setRetry(true);
     RestUtils.httpDo(requestContext, responseHandler);
   }
 
+  // Chooses the address for a retry, preferring one that differs from the one that
+  // just failed. ipPortManager is asked again at most MAX_ITERATE times, so the
+  // failed address can still be returned when no alternative turns up.
+  private IpPort getRetryAddress(IpPort currentIpPort) {
+    IpPort nextIpPort = ipPortManager.getAvailableAddress(true);
+    for (int attempt = 0; attempt < MAX_ITERATE; attempt++) {
+      if (!currentIpPort.equals(nextIpPort)) {
+        break;
+      }
+      nextIpPort = ipPortManager.getAvailableAddress(true);
+    }
+    // The logger renders the {} arguments itself, so no explicit toString() calls are needed.
+    LOGGER.info("Retry. Current address is {} and trying {}", currentIpPort, nextIpPort);
+    return nextIpPort;
+  }
+
   @SuppressWarnings("unchecked")
   private <T> Handler<RestResponse> syncHandler(CountDownLatch countDownLatch, Class<T> cls,
       Holder<T> holder) {
@@ -111,7 +127,8 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
               holder.value =
                   JsonUtils.readValue(bodyBuffer.getBytes(), cls);
             } catch (Exception e) {
-              LOGGER.warn(bodyBuffer.toString());
+              LOGGER.warn("read value failed and response message is {}",
+                  bodyBuffer.toString());
             }
             countDownLatch.countDown();
           });
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/consumer/MicroserviceVersions.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
index 48bcfaa..1dadb89 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
@@ -104,6 +104,9 @@ public class MicroserviceVersions {
         microserviceName,
         DefinitionConst.VERSION_RULE_ALL);
     if (pulledInstances == null) {
+      // an exception happened while pulling; try to pull again later.
+      pendingPullCount.incrementAndGet();
+      appManager.getEventBus().post(new PullMicroserviceVersionsInstancesEvent(this, TimeUnit.SECONDS.toMillis(1)));
       return;
     }
 
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
index 27e4d4d..c91a20b 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
@@ -207,7 +207,7 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry {
         serviceName,
         versionRule);
     if (instances == null) {
-      LOGGER.error("find empty instances from service center. service={}/{}/{}", appId, serviceName, versionRule);
+      LOGGER.error("Can not find any instances from service center due to previous errors. service={}/{}/{}", appId, serviceName, versionRule);
       return null;
     }
 
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
index 5b0a3b0..5d6be5a 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
@@ -32,6 +32,7 @@ import io.servicecomb.serviceregistry.client.ServiceRegistryClient;
 import io.servicecomb.serviceregistry.client.http.ServiceRegistryClientImpl;
 import io.servicecomb.serviceregistry.config.ServiceRegistryConfig;
 import io.servicecomb.serviceregistry.definition.MicroserviceDefinition;
+import io.servicecomb.serviceregistry.task.MicroserviceRegisterTask;
 import io.servicecomb.serviceregistry.task.event.PeriodicPullEvent;
 import io.servicecomb.serviceregistry.task.event.PullMicroserviceVersionsInstancesEvent;
 import io.servicecomb.serviceregistry.task.event.ShutdownEvent;
@@ -77,8 +78,6 @@ public class RemoteServiceRegistry extends AbstractServiceRegistry {
   public void run() {
     super.run();
 
-    ipPortManager.initAutoDiscovery();
-
     taskPool.scheduleAtFixedRate(serviceCenterTask,
         serviceRegistryConfig.getHeartbeatInterval(),
         serviceRegistryConfig.getHeartbeatInterval(),
@@ -101,6 +100,12 @@ public class RemoteServiceRegistry extends AbstractServiceRegistry {
     taskPool.schedule(event.getMicroserviceVersions()::pullInstances, event.getMsDelay(), TimeUnit.MILLISECONDS);
   }
 
+  @Subscribe
+  public void onMicroserviceRegistryTask(MicroserviceRegisterTask event) {
+    if (event.isRegistered()) { // auto discovery waits until the register task reports success
+      ipPortManager.initAutoDiscovery();
+    }
+  }
   // for testing
   ScheduledThreadPoolExecutor getTaskPool() {
     return this.taskPool;

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.