You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by wu...@apache.org on 2018/01/09 02:06:46 UTC
[incubator-servicecomb-java-chassis] 02/03: modify when concurrent
call happens do not choose the correct address in short time but with some
probability
This is an automated email from the ASF dual-hosted git repository.
wujimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit 6805402ab603738962e78214ccc89be7aedefa3b
Author: bao.liu <ba...@huawei.com>
AuthorDate: Fri Jan 5 15:26:42 2018 +0800
modify when concurrent call happens do not choose the correct address in short time but with some probability
---
.../serviceregistry/client/IpPortManager.java | 33 ++++++------
.../client/http/ServiceRegistryClientImpl.java | 59 ++++++++--------------
.../serviceregistry/client/TestIpPortManager.java | 23 ++++-----
.../client/http/TestClientHttp.java | 6 +--
4 files changed, 51 insertions(+), 70 deletions(-)
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
index bd34471..12b4a00 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
@@ -74,31 +74,32 @@ public class IpPortManager {
}
}
- public IpPort getAvailableAddress(boolean invalidate) {
- int index;
- if (invalidate) {
- index = currentAvailableIndex.incrementAndGet();
- } else {
- index = currentAvailableIndex.get();
+ public IpPort getNextAvailableAddress(IpPort failedIpPort) {
+ IpPort current = getAvailableAddress();
+ if (current.equals(failedIpPort)) {
+ currentAvailableIndex.incrementAndGet();
+ current = getAvailableAddress();
}
+ LOGGER.info("Change service center address from {} to {}", failedIpPort.toString(), current.toString());
+ return current;
+ }
+
+ public IpPort getAvailableAddress() {
+ return getAvailableAddress(currentAvailableIndex.get());
+ }
+
+ private IpPort getAvailableAddress(int index) {
if (index < defaultIpPort.size()) {
- return returnWithLog(invalidate, defaultIpPort.get(index));
+ return defaultIpPort.get(index);
}
List<CacheEndpoint> endpoints = getDiscoveredIpPort();
if (endpoints == null || (index >= defaultIpPort.size() + endpoints.size())) {
currentAvailableIndex.set(0);
- return returnWithLog(invalidate, defaultIpPort.get(0));
+ return defaultIpPort.get(0);
}
CacheEndpoint nextEndpoint = endpoints.get(index - defaultIpPort.size());
- return returnWithLog(invalidate, new URIEndpointObject(nextEndpoint.getEndpoint()));
- }
-
- private IpPort returnWithLog(boolean needLog, IpPort result) {
- if (needLog) {
- LOGGER.info("Using next service center address {}:{}.", result.getHostOrIp(), result.getPort());
- }
- return result;
+ return new URIEndpointObject(nextEndpoint.getEndpoint());
}
private List<CacheEndpoint> getDiscoveredIpPort() {
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
index 46bddd1..3a5e84f 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
@@ -50,8 +50,8 @@ import io.servicecomb.serviceregistry.api.response.GetInstancesResponse;
import io.servicecomb.serviceregistry.api.response.GetSchemaResponse;
import io.servicecomb.serviceregistry.api.response.GetServiceResponse;
import io.servicecomb.serviceregistry.api.response.HeartbeatResponse;
-import io.servicecomb.serviceregistry.api.response.MicroserviceInstanceResponse;
import io.servicecomb.serviceregistry.api.response.MicroserviceInstanceChangedEvent;
+import io.servicecomb.serviceregistry.api.response.MicroserviceInstanceResponse;
import io.servicecomb.serviceregistry.api.response.RegisterInstanceResponse;
import io.servicecomb.serviceregistry.client.ClientException;
import io.servicecomb.serviceregistry.client.IpPortManager;
@@ -63,7 +63,7 @@ import io.vertx.core.http.HttpClientResponse;
public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistryClientImpl.class);
- private static final int MAX_ITERATE = 3;
+
private IpPortManager ipPortManager;
// key是本进程的微服务id和服务管理中心的id
@@ -80,27 +80,11 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
private void retry(RequestContext requestContext, Handler<RestResponse> responseHandler) {
LOGGER.warn("invoke service [{}] failed, retry.", requestContext.getUri());
- requestContext.setIpPort(getRetryAddress(requestContext.getIpPort()));
+ requestContext.setIpPort(ipPortManager.getNextAvailableAddress(requestContext.getIpPort()));
requestContext.setRetry(true);
RestUtils.httpDo(requestContext, responseHandler);
}
- // try a different address if it is possible when retrying
- private IpPort getRetryAddress(IpPort currentIpPort) {
- int retry = MAX_ITERATE;
- IpPort nextIpPort = ipPortManager.getAvailableAddress(true);
- while (retry > 0) {
- if (currentIpPort.equals(nextIpPort)) {
- nextIpPort = ipPortManager.getAvailableAddress(true);
- } else {
- break;
- }
- retry--;
- }
- LOGGER.info("Retry. Current address is {} and trying {}", currentIpPort.toString(), nextIpPort.toString());
- return nextIpPort;
- }
-
@SuppressWarnings("unchecked")
private <T> Handler<RestResponse> syncHandler(CountDownLatch countDownLatch, Class<T> cls,
Holder<T> holder) {
@@ -154,7 +138,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
} else {
countDownLatch.countDown();
}
-
+
return;
}
@@ -171,7 +155,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
@Override
public List<Microservice> getAllMicroservices() {
Holder<GetAllServicesResponse> holder = new Holder<>();
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
CountDownLatch countDownLatch = new CountDownLatch(1);
RestUtils.get(ipPort,
@@ -192,7 +176,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
@Override
public String getMicroserviceId(String appId, String microserviceName, String versionRule) {
Holder<GetExistenceResponse> holder = new Holder<>();
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
CountDownLatch countDownLatch = new CountDownLatch(1);
RestUtils.get(ipPort,
@@ -220,7 +204,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
@Override
public boolean isSchemaExist(String microserviceId, String schemaId) {
Holder<GetExistenceResponse> holder = new Holder<>();
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
CountDownLatch countDownLatch = new CountDownLatch(1);
RestUtils.get(ipPort,
@@ -243,7 +227,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
@Override
public boolean registerSchema(String microserviceId, String schemaId, String schemaContent) {
Holder<ResponseWrapper> holder = new Holder<>();
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
try {
CreateSchemaRequest request = new CreateSchemaRequest();
@@ -288,7 +272,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
@Override
public String getSchema(String microserviceId, String schemaId) {
Holder<GetSchemaResponse> holder = new Holder<>();
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
CountDownLatch countDownLatch = new CountDownLatch(1);
RestUtils.get(ipPort,
@@ -313,7 +297,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
@Override
public String registerMicroservice(Microservice microservice) {
Holder<CreateServiceResponse> holder = new Holder<>();
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
try {
CreateServiceRequest request = new CreateServiceRequest();
request.setService(microservice);
@@ -345,7 +329,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
@Override
public Microservice getMicroservice(String microserviceId) {
Holder<GetServiceResponse> holder = new Holder<>();
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
CountDownLatch countDownLatch = new CountDownLatch(1);
RestUtils.get(ipPort,
@@ -366,7 +350,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
@Override
public String registerMicroserviceInstance(MicroserviceInstance instance) {
Holder<RegisterInstanceResponse> holder = new Holder<>();
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
try {
RegisterInstanceRequest request = new RegisterInstanceRequest();
@@ -395,7 +379,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
@Override
public List<MicroserviceInstance> getMicroserviceInstance(String consumerId, String providerId) {
Holder<GetInstancesResponse> holder = new Holder<>();
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
CountDownLatch countDownLatch = new CountDownLatch(1);
RestUtils.get(ipPort,
@@ -416,7 +400,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
@Override
public boolean unregisterMicroserviceInstance(String microserviceId, String microserviceInstanceId) {
Holder<HttpClientResponse> holder = new Holder<>();
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
CountDownLatch countDownLatch = new CountDownLatch(1);
RestUtils.delete(ipPort,
@@ -443,7 +427,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
@Override
public HeartbeatResponse heartbeat(String microserviceId, String microserviceInstanceId) {
Holder<HttpClientResponse> holder = new Holder<>();
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
CountDownLatch countDownLatch = new CountDownLatch(1);
RestUtils.put(ipPort,
@@ -489,7 +473,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
String url = String.format(Const.REGISTRY_API.MICROSERVICE_WATCH, selfMicroserviceId);
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
WebsocketUtils.open(ipPort, url, o -> {
onOpen.success(o);
LOGGER.info(
@@ -536,7 +520,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
public List<MicroserviceInstance> findServiceInstance(String consumerId, String appId, String serviceName,
String versionRule) {
Holder<FindInstancesResponse> holder = new Holder<>();
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
CountDownLatch countDownLatch = new CountDownLatch(1);
RestUtils.get(ipPort,
@@ -579,7 +563,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
@Override
public boolean updateMicroserviceProperties(String microserviceId, Map<String, String> serviceProperties) {
Holder<HttpClientResponse> holder = new Holder<>();
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
try {
UpdatePropertiesRequest request = new UpdatePropertiesRequest();
@@ -615,7 +599,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
public boolean updateInstanceProperties(String microserviceId, String microserviceInstanceId,
Map<String, String> instanceProperties) {
Holder<HttpClientResponse> holder = new Holder<>();
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
try {
UpdatePropertiesRequest request = new UpdatePropertiesRequest();
@@ -653,7 +637,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
public MicroserviceInstance findServiceInstance(String serviceId, String instanceId) {
try {
Holder<MicroserviceInstanceResponse> holder = new Holder<>();
- IpPort ipPort = ipPortManager.getAvailableAddress(false);
+ IpPort ipPort = ipPortManager.getAvailableAddress();
CountDownLatch countDownLatch = new CountDownLatch(1);
RestUtils.get(ipPort,
String.format(Const.REGISTRY_API.MICROSERVICE_INSTANCE_OPERATION_ONE, serviceId, instanceId),
@@ -668,8 +652,5 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
LOGGER.error("get instance from sc failed");
return null;
}
-
}
-
-
}
diff --git a/service-registry/src/test/java/io/servicecomb/serviceregistry/client/TestIpPortManager.java b/service-registry/src/test/java/io/servicecomb/serviceregistry/client/TestIpPortManager.java
index bdd877a..016be35 100644
--- a/service-registry/src/test/java/io/servicecomb/serviceregistry/client/TestIpPortManager.java
+++ b/service-registry/src/test/java/io/servicecomb/serviceregistry/client/TestIpPortManager.java
@@ -80,9 +80,9 @@ public class TestIpPortManager {
};
IpPortManager manager = new IpPortManager(config, cacheManager);
- IpPort address1 = manager.getAvailableAddress(false);
+ IpPort address1 = manager.getAvailableAddress();
- if(address1.getPort() == 9980) {
+ if (address1.getPort() == 9980) {
Assert.assertEquals("127.0.0.1", address1.getHostOrIp());
Assert.assertEquals(9980, address1.getPort());
} else {
@@ -90,8 +90,8 @@ public class TestIpPortManager {
Assert.assertEquals(9981, address1.getPort());
}
- IpPort address2 = manager.getAvailableAddress(true);
- if(address1.getPort() == 9980) {
+ IpPort address2 = manager.getNextAvailableAddress(address1);
+ if (address1.getPort() == 9980) {
Assert.assertEquals("127.0.0.1", address2.getHostOrIp());
Assert.assertEquals(9981, address2.getPort());
} else {
@@ -99,8 +99,8 @@ public class TestIpPortManager {
Assert.assertEquals(9980, address2.getPort());
}
- IpPort address3 = manager.getAvailableAddress(false);
- if(address1.getPort() == 9980) {
+ IpPort address3 = manager.getAvailableAddress();
+ if (address1.getPort() == 9980) {
Assert.assertEquals("127.0.0.1", address3.getHostOrIp());
Assert.assertEquals(9981, address3.getPort());
} else {
@@ -120,19 +120,19 @@ public class TestIpPortManager {
result = addresses;
}
};
-
+
manager.initAutoDiscovery();
- IpPort address4 = manager.getAvailableAddress(true);
- if(address1.getPort() == 9980) {
+ IpPort address4 = manager.getNextAvailableAddress(address3);
+ if (address1.getPort() == 9980) {
Assert.assertEquals("127.0.0.1", address4.getHostOrIp());
Assert.assertEquals(9982, address4.getPort());
} else {
- address4 = manager.getAvailableAddress(true);
+ address4 = manager.getNextAvailableAddress(address1);
Assert.assertEquals("127.0.0.1", address4.getHostOrIp());
Assert.assertEquals(9982, address4.getPort());
}
- IpPort address5 = manager.getAvailableAddress(true);
+ IpPort address5 = manager.getNextAvailableAddress(address4);
Assert.assertEquals("127.0.0.1", address5.getHostOrIp());
Assert.assertEquals(9980, address5.getPort());
}
@@ -153,5 +153,4 @@ public class TestIpPortManager {
}
};
}
-
}
diff --git a/service-registry/src/test/java/io/servicecomb/serviceregistry/client/http/TestClientHttp.java b/service-registry/src/test/java/io/servicecomb/serviceregistry/client/http/TestClientHttp.java
index 29f2c98..283b0df 100644
--- a/service-registry/src/test/java/io/servicecomb/serviceregistry/client/http/TestClientHttp.java
+++ b/service-registry/src/test/java/io/servicecomb/serviceregistry/client/http/TestClientHttp.java
@@ -45,7 +45,7 @@ public class TestClientHttp {
IpPort ipPort = new IpPort("127.0.0.1", 8853);
new Expectations() {
{
- manager.getAvailableAddress(false);
+ manager.getAvailableAddress();
result = ipPort;
}
};
@@ -143,7 +143,7 @@ public class TestClientHttp {
@Test
public void testIpPortManager(@Mocked InstanceCacheManager instanceCacheManager) throws Exception {
IpPortManager oManager = new IpPortManager(ServiceRegistryConfig.INSTANCE, instanceCacheManager);
- IpPort oIPPort = oManager.getAvailableAddress(true);
- Assert.assertEquals(oIPPort.getHostOrIp(), oManager.getAvailableAddress(false).getHostOrIp());
+ IpPort oIPPort = oManager.getNextAvailableAddress(new IpPort("", 33));
+ Assert.assertEquals(oIPPort.getHostOrIp(), oManager.getAvailableAddress().getHostOrIp());
}
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.