You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by wu...@apache.org on 2018/01/09 02:06:44 UTC

[incubator-servicecomb-java-chassis] branch master updated (da3addb -> e812ad7)

This is an automated email from the ASF dual-hosted git repository.

wujimin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git.


    from da3addb  [SCB-195] improve check method in unit test
     new f980a11  [SCB-165] Service registry not given proper instances
     new 6805402  modify when concurrent call happens do not choose the correct address in short time but with some probability
     new e812ad7  fix concurrent access breaks

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../definition/loader/DynamicSchemaLoader.java     |  4 ++
 .../servicecomb/foundation/common/net/IpPort.java  | 22 ++++++++++
 .../foundation/common/net/TestIpPort.java          |  4 ++
 .../serviceregistry/client/IpPortManager.java      | 50 ++++++++++++----------
 .../client/http/ServiceRegistryClientImpl.java     | 44 +++++++++----------
 .../consumer/MicroserviceVersions.java             |  3 ++
 .../registry/AbstractServiceRegistry.java          |  2 +-
 .../registry/RemoteServiceRegistry.java            |  9 +++-
 .../serviceregistry/client/TestIpPortManager.java  | 23 +++++-----
 .../client/http/TestClientHttp.java                |  6 +--
 10 files changed, 103 insertions(+), 64 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
['"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>'].

[incubator-servicecomb-java-chassis] 02/03: modify when concurrent call happens do not choose the correct address in short time but with some probability

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wujimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit 6805402ab603738962e78214ccc89be7aedefa3b
Author: bao.liu <ba...@huawei.com>
AuthorDate: Fri Jan 5 15:26:42 2018 +0800

    modify when concurrent call happens do not choose the correct address in short time but with some probability
---
 .../serviceregistry/client/IpPortManager.java      | 33 ++++++------
 .../client/http/ServiceRegistryClientImpl.java     | 59 ++++++++--------------
 .../serviceregistry/client/TestIpPortManager.java  | 23 ++++-----
 .../client/http/TestClientHttp.java                |  6 +--
 4 files changed, 51 insertions(+), 70 deletions(-)

diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
index bd34471..12b4a00 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
@@ -74,31 +74,32 @@ public class IpPortManager {
     }
   }
 
-  public IpPort getAvailableAddress(boolean invalidate) {
-    int index;
-    if (invalidate) {
-      index = currentAvailableIndex.incrementAndGet();
-    } else {
-      index = currentAvailableIndex.get();
+  public IpPort getNextAvailableAddress(IpPort failedIpPort) {
+    IpPort current = getAvailableAddress();
+    if (current.equals(failedIpPort)) {
+      currentAvailableIndex.incrementAndGet();
+      current = getAvailableAddress();
     }
 
+    LOGGER.info("Change service center address from {} to {}", failedIpPort.toString(), current.toString());
+    return current;
+  }
+
+  public IpPort getAvailableAddress() {
+    return getAvailableAddress(currentAvailableIndex.get());
+  }
+
+  private IpPort getAvailableAddress(int index) {
     if (index < defaultIpPort.size()) {
-      return returnWithLog(invalidate, defaultIpPort.get(index));
+      return defaultIpPort.get(index);
     }
     List<CacheEndpoint> endpoints = getDiscoveredIpPort();
     if (endpoints == null || (index >= defaultIpPort.size() + endpoints.size())) {
       currentAvailableIndex.set(0);
-      return returnWithLog(invalidate, defaultIpPort.get(0));
+      return defaultIpPort.get(0);
     }
     CacheEndpoint nextEndpoint = endpoints.get(index - defaultIpPort.size());
-    return returnWithLog(invalidate, new URIEndpointObject(nextEndpoint.getEndpoint()));
-  }
-
-  private IpPort returnWithLog(boolean needLog, IpPort result) {
-    if (needLog) {
-      LOGGER.info("Using next service center address {}:{}.", result.getHostOrIp(), result.getPort());
-    }
-    return result;
+    return new URIEndpointObject(nextEndpoint.getEndpoint());
   }
 
   private List<CacheEndpoint> getDiscoveredIpPort() {
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
index 46bddd1..3a5e84f 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
@@ -50,8 +50,8 @@ import io.servicecomb.serviceregistry.api.response.GetInstancesResponse;
 import io.servicecomb.serviceregistry.api.response.GetSchemaResponse;
 import io.servicecomb.serviceregistry.api.response.GetServiceResponse;
 import io.servicecomb.serviceregistry.api.response.HeartbeatResponse;
-import io.servicecomb.serviceregistry.api.response.MicroserviceInstanceResponse;
 import io.servicecomb.serviceregistry.api.response.MicroserviceInstanceChangedEvent;
+import io.servicecomb.serviceregistry.api.response.MicroserviceInstanceResponse;
 import io.servicecomb.serviceregistry.api.response.RegisterInstanceResponse;
 import io.servicecomb.serviceregistry.client.ClientException;
 import io.servicecomb.serviceregistry.client.IpPortManager;
@@ -63,7 +63,7 @@ import io.vertx.core.http.HttpClientResponse;
 
 public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistryClientImpl.class);
-  private static final int MAX_ITERATE = 3;
+
   private IpPortManager ipPortManager;
 
   // key是本进程的微服务id和服务管理中心的id
@@ -80,27 +80,11 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
 
   private void retry(RequestContext requestContext, Handler<RestResponse> responseHandler) {
     LOGGER.warn("invoke service [{}] failed, retry.", requestContext.getUri());
-    requestContext.setIpPort(getRetryAddress(requestContext.getIpPort()));
+    requestContext.setIpPort(ipPortManager.getNextAvailableAddress(requestContext.getIpPort()));
     requestContext.setRetry(true);
     RestUtils.httpDo(requestContext, responseHandler);
   }
 
-  // try a different address if it is possible when retrying
-  private IpPort getRetryAddress(IpPort currentIpPort) {
-    int retry = MAX_ITERATE;
-    IpPort nextIpPort = ipPortManager.getAvailableAddress(true);
-    while (retry > 0) {
-      if (currentIpPort.equals(nextIpPort)) {
-        nextIpPort = ipPortManager.getAvailableAddress(true);
-      } else {
-        break;
-      }
-      retry--;
-    }
-    LOGGER.info("Retry. Current address is {} and trying {}", currentIpPort.toString(), nextIpPort.toString());
-    return nextIpPort;
-  }
-
   @SuppressWarnings("unchecked")
   private <T> Handler<RestResponse> syncHandler(CountDownLatch countDownLatch, Class<T> cls,
       Holder<T> holder) {
@@ -154,7 +138,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
         } else {
           countDownLatch.countDown();
         }
-        
+
         return;
       }
 
@@ -171,7 +155,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public List<Microservice> getAllMicroservices() {
     Holder<GetAllServicesResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
@@ -192,7 +176,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public String getMicroserviceId(String appId, String microserviceName, String versionRule) {
     Holder<GetExistenceResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
@@ -220,7 +204,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public boolean isSchemaExist(String microserviceId, String schemaId) {
     Holder<GetExistenceResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
@@ -243,7 +227,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public boolean registerSchema(String microserviceId, String schemaId, String schemaContent) {
     Holder<ResponseWrapper> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     try {
       CreateSchemaRequest request = new CreateSchemaRequest();
@@ -288,7 +272,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public String getSchema(String microserviceId, String schemaId) {
     Holder<GetSchemaResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
@@ -313,7 +297,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public String registerMicroservice(Microservice microservice) {
     Holder<CreateServiceResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
     try {
       CreateServiceRequest request = new CreateServiceRequest();
       request.setService(microservice);
@@ -345,7 +329,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public Microservice getMicroservice(String microserviceId) {
     Holder<GetServiceResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
@@ -366,7 +350,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public String registerMicroserviceInstance(MicroserviceInstance instance) {
     Holder<RegisterInstanceResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     try {
       RegisterInstanceRequest request = new RegisterInstanceRequest();
@@ -395,7 +379,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public List<MicroserviceInstance> getMicroserviceInstance(String consumerId, String providerId) {
     Holder<GetInstancesResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
@@ -416,7 +400,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public boolean unregisterMicroserviceInstance(String microserviceId, String microserviceInstanceId) {
     Holder<HttpClientResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.delete(ipPort,
@@ -443,7 +427,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public HeartbeatResponse heartbeat(String microserviceId, String microserviceInstanceId) {
     Holder<HttpClientResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.put(ipPort,
@@ -489,7 +473,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
 
           String url = String.format(Const.REGISTRY_API.MICROSERVICE_WATCH, selfMicroserviceId);
 
-          IpPort ipPort = ipPortManager.getAvailableAddress(false);
+          IpPort ipPort = ipPortManager.getAvailableAddress();
           WebsocketUtils.open(ipPort, url, o -> {
             onOpen.success(o);
             LOGGER.info(
@@ -536,7 +520,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   public List<MicroserviceInstance> findServiceInstance(String consumerId, String appId, String serviceName,
       String versionRule) {
     Holder<FindInstancesResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
@@ -579,7 +563,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public boolean updateMicroserviceProperties(String microserviceId, Map<String, String> serviceProperties) {
     Holder<HttpClientResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     try {
       UpdatePropertiesRequest request = new UpdatePropertiesRequest();
@@ -615,7 +599,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   public boolean updateInstanceProperties(String microserviceId, String microserviceInstanceId,
       Map<String, String> instanceProperties) {
     Holder<HttpClientResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     try {
       UpdatePropertiesRequest request = new UpdatePropertiesRequest();
@@ -653,7 +637,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   public MicroserviceInstance findServiceInstance(String serviceId, String instanceId) {
     try {
       Holder<MicroserviceInstanceResponse> holder = new Holder<>();
-      IpPort ipPort = ipPortManager.getAvailableAddress(false);
+      IpPort ipPort = ipPortManager.getAvailableAddress();
       CountDownLatch countDownLatch = new CountDownLatch(1);
       RestUtils.get(ipPort,
           String.format(Const.REGISTRY_API.MICROSERVICE_INSTANCE_OPERATION_ONE, serviceId, instanceId),
@@ -668,8 +652,5 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
       LOGGER.error("get instance from sc failed");
       return null;
     }
-
   }
-  
-  
 }
diff --git a/service-registry/src/test/java/io/servicecomb/serviceregistry/client/TestIpPortManager.java b/service-registry/src/test/java/io/servicecomb/serviceregistry/client/TestIpPortManager.java
index bdd877a..016be35 100644
--- a/service-registry/src/test/java/io/servicecomb/serviceregistry/client/TestIpPortManager.java
+++ b/service-registry/src/test/java/io/servicecomb/serviceregistry/client/TestIpPortManager.java
@@ -80,9 +80,9 @@ public class TestIpPortManager {
     };
 
     IpPortManager manager = new IpPortManager(config, cacheManager);
-    IpPort address1 = manager.getAvailableAddress(false);
+    IpPort address1 = manager.getAvailableAddress();
 
-    if(address1.getPort() == 9980) {
+    if (address1.getPort() == 9980) {
       Assert.assertEquals("127.0.0.1", address1.getHostOrIp());
       Assert.assertEquals(9980, address1.getPort());
     } else {
@@ -90,8 +90,8 @@ public class TestIpPortManager {
       Assert.assertEquals(9981, address1.getPort());
     }
 
-    IpPort address2 = manager.getAvailableAddress(true);
-    if(address1.getPort() == 9980) {
+    IpPort address2 = manager.getNextAvailableAddress(address1);
+    if (address1.getPort() == 9980) {
       Assert.assertEquals("127.0.0.1", address2.getHostOrIp());
       Assert.assertEquals(9981, address2.getPort());
     } else {
@@ -99,8 +99,8 @@ public class TestIpPortManager {
       Assert.assertEquals(9980, address2.getPort());
     }
 
-    IpPort address3 = manager.getAvailableAddress(false);
-    if(address1.getPort() == 9980) {
+    IpPort address3 = manager.getAvailableAddress();
+    if (address1.getPort() == 9980) {
       Assert.assertEquals("127.0.0.1", address3.getHostOrIp());
       Assert.assertEquals(9981, address3.getPort());
     } else {
@@ -120,19 +120,19 @@ public class TestIpPortManager {
         result = addresses;
       }
     };
-    
+
     manager.initAutoDiscovery();
-    IpPort address4 = manager.getAvailableAddress(true);
-    if(address1.getPort() == 9980) {
+    IpPort address4 = manager.getNextAvailableAddress(address3);
+    if (address1.getPort() == 9980) {
       Assert.assertEquals("127.0.0.1", address4.getHostOrIp());
       Assert.assertEquals(9982, address4.getPort());
     } else {
-      address4 = manager.getAvailableAddress(true);
+      address4 = manager.getNextAvailableAddress(address1);
       Assert.assertEquals("127.0.0.1", address4.getHostOrIp());
       Assert.assertEquals(9982, address4.getPort());
     }
 
-    IpPort address5 = manager.getAvailableAddress(true);
+    IpPort address5 = manager.getNextAvailableAddress(address4);
     Assert.assertEquals("127.0.0.1", address5.getHostOrIp());
     Assert.assertEquals(9980, address5.getPort());
   }
@@ -153,5 +153,4 @@ public class TestIpPortManager {
       }
     };
   }
-
 }
diff --git a/service-registry/src/test/java/io/servicecomb/serviceregistry/client/http/TestClientHttp.java b/service-registry/src/test/java/io/servicecomb/serviceregistry/client/http/TestClientHttp.java
index 29f2c98..283b0df 100644
--- a/service-registry/src/test/java/io/servicecomb/serviceregistry/client/http/TestClientHttp.java
+++ b/service-registry/src/test/java/io/servicecomb/serviceregistry/client/http/TestClientHttp.java
@@ -45,7 +45,7 @@ public class TestClientHttp {
     IpPort ipPort = new IpPort("127.0.0.1", 8853);
     new Expectations() {
       {
-        manager.getAvailableAddress(false);
+        manager.getAvailableAddress();
         result = ipPort;
       }
     };
@@ -143,7 +143,7 @@ public class TestClientHttp {
   @Test
   public void testIpPortManager(@Mocked InstanceCacheManager instanceCacheManager) throws Exception {
     IpPortManager oManager = new IpPortManager(ServiceRegistryConfig.INSTANCE, instanceCacheManager);
-    IpPort oIPPort = oManager.getAvailableAddress(true);
-    Assert.assertEquals(oIPPort.getHostOrIp(), oManager.getAvailableAddress(false).getHostOrIp());
+    IpPort oIPPort = oManager.getNextAvailableAddress(new IpPort("", 33));
+    Assert.assertEquals(oIPPort.getHostOrIp(), oManager.getAvailableAddress().getHostOrIp());
   }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-java-chassis] 01/03: [SCB-165] Service registry not given proper instances

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wujimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit f980a116f409755c5737d63130a47754afde8432
Author: bao.liu <ba...@huawei.com>
AuthorDate: Thu Jan 4 16:37:01 2018 +0800

    [SCB-165] Service registry not given proper instances
---
 .../definition/loader/DynamicSchemaLoader.java     |  4 ++++
 .../servicecomb/foundation/common/net/IpPort.java  | 22 +++++++++++++++++++++
 .../foundation/common/net/TestIpPort.java          |  4 ++++
 .../serviceregistry/client/IpPortManager.java      | 16 ++++++++-------
 .../client/http/ServiceRegistryClientImpl.java     | 23 +++++++++++++++++++---
 .../consumer/MicroserviceVersions.java             |  3 +++
 .../registry/AbstractServiceRegistry.java          |  2 +-
 .../registry/RemoteServiceRegistry.java            |  9 +++++++--
 8 files changed, 70 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/io/servicecomb/core/definition/loader/DynamicSchemaLoader.java b/core/src/main/java/io/servicecomb/core/definition/loader/DynamicSchemaLoader.java
index 332b35a..08a3c8d 100644
--- a/core/src/main/java/io/servicecomb/core/definition/loader/DynamicSchemaLoader.java
+++ b/core/src/main/java/io/servicecomb/core/definition/loader/DynamicSchemaLoader.java
@@ -39,7 +39,11 @@ import io.servicecomb.serviceregistry.RegistryUtils;
  *   需要支持在不同的产品中部署为不同的微服务名
  *   微服务名是由环境变量等等方式注入的
  *   此时可以在BootListener中进行注册(必须在producer初始化之前注册契约)
+ *
+ * @Deprecated This class is deprecated because when making calls to a provider, schemas will be downloaded from service enter.
+ * And at provider, schemas will register to service center when starting up.
  */
+@Deprecated
 public class DynamicSchemaLoader {
   private static final Logger LOGGER = LoggerFactory.getLogger(DynamicSchemaLoader.class);
 
diff --git a/foundations/foundation-common/src/main/java/io/servicecomb/foundation/common/net/IpPort.java b/foundations/foundation-common/src/main/java/io/servicecomb/foundation/common/net/IpPort.java
index 601b063..15dee9d 100644
--- a/foundations/foundation-common/src/main/java/io/servicecomb/foundation/common/net/IpPort.java
+++ b/foundations/foundation-common/src/main/java/io/servicecomb/foundation/common/net/IpPort.java
@@ -53,6 +53,28 @@ public class IpPort {
     this.port = port;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    IpPort ipPort = (IpPort) o;
+
+    if (port != ipPort.port) {
+      return false;
+    }
+    return hostOrIp.equals(ipPort.hostOrIp);
+  }
+
+  @Override
+  public String toString() {
+    return hostOrIp + ":" + port;
+  }
+
   public InetSocketAddress getSocketAddress() {
     if (socketAddress == null) {
       synchronized (lock) {
diff --git a/foundations/foundation-common/src/test/java/io/servicecomb/foundation/common/net/TestIpPort.java b/foundations/foundation-common/src/test/java/io/servicecomb/foundation/common/net/TestIpPort.java
index 21dcda7..a6dbdcf 100644
--- a/foundations/foundation-common/src/test/java/io/servicecomb/foundation/common/net/TestIpPort.java
+++ b/foundations/foundation-common/src/test/java/io/servicecomb/foundation/common/net/TestIpPort.java
@@ -30,5 +30,9 @@ public class TestIpPort {
     Assert.assertEquals(inst1.getHostOrIp(), inst2.getHostOrIp());
     Assert.assertEquals(inst1.getPort(), inst2.getPort());
     Assert.assertEquals(inst1.getSocketAddress().getHostName(), inst2.getSocketAddress().getHostName());
+    Assert.assertEquals(inst1, inst1);
+    Assert.assertEquals(inst1, inst2);
+    Assert.assertEquals(inst1.toString(), "localhost:3333");
+    Assert.assertNotEquals(inst1, new Object());
   }
 }
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
index f6a681a..bd34471 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
@@ -19,6 +19,7 @@ package io.servicecomb.serviceregistry.client;
 
 import static io.servicecomb.serviceregistry.api.Const.REGISTRY_APP_ID;
 import static io.servicecomb.serviceregistry.api.Const.REGISTRY_SERVICE_NAME;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -46,10 +47,10 @@ public class IpPortManager {
 
   private ArrayList<IpPort> defaultIpPort;
 
-  private InstanceCache instanceCache = null;
-
   private AtomicInteger currentAvailableIndex;
 
+  private boolean autoDiscoveryInited = false;
+
   public IpPortManager(ServiceRegistryConfig serviceRegistryConfig, InstanceCacheManager instanceCacheManager) {
     this.serviceRegistryConfig = serviceRegistryConfig;
     this.instanceCacheManager = instanceCacheManager;
@@ -65,10 +66,11 @@ public class IpPortManager {
 
   // we have to do this operation after the first time setup has already done
   public void initAutoDiscovery() {
-    if (this.serviceRegistryConfig.isRegistryAutoDiscovery()) {
-      instanceCache = instanceCacheManager.getOrCreate(REGISTRY_APP_ID,
+    if (!autoDiscoveryInited && this.serviceRegistryConfig.isRegistryAutoDiscovery()) {
+      instanceCacheManager.getOrCreate(REGISTRY_APP_ID,
           REGISTRY_SERVICE_NAME,
           DefinitionConst.VERSION_RULE_LATEST);
+      autoDiscoveryInited = true;
     }
   }
 
@@ -100,10 +102,10 @@ public class IpPortManager {
   }
 
   private List<CacheEndpoint> getDiscoveredIpPort() {
-    if (instanceCache == null) {
-      return new ArrayList<>(0);
+    if (!autoDiscoveryInited || !this.serviceRegistryConfig.isRegistryAutoDiscovery()) {
+      return null;
     }
-    instanceCache = instanceCacheManager.getOrCreate(REGISTRY_APP_ID,
+    InstanceCache instanceCache = instanceCacheManager.getOrCreate(REGISTRY_APP_ID,
         REGISTRY_SERVICE_NAME,
         DefinitionConst.VERSION_RULE_LATEST);
     return instanceCache.getOrCreateTransportMap().get(defaultTransport);
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
index 53ffe5c..46bddd1 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
@@ -63,7 +63,7 @@ import io.vertx.core.http.HttpClientResponse;
 
 public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistryClientImpl.class);
-
+  private static final int MAX_ITERATE = 3;
   private IpPortManager ipPortManager;
 
   // key是本进程的微服务id和服务管理中心的id
@@ -80,11 +80,27 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
 
   private void retry(RequestContext requestContext, Handler<RestResponse> responseHandler) {
     LOGGER.warn("invoke service [{}] failed, retry.", requestContext.getUri());
-    requestContext.setIpPort(ipPortManager.getAvailableAddress(true));
+    requestContext.setIpPort(getRetryAddress(requestContext.getIpPort()));
     requestContext.setRetry(true);
     RestUtils.httpDo(requestContext, responseHandler);
   }
 
+  // try a different address if it is possible when retrying
+  private IpPort getRetryAddress(IpPort currentIpPort) {
+    int retry = MAX_ITERATE;
+    IpPort nextIpPort = ipPortManager.getAvailableAddress(true);
+    while (retry > 0) {
+      if (currentIpPort.equals(nextIpPort)) {
+        nextIpPort = ipPortManager.getAvailableAddress(true);
+      } else {
+        break;
+      }
+      retry--;
+    }
+    LOGGER.info("Retry. Current address is {} and trying {}", currentIpPort.toString(), nextIpPort.toString());
+    return nextIpPort;
+  }
+
   @SuppressWarnings("unchecked")
   private <T> Handler<RestResponse> syncHandler(CountDownLatch countDownLatch, Class<T> cls,
       Holder<T> holder) {
@@ -111,7 +127,8 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
               holder.value =
                   JsonUtils.readValue(bodyBuffer.getBytes(), cls);
             } catch (Exception e) {
-              LOGGER.warn(bodyBuffer.toString());
+              LOGGER.warn("read value failed and response message is {}",
+                  bodyBuffer.toString());
             }
             countDownLatch.countDown();
           });
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/consumer/MicroserviceVersions.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
index 48bcfaa..1dadb89 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
@@ -104,6 +104,9 @@ public class MicroserviceVersions {
         microserviceName,
         DefinitionConst.VERSION_RULE_ALL);
     if (pulledInstances == null) {
+      // exception happens and try pull again later.
+      pendingPullCount.incrementAndGet();
+      appManager.getEventBus().post(new PullMicroserviceVersionsInstancesEvent(this, TimeUnit.SECONDS.toMillis(1)));
       return;
     }
 
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
index 27e4d4d..c91a20b 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
@@ -207,7 +207,7 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry {
         serviceName,
         versionRule);
     if (instances == null) {
-      LOGGER.error("find empty instances from service center. service={}/{}/{}", appId, serviceName, versionRule);
+      LOGGER.error("Can not find any instances from service center due to previous errors. service={}/{}/{}", appId, serviceName, versionRule);
       return null;
     }
 
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
index 5b0a3b0..5d6be5a 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
@@ -32,6 +32,7 @@ import io.servicecomb.serviceregistry.client.ServiceRegistryClient;
 import io.servicecomb.serviceregistry.client.http.ServiceRegistryClientImpl;
 import io.servicecomb.serviceregistry.config.ServiceRegistryConfig;
 import io.servicecomb.serviceregistry.definition.MicroserviceDefinition;
+import io.servicecomb.serviceregistry.task.MicroserviceRegisterTask;
 import io.servicecomb.serviceregistry.task.event.PeriodicPullEvent;
 import io.servicecomb.serviceregistry.task.event.PullMicroserviceVersionsInstancesEvent;
 import io.servicecomb.serviceregistry.task.event.ShutdownEvent;
@@ -77,8 +78,6 @@ public class RemoteServiceRegistry extends AbstractServiceRegistry {
   public void run() {
     super.run();
 
-    ipPortManager.initAutoDiscovery();
-
     taskPool.scheduleAtFixedRate(serviceCenterTask,
         serviceRegistryConfig.getHeartbeatInterval(),
         serviceRegistryConfig.getHeartbeatInterval(),
@@ -101,6 +100,12 @@ public class RemoteServiceRegistry extends AbstractServiceRegistry {
     taskPool.schedule(event.getMicroserviceVersions()::pullInstances, event.getMsDelay(), TimeUnit.MILLISECONDS);
   }
 
+  @Subscribe
+  public void onMicroserviceRegistryTask(MicroserviceRegisterTask event) {
+    if (event.isRegistered()) {
+      ipPortManager.initAutoDiscovery();
+    }
+  }
   // for testing
   ScheduledThreadPoolExecutor getTaskPool() {
     return this.taskPool;

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-java-chassis] 03/03: fix concurrent access breaks

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wujimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit e812ad79813250ea5fd241a388ba419d2603c789
Author: bao.liu <ba...@huawei.com>
AuthorDate: Mon Jan 8 15:05:15 2018 +0800

    fix concurrent access breaks
---
 .../java/io/servicecomb/serviceregistry/client/IpPortManager.java    | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
index 12b4a00..6bd4b2a 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
@@ -75,9 +75,10 @@ public class IpPortManager {
   }
 
   public IpPort getNextAvailableAddress(IpPort failedIpPort) {
-    IpPort current = getAvailableAddress();
+    int currentIndex = currentAvailableIndex.get();
+    IpPort current = getAvailableAddress(currentIndex);
     if (current.equals(failedIpPort)) {
-      currentAvailableIndex.incrementAndGet();
+      currentAvailableIndex.compareAndSet(currentIndex, currentIndex + 1);
       current = getAvailableAddress();
     }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.