You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by wu...@apache.org on 2018/01/09 02:06:46 UTC

[incubator-servicecomb-java-chassis] 02/03: modify when concurrent call happens do not choose the correct address in short time but with some probability

This is an automated email from the ASF dual-hosted git repository.

wujimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit 6805402ab603738962e78214ccc89be7aedefa3b
Author: bao.liu <ba...@huawei.com>
AuthorDate: Fri Jan 5 15:26:42 2018 +0800

    modify when concurrent call happens do not choose the correct address in short time but with some probability
---
 .../serviceregistry/client/IpPortManager.java      | 33 ++++++------
 .../client/http/ServiceRegistryClientImpl.java     | 59 ++++++++--------------
 .../serviceregistry/client/TestIpPortManager.java  | 23 ++++-----
 .../client/http/TestClientHttp.java                |  6 +--
 4 files changed, 51 insertions(+), 70 deletions(-)

diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
index bd34471..12b4a00 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
@@ -74,31 +74,32 @@ public class IpPortManager {
     }
   }
 
-  public IpPort getAvailableAddress(boolean invalidate) {
-    int index;
-    if (invalidate) {
-      index = currentAvailableIndex.incrementAndGet();
-    } else {
-      index = currentAvailableIndex.get();
+  public IpPort getNextAvailableAddress(IpPort failedIpPort) {
+    IpPort current = getAvailableAddress();
+    if (current.equals(failedIpPort)) {
+      currentAvailableIndex.incrementAndGet();
+      current = getAvailableAddress();
     }
 
+    LOGGER.info("Change service center address from {} to {}", failedIpPort.toString(), current.toString());
+    return current;
+  }
+
+  public IpPort getAvailableAddress() {
+    return getAvailableAddress(currentAvailableIndex.get());
+  }
+
+  private IpPort getAvailableAddress(int index) {
     if (index < defaultIpPort.size()) {
-      return returnWithLog(invalidate, defaultIpPort.get(index));
+      return defaultIpPort.get(index);
     }
     List<CacheEndpoint> endpoints = getDiscoveredIpPort();
     if (endpoints == null || (index >= defaultIpPort.size() + endpoints.size())) {
       currentAvailableIndex.set(0);
-      return returnWithLog(invalidate, defaultIpPort.get(0));
+      return defaultIpPort.get(0);
     }
     CacheEndpoint nextEndpoint = endpoints.get(index - defaultIpPort.size());
-    return returnWithLog(invalidate, new URIEndpointObject(nextEndpoint.getEndpoint()));
-  }
-
-  private IpPort returnWithLog(boolean needLog, IpPort result) {
-    if (needLog) {
-      LOGGER.info("Using next service center address {}:{}.", result.getHostOrIp(), result.getPort());
-    }
-    return result;
+    return new URIEndpointObject(nextEndpoint.getEndpoint());
   }
 
   private List<CacheEndpoint> getDiscoveredIpPort() {
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
index 46bddd1..3a5e84f 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
@@ -50,8 +50,8 @@ import io.servicecomb.serviceregistry.api.response.GetInstancesResponse;
 import io.servicecomb.serviceregistry.api.response.GetSchemaResponse;
 import io.servicecomb.serviceregistry.api.response.GetServiceResponse;
 import io.servicecomb.serviceregistry.api.response.HeartbeatResponse;
-import io.servicecomb.serviceregistry.api.response.MicroserviceInstanceResponse;
 import io.servicecomb.serviceregistry.api.response.MicroserviceInstanceChangedEvent;
+import io.servicecomb.serviceregistry.api.response.MicroserviceInstanceResponse;
 import io.servicecomb.serviceregistry.api.response.RegisterInstanceResponse;
 import io.servicecomb.serviceregistry.client.ClientException;
 import io.servicecomb.serviceregistry.client.IpPortManager;
@@ -63,7 +63,7 @@ import io.vertx.core.http.HttpClientResponse;
 
 public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistryClientImpl.class);
-  private static final int MAX_ITERATE = 3;
+
   private IpPortManager ipPortManager;
 
   // key是本进程的微服务id和服务管理中心的id
@@ -80,27 +80,11 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
 
   private void retry(RequestContext requestContext, Handler<RestResponse> responseHandler) {
     LOGGER.warn("invoke service [{}] failed, retry.", requestContext.getUri());
-    requestContext.setIpPort(getRetryAddress(requestContext.getIpPort()));
+    requestContext.setIpPort(ipPortManager.getNextAvailableAddress(requestContext.getIpPort()));
     requestContext.setRetry(true);
     RestUtils.httpDo(requestContext, responseHandler);
   }
 
-  // try a different address if it is possible when retrying
-  private IpPort getRetryAddress(IpPort currentIpPort) {
-    int retry = MAX_ITERATE;
-    IpPort nextIpPort = ipPortManager.getAvailableAddress(true);
-    while (retry > 0) {
-      if (currentIpPort.equals(nextIpPort)) {
-        nextIpPort = ipPortManager.getAvailableAddress(true);
-      } else {
-        break;
-      }
-      retry--;
-    }
-    LOGGER.info("Retry. Current address is {} and trying {}", currentIpPort.toString(), nextIpPort.toString());
-    return nextIpPort;
-  }
-
   @SuppressWarnings("unchecked")
   private <T> Handler<RestResponse> syncHandler(CountDownLatch countDownLatch, Class<T> cls,
       Holder<T> holder) {
@@ -154,7 +138,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
         } else {
           countDownLatch.countDown();
         }
-        
+
         return;
       }
 
@@ -171,7 +155,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public List<Microservice> getAllMicroservices() {
     Holder<GetAllServicesResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
@@ -192,7 +176,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public String getMicroserviceId(String appId, String microserviceName, String versionRule) {
     Holder<GetExistenceResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
@@ -220,7 +204,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public boolean isSchemaExist(String microserviceId, String schemaId) {
     Holder<GetExistenceResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
@@ -243,7 +227,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public boolean registerSchema(String microserviceId, String schemaId, String schemaContent) {
     Holder<ResponseWrapper> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     try {
       CreateSchemaRequest request = new CreateSchemaRequest();
@@ -288,7 +272,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public String getSchema(String microserviceId, String schemaId) {
     Holder<GetSchemaResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
@@ -313,7 +297,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public String registerMicroservice(Microservice microservice) {
     Holder<CreateServiceResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
     try {
       CreateServiceRequest request = new CreateServiceRequest();
       request.setService(microservice);
@@ -345,7 +329,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public Microservice getMicroservice(String microserviceId) {
     Holder<GetServiceResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
@@ -366,7 +350,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public String registerMicroserviceInstance(MicroserviceInstance instance) {
     Holder<RegisterInstanceResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     try {
       RegisterInstanceRequest request = new RegisterInstanceRequest();
@@ -395,7 +379,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public List<MicroserviceInstance> getMicroserviceInstance(String consumerId, String providerId) {
     Holder<GetInstancesResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
@@ -416,7 +400,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public boolean unregisterMicroserviceInstance(String microserviceId, String microserviceInstanceId) {
     Holder<HttpClientResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.delete(ipPort,
@@ -443,7 +427,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public HeartbeatResponse heartbeat(String microserviceId, String microserviceInstanceId) {
     Holder<HttpClientResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.put(ipPort,
@@ -489,7 +473,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
 
           String url = String.format(Const.REGISTRY_API.MICROSERVICE_WATCH, selfMicroserviceId);
 
-          IpPort ipPort = ipPortManager.getAvailableAddress(false);
+          IpPort ipPort = ipPortManager.getAvailableAddress();
           WebsocketUtils.open(ipPort, url, o -> {
             onOpen.success(o);
             LOGGER.info(
@@ -536,7 +520,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   public List<MicroserviceInstance> findServiceInstance(String consumerId, String appId, String serviceName,
       String versionRule) {
     Holder<FindInstancesResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
@@ -579,7 +563,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   @Override
   public boolean updateMicroserviceProperties(String microserviceId, Map<String, String> serviceProperties) {
     Holder<HttpClientResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     try {
       UpdatePropertiesRequest request = new UpdatePropertiesRequest();
@@ -615,7 +599,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   public boolean updateInstanceProperties(String microserviceId, String microserviceInstanceId,
       Map<String, String> instanceProperties) {
     Holder<HttpClientResponse> holder = new Holder<>();
-    IpPort ipPort = ipPortManager.getAvailableAddress(false);
+    IpPort ipPort = ipPortManager.getAvailableAddress();
 
     try {
       UpdatePropertiesRequest request = new UpdatePropertiesRequest();
@@ -653,7 +637,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
   public MicroserviceInstance findServiceInstance(String serviceId, String instanceId) {
     try {
       Holder<MicroserviceInstanceResponse> holder = new Holder<>();
-      IpPort ipPort = ipPortManager.getAvailableAddress(false);
+      IpPort ipPort = ipPortManager.getAvailableAddress();
       CountDownLatch countDownLatch = new CountDownLatch(1);
       RestUtils.get(ipPort,
           String.format(Const.REGISTRY_API.MICROSERVICE_INSTANCE_OPERATION_ONE, serviceId, instanceId),
@@ -668,8 +652,5 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
       LOGGER.error("get instance from sc failed");
       return null;
     }
-
   }
-  
-  
 }
diff --git a/service-registry/src/test/java/io/servicecomb/serviceregistry/client/TestIpPortManager.java b/service-registry/src/test/java/io/servicecomb/serviceregistry/client/TestIpPortManager.java
index bdd877a..016be35 100644
--- a/service-registry/src/test/java/io/servicecomb/serviceregistry/client/TestIpPortManager.java
+++ b/service-registry/src/test/java/io/servicecomb/serviceregistry/client/TestIpPortManager.java
@@ -80,9 +80,9 @@ public class TestIpPortManager {
     };
 
     IpPortManager manager = new IpPortManager(config, cacheManager);
-    IpPort address1 = manager.getAvailableAddress(false);
+    IpPort address1 = manager.getAvailableAddress();
 
-    if(address1.getPort() == 9980) {
+    if (address1.getPort() == 9980) {
       Assert.assertEquals("127.0.0.1", address1.getHostOrIp());
       Assert.assertEquals(9980, address1.getPort());
     } else {
@@ -90,8 +90,8 @@ public class TestIpPortManager {
       Assert.assertEquals(9981, address1.getPort());
     }
 
-    IpPort address2 = manager.getAvailableAddress(true);
-    if(address1.getPort() == 9980) {
+    IpPort address2 = manager.getNextAvailableAddress(address1);
+    if (address1.getPort() == 9980) {
       Assert.assertEquals("127.0.0.1", address2.getHostOrIp());
       Assert.assertEquals(9981, address2.getPort());
     } else {
@@ -99,8 +99,8 @@ public class TestIpPortManager {
       Assert.assertEquals(9980, address2.getPort());
     }
 
-    IpPort address3 = manager.getAvailableAddress(false);
-    if(address1.getPort() == 9980) {
+    IpPort address3 = manager.getAvailableAddress();
+    if (address1.getPort() == 9980) {
       Assert.assertEquals("127.0.0.1", address3.getHostOrIp());
       Assert.assertEquals(9981, address3.getPort());
     } else {
@@ -120,19 +120,19 @@ public class TestIpPortManager {
         result = addresses;
       }
     };
-    
+
     manager.initAutoDiscovery();
-    IpPort address4 = manager.getAvailableAddress(true);
-    if(address1.getPort() == 9980) {
+    IpPort address4 = manager.getNextAvailableAddress(address3);
+    if (address1.getPort() == 9980) {
       Assert.assertEquals("127.0.0.1", address4.getHostOrIp());
       Assert.assertEquals(9982, address4.getPort());
     } else {
-      address4 = manager.getAvailableAddress(true);
+      address4 = manager.getNextAvailableAddress(address1);
       Assert.assertEquals("127.0.0.1", address4.getHostOrIp());
       Assert.assertEquals(9982, address4.getPort());
     }
 
-    IpPort address5 = manager.getAvailableAddress(true);
+    IpPort address5 = manager.getNextAvailableAddress(address4);
     Assert.assertEquals("127.0.0.1", address5.getHostOrIp());
     Assert.assertEquals(9980, address5.getPort());
   }
@@ -153,5 +153,4 @@ public class TestIpPortManager {
       }
     };
   }
-
 }
diff --git a/service-registry/src/test/java/io/servicecomb/serviceregistry/client/http/TestClientHttp.java b/service-registry/src/test/java/io/servicecomb/serviceregistry/client/http/TestClientHttp.java
index 29f2c98..283b0df 100644
--- a/service-registry/src/test/java/io/servicecomb/serviceregistry/client/http/TestClientHttp.java
+++ b/service-registry/src/test/java/io/servicecomb/serviceregistry/client/http/TestClientHttp.java
@@ -45,7 +45,7 @@ public class TestClientHttp {
     IpPort ipPort = new IpPort("127.0.0.1", 8853);
     new Expectations() {
       {
-        manager.getAvailableAddress(false);
+        manager.getAvailableAddress();
         result = ipPort;
       }
     };
@@ -143,7 +143,7 @@ public class TestClientHttp {
   @Test
   public void testIpPortManager(@Mocked InstanceCacheManager instanceCacheManager) throws Exception {
     IpPortManager oManager = new IpPortManager(ServiceRegistryConfig.INSTANCE, instanceCacheManager);
-    IpPort oIPPort = oManager.getAvailableAddress(true);
-    Assert.assertEquals(oIPPort.getHostOrIp(), oManager.getAvailableAddress(false).getHostOrIp());
+    IpPort oIPPort = oManager.getNextAvailableAddress(new IpPort("", 33));
+    Assert.assertEquals(oIPPort.getHostOrIp(), oManager.getAvailableAddress().getHostOrIp());
   }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.