You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by wu...@apache.org on 2018/01/09 02:06:45 UTC
[incubator-servicecomb-java-chassis] 01/03: [SCB-165] Service
registry not given proper instances
This is an automated email from the ASF dual-hosted git repository.
wujimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit f980a116f409755c5737d63130a47754afde8432
Author: bao.liu <ba...@huawei.com>
AuthorDate: Thu Jan 4 16:37:01 2018 +0800
[SCB-165] Service registry not given proper instances
---
.../definition/loader/DynamicSchemaLoader.java | 4 ++++
.../servicecomb/foundation/common/net/IpPort.java | 22 +++++++++++++++++++++
.../foundation/common/net/TestIpPort.java | 4 ++++
.../serviceregistry/client/IpPortManager.java | 16 ++++++++-------
.../client/http/ServiceRegistryClientImpl.java | 23 +++++++++++++++++++---
.../consumer/MicroserviceVersions.java | 3 +++
.../registry/AbstractServiceRegistry.java | 2 +-
.../registry/RemoteServiceRegistry.java | 9 +++++++--
8 files changed, 70 insertions(+), 13 deletions(-)
diff --git a/core/src/main/java/io/servicecomb/core/definition/loader/DynamicSchemaLoader.java b/core/src/main/java/io/servicecomb/core/definition/loader/DynamicSchemaLoader.java
index 332b35a..08a3c8d 100644
--- a/core/src/main/java/io/servicecomb/core/definition/loader/DynamicSchemaLoader.java
+++ b/core/src/main/java/io/servicecomb/core/definition/loader/DynamicSchemaLoader.java
@@ -39,7 +39,11 @@ import io.servicecomb.serviceregistry.RegistryUtils;
* 需要支持在不同的产品中部署为不同的微服务名
* 微服务名是由环境变量等等方式注入的
* 此时可以在BootListener中进行注册(必须在producer初始化之前注册契约)
+ *
+ * @deprecated This class is deprecated because, when making calls to a provider, schemas are downloaded from the service center.
+ * On the provider side, schemas are registered to the service center at startup.
*/
+@Deprecated
public class DynamicSchemaLoader {
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicSchemaLoader.class);
diff --git a/foundations/foundation-common/src/main/java/io/servicecomb/foundation/common/net/IpPort.java b/foundations/foundation-common/src/main/java/io/servicecomb/foundation/common/net/IpPort.java
index 601b063..15dee9d 100644
--- a/foundations/foundation-common/src/main/java/io/servicecomb/foundation/common/net/IpPort.java
+++ b/foundations/foundation-common/src/main/java/io/servicecomb/foundation/common/net/IpPort.java
@@ -53,6 +53,28 @@ public class IpPort {
this.port = port;
}
+ @Override
+ public boolean equals(Object o) { // equal when same runtime class, same port and equal hostOrIp. NOTE(review): no hashCode() override is visible in this hunk — confirm one exists
+ if (this == o) { // same reference: trivially equal
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) { // null or a different runtime class is never equal
+ return false;
+ }
+
+ IpPort ipPort = (IpPort) o;
+
+ if (port != ipPort.port) { // ports must match
+ return false;
+ }
+ return hostOrIp.equals(ipPort.hostOrIp); // hosts must match; would throw NullPointerException if this.hostOrIp were null
+ }
+
+ @Override
+ public String toString() { // renders the host and the port separated by a colon
+ return hostOrIp + ":" + port;
+ }
+
public InetSocketAddress getSocketAddress() {
if (socketAddress == null) {
synchronized (lock) {
diff --git a/foundations/foundation-common/src/test/java/io/servicecomb/foundation/common/net/TestIpPort.java b/foundations/foundation-common/src/test/java/io/servicecomb/foundation/common/net/TestIpPort.java
index 21dcda7..a6dbdcf 100644
--- a/foundations/foundation-common/src/test/java/io/servicecomb/foundation/common/net/TestIpPort.java
+++ b/foundations/foundation-common/src/test/java/io/servicecomb/foundation/common/net/TestIpPort.java
@@ -30,5 +30,9 @@ public class TestIpPort {
Assert.assertEquals(inst1.getHostOrIp(), inst2.getHostOrIp());
Assert.assertEquals(inst1.getPort(), inst2.getPort());
Assert.assertEquals(inst1.getSocketAddress().getHostName(), inst2.getSocketAddress().getHostName());
+ Assert.assertEquals(inst1, inst1);
+ Assert.assertEquals(inst1, inst2);
+ Assert.assertEquals(inst1.toString(), "localhost:3333");
+ Assert.assertNotEquals(inst1, new Object());
}
}
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
index f6a681a..bd34471 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/IpPortManager.java
@@ -19,6 +19,7 @@ package io.servicecomb.serviceregistry.client;
import static io.servicecomb.serviceregistry.api.Const.REGISTRY_APP_ID;
import static io.servicecomb.serviceregistry.api.Const.REGISTRY_SERVICE_NAME;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -46,10 +47,10 @@ public class IpPortManager {
private ArrayList<IpPort> defaultIpPort;
- private InstanceCache instanceCache = null;
-
private AtomicInteger currentAvailableIndex;
+ private boolean autoDiscoveryInited = false;
+
public IpPortManager(ServiceRegistryConfig serviceRegistryConfig, InstanceCacheManager instanceCacheManager) {
this.serviceRegistryConfig = serviceRegistryConfig;
this.instanceCacheManager = instanceCacheManager;
@@ -65,10 +66,11 @@ public class IpPortManager {
// we have to do this operation after the first time setup has already done
public void initAutoDiscovery() {
- if (this.serviceRegistryConfig.isRegistryAutoDiscovery()) {
- instanceCache = instanceCacheManager.getOrCreate(REGISTRY_APP_ID,
+ if (!autoDiscoveryInited && this.serviceRegistryConfig.isRegistryAutoDiscovery()) { // skip when already initialised or when auto discovery is disabled
+ instanceCacheManager.getOrCreate(REGISTRY_APP_ID, // return value is discarded here
REGISTRY_SERVICE_NAME,
DefinitionConst.VERSION_RULE_LATEST);
+ autoDiscoveryInited = true; // NOTE(review): plain boolean field with no synchronization visible — confirm this is only called from one thread
}
}
@@ -100,10 +102,10 @@ public class IpPortManager {
}
private List<CacheEndpoint> getDiscoveredIpPort() {
- if (instanceCache == null) {
- return new ArrayList<>(0);
+ if (!autoDiscoveryInited || !this.serviceRegistryConfig.isRegistryAutoDiscovery()) {
+ return null;
}
- instanceCache = instanceCacheManager.getOrCreate(REGISTRY_APP_ID,
+ InstanceCache instanceCache = instanceCacheManager.getOrCreate(REGISTRY_APP_ID,
REGISTRY_SERVICE_NAME,
DefinitionConst.VERSION_RULE_LATEST);
return instanceCache.getOrCreateTransportMap().get(defaultTransport);
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
index 53ffe5c..46bddd1 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
@@ -63,7 +63,7 @@ import io.vertx.core.http.HttpClientResponse;
public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistryClientImpl.class);
-
+ private static final int MAX_ITERATE = 3;
private IpPortManager ipPortManager;
// key是本进程的微服务id和服务管理中心的id
@@ -80,11 +80,27 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
private void retry(RequestContext requestContext, Handler<RestResponse> responseHandler) {
LOGGER.warn("invoke service [{}] failed, retry.", requestContext.getUri());
- requestContext.setIpPort(ipPortManager.getAvailableAddress(true));
+ requestContext.setIpPort(getRetryAddress(requestContext.getIpPort())); // pass the address currently on the request so the retry can try to pick a different one
requestContext.setRetry(true);
RestUtils.httpDo(requestContext, responseHandler);
}
+ // Try to pick an address different from the current one when retrying.
+ private IpPort getRetryAddress(IpPort currentIpPort) { // NOTE(review): currentIpPort.equals(...) below throws NullPointerException if currentIpPort is null — confirm callers never pass null
+ int retry = MAX_ITERATE; // upper bound on additional attempts to obtain a different address
+ IpPort nextIpPort = ipPortManager.getAvailableAddress(true);
+ while (retry > 0) {
+ if (currentIpPort.equals(nextIpPort)) { // got the same address again: ask the manager for another one
+ nextIpPort = ipPortManager.getAvailableAddress(true);
+ } else {
+ break;
+ }
+ retry--;
+ }
+ LOGGER.info("Retry. Current address is {} and trying {}", currentIpPort.toString(), nextIpPort.toString()); // when the loop runs out of attempts, nextIpPort may still equal currentIpPort
+ return nextIpPort;
+ }
+
@SuppressWarnings("unchecked")
private <T> Handler<RestResponse> syncHandler(CountDownLatch countDownLatch, Class<T> cls,
Holder<T> holder) {
@@ -111,7 +127,8 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
holder.value =
JsonUtils.readValue(bodyBuffer.getBytes(), cls);
} catch (Exception e) {
- LOGGER.warn(bodyBuffer.toString());
+ LOGGER.warn("read value failed and response message is {}",
+ bodyBuffer.toString());
}
countDownLatch.countDown();
});
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/consumer/MicroserviceVersions.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
index 48bcfaa..1dadb89 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
@@ -104,6 +104,9 @@ public class MicroserviceVersions {
microserviceName,
DefinitionConst.VERSION_RULE_ALL);
if (pulledInstances == null) {
+ // an exception happened; try pulling again later.
+ pendingPullCount.incrementAndGet();
+ appManager.getEventBus().post(new PullMicroserviceVersionsInstancesEvent(this, TimeUnit.SECONDS.toMillis(1)));
return;
}
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
index 27e4d4d..c91a20b 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
@@ -207,7 +207,7 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry {
serviceName,
versionRule);
if (instances == null) {
- LOGGER.error("find empty instances from service center. service={}/{}/{}", appId, serviceName, versionRule);
+ LOGGER.error("Can not find any instances from service center due to previous errors. service={}/{}/{}", appId, serviceName, versionRule);
return null;
}
diff --git a/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java b/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
index 5b0a3b0..5d6be5a 100644
--- a/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
+++ b/service-registry/src/main/java/io/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
@@ -32,6 +32,7 @@ import io.servicecomb.serviceregistry.client.ServiceRegistryClient;
import io.servicecomb.serviceregistry.client.http.ServiceRegistryClientImpl;
import io.servicecomb.serviceregistry.config.ServiceRegistryConfig;
import io.servicecomb.serviceregistry.definition.MicroserviceDefinition;
+import io.servicecomb.serviceregistry.task.MicroserviceRegisterTask;
import io.servicecomb.serviceregistry.task.event.PeriodicPullEvent;
import io.servicecomb.serviceregistry.task.event.PullMicroserviceVersionsInstancesEvent;
import io.servicecomb.serviceregistry.task.event.ShutdownEvent;
@@ -77,8 +78,6 @@ public class RemoteServiceRegistry extends AbstractServiceRegistry {
public void run() {
super.run();
- ipPortManager.initAutoDiscovery();
-
taskPool.scheduleAtFixedRate(serviceCenterTask,
serviceRegistryConfig.getHeartbeatInterval(),
serviceRegistryConfig.getHeartbeatInterval(),
@@ -101,6 +100,12 @@ public class RemoteServiceRegistry extends AbstractServiceRegistry {
taskPool.schedule(event.getMicroserviceVersions()::pullInstances, event.getMsDelay(), TimeUnit.MILLISECONDS);
}
+ @Subscribe
+ public void onMicroserviceRegistryTask(MicroserviceRegisterTask event) { // subscriber method for MicroserviceRegisterTask events
+ if (event.isRegistered()) { // only initialise auto discovery once the event reports the microservice as registered
+ ipPortManager.initAutoDiscovery();
+ }
+ }
// for testing
ScheduledThreadPoolExecutor getTaskPool() {
return this.taskPool;
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.