You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2021/03/23 01:02:45 UTC
[servicecomb-java-chassis] branch master updated: [SCB-2226] zero-config support multiple modes, multicast/unicast/local/redis/db and so on (#2309)
This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/master by this push:
new 8619d5c [SCB-2226] zero-config support multiple modes, multicast/unicast/local/redis/db and so on (#2309)
8619d5c is described below
commit 8619d5cf3d7250fcd6f35fe5416c193791de5d38
Author: wujimin <wu...@huawei.com>
AuthorDate: Tue Mar 23 09:02:37 2021 +0800
[SCB-2226] zero-config support multiple modes, multicast/unicast/local/redis/db and so on (#2309)
---
coverage-reports/pom.xml | 4 +
.../src/main/resources/application.yml | 3 +
.../servicecomb/config/DynamicPropertiesImpl.java | 7 +-
.../main/resources/META-INF/spring/cse.bean.xml | 24 -----
parents/default/pom.xml | 17 +++
.../lightweight/AbstractLightweightDiscovery.java | 50 ++++++++-
.../AbstractLightweightRegistration.java | 61 +++++++++++
.../servicecomb/registry/lightweight}/Message.java | 16 ++-
.../registry/lightweight/MessageExecutor.java} | 55 +++-------
.../registry/lightweight}/MessageType.java | 2 +-
.../registry/lightweight/MicroserviceInfo.java | 4 +-
...egistryServerService.java => StoreService.java} | 6 +-
.../registry/lightweight}/MessageTest.java | 8 +-
...erverServiceTest.java => StoreServiceTest.java} | 4 +-
.../servicecomb/localregistry/RegistryBean.java | 1 -
.../registry/RemoteServiceRegistry.java | 1 +
.../zeroconfig/AbstractZeroConfigRegistration.java | 51 +++++++++
.../org/apache/servicecomb/zeroconfig/Config.java | 35 +++++-
.../servicecomb/zeroconfig/ZeroConfigConst.java | 23 +++-
.../zeroconfig/ZeroConfigDiscovery.java | 35 ++----
.../zeroconfig/ZeroConfigRegistration.java | 119 ---------------------
.../ConditionOnLocal.java} | 68 ++++++------
.../zeroconfig/local/LocalRegistration.java | 62 +++++++++++
.../ConditionOnMulticast.java} | 68 ++++++------
.../zeroconfig/{ => multicast}/Multicast.java | 17 +--
.../multicast/MulticastRegistration.java | 74 +++++++++++++
.../zeroconfig/multicast/MulticastServer.java | 73 +++++++++++++
27 files changed, 579 insertions(+), 309 deletions(-)
diff --git a/coverage-reports/pom.xml b/coverage-reports/pom.xml
index 4905419..ea49d30 100644
--- a/coverage-reports/pom.xml
+++ b/coverage-reports/pom.xml
@@ -232,6 +232,10 @@
<!-- service registry -->
<dependency>
<groupId>org.apache.servicecomb</groupId>
+ <artifactId>registry-lightweight</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.servicecomb</groupId>
<artifactId>registry-local</artifactId>
</dependency>
<dependency>
diff --git a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
index 9f6d98b..7d4118f 100644
--- a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
+++ b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
@@ -25,6 +25,9 @@ service_description:
name: demo-zeroconfig-schemadiscovery-registry-server
version: 0.0.2
servicecomb:
+ service:
+ zero-config:
+ mode: multicast
rest:
address: 0.0.0.0:8080
handler:
diff --git a/foundations/foundation-config/src/main/java/org/apache/servicecomb/config/DynamicPropertiesImpl.java b/foundations/foundation-config/src/main/java/org/apache/servicecomb/config/DynamicPropertiesImpl.java
index 027d70c..c90caa4 100644
--- a/foundations/foundation-config/src/main/java/org/apache/servicecomb/config/DynamicPropertiesImpl.java
+++ b/foundations/foundation-config/src/main/java/org/apache/servicecomb/config/DynamicPropertiesImpl.java
@@ -24,6 +24,7 @@ import java.util.function.IntConsumer;
import java.util.function.LongConsumer;
import org.apache.commons.configuration.AbstractConfiguration;
+import org.springframework.stereotype.Component;
import com.netflix.config.ConcurrentCompositeConfiguration;
import com.netflix.config.ConfigurationManager;
@@ -35,12 +36,12 @@ import com.netflix.config.DynamicLongProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.config.DynamicStringProperty;
+@Component
class DynamicPropertiesImpl implements DynamicProperties {
-
- DynamicPropertiesImpl() {
+ public DynamicPropertiesImpl() {
}
- DynamicPropertiesImpl(AbstractConfiguration... configurations) {
+ public DynamicPropertiesImpl(AbstractConfiguration... configurations) {
ConcurrentCompositeConfiguration configuration = new ConcurrentCompositeConfiguration();
Arrays.stream(configurations).forEach(configuration::addConfiguration);
diff --git a/foundations/foundation-config/src/main/resources/META-INF/spring/cse.bean.xml b/foundations/foundation-config/src/main/resources/META-INF/spring/cse.bean.xml
deleted file mode 100644
index 6a794b6..0000000
--- a/foundations/foundation-config/src/main/resources/META-INF/spring/cse.bean.xml
+++ /dev/null
@@ -1,24 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="
- http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
- <bean class="org.apache.servicecomb.config.DynamicPropertiesImpl"/>
-</beans>
diff --git a/parents/default/pom.xml b/parents/default/pom.xml
index 10a8b0c..d169438 100644
--- a/parents/default/pom.xml
+++ b/parents/default/pom.xml
@@ -102,4 +102,21 @@
<scope>test</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/AbstractLightweightDiscovery.java b/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/AbstractLightweightDiscovery.java
index 16abd0c..382f68e 100644
--- a/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/AbstractLightweightDiscovery.java
+++ b/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/AbstractLightweightDiscovery.java
@@ -17,22 +17,68 @@
package org.apache.servicecomb.registry.lightweight;
+import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.servicecomb.registry.api.Discovery;
import org.apache.servicecomb.registry.api.registry.Microservice;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstance;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstances;
+import org.apache.servicecomb.registry.consumer.AppManager;
import org.apache.servicecomb.registry.lightweight.store.MicroserviceStore;
import org.apache.servicecomb.registry.lightweight.store.Store;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
+@SuppressWarnings("UnstableApiUsage")
+public abstract class AbstractLightweightDiscovery implements Discovery, InitializingBean {
+ protected EventBus eventBus;
-public abstract class AbstractLightweightDiscovery implements Discovery {
protected Store store;
- public AbstractLightweightDiscovery(Store store) {
+ protected AppManager appManager;
+
+ @Autowired
+ public AbstractLightweightDiscovery setEventBus(EventBus eventBus) {
+ this.eventBus = eventBus;
+ return this;
+ }
+
+ @Autowired
+ public AbstractLightweightDiscovery setStore(Store store) {
this.store = store;
+ return this;
+ }
+
+ @Autowired
+ public AbstractLightweightDiscovery setAppManager(AppManager appManager) {
+ this.appManager = appManager;
+ return this;
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ eventBus.register(this);
+ }
+
+ @SuppressWarnings("unused")
+ @Subscribe
+ public void onSchemaChanged(SchemaChangedEvent event) {
+ Microservice microservice = event.getMicroservice();
+ appManager.markWaitingDelete(microservice.getAppId(), microservice.getServiceName());
+ }
+
+ protected void startPullInstances(Duration pullInterval) {
+ Executors
+ .newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, name()))
+ .scheduleAtFixedRate(appManager::safePullInstances, 0, pullInterval.getSeconds(), TimeUnit.SECONDS);
}
@Override
diff --git a/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/AbstractLightweightRegistration.java b/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/AbstractLightweightRegistration.java
index 179ca8f..3c57371 100644
--- a/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/AbstractLightweightRegistration.java
+++ b/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/AbstractLightweightRegistration.java
@@ -17,25 +17,74 @@
package org.apache.servicecomb.registry.lightweight;
+import java.io.IOException;
+import java.time.Duration;
import java.util.Collection;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.servicecomb.registry.api.Registration;
+import org.apache.servicecomb.registry.api.event.MicroserviceInstanceRegisteredEvent;
import org.apache.servicecomb.registry.api.registry.BasePath;
import org.apache.servicecomb.registry.api.registry.Microservice;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstance;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstanceStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import com.google.common.eventbus.EventBus;
+
+@SuppressWarnings("UnstableApiUsage")
public abstract class AbstractLightweightRegistration implements Registration {
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLightweightRegistration.class);
+
+ protected EventBus eventBus;
+
protected Self self;
@Autowired
+ public AbstractLightweightRegistration setEventBus(EventBus eventBus) {
+ this.eventBus = eventBus;
+ return this;
+ }
+
+ @Autowired
public AbstractLightweightRegistration setSelf(Self self) {
this.self = self;
return this;
}
@Override
+ public void init() {
+
+ }
+
+ protected void startRegister(Duration interval) {
+ Executors
+ .newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, name()))
+ .scheduleAtFixedRate(this::sendRegister, 0, interval.getSeconds(), TimeUnit.SECONDS);
+ }
+
+ protected void postRegisteredEvent() {
+ eventBus.post(new MicroserviceInstanceRegisteredEvent(
+ name(),
+ self.getInstance().getInstanceId(),
+ false
+ ));
+ }
+
+ protected void sendRegister() {
+ try {
+ doSendRegister();
+ } catch (Exception e) {
+ LOGGER.error("register failed.", e);
+ }
+ }
+
+ protected abstract void doSendRegister() throws IOException;
+
+ @Override
public MicroserviceInstance getMicroserviceInstance() {
return self.getInstance();
}
@@ -70,4 +119,16 @@ public abstract class AbstractLightweightRegistration implements Registration {
public void addBasePath(Collection<BasePath> basePaths) {
self.getMicroservice().getPaths().addAll(basePaths);
}
+
+
+ @Override
+ public void destroy() {
+ try {
+ doSendUnregister();
+ } catch (Exception e) {
+ LOGGER.error("unregister failed.", e);
+ }
+ }
+
+ protected abstract void doSendUnregister() throws IOException;
}
\ No newline at end of file
diff --git a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/Message.java b/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/Message.java
similarity index 78%
rename from service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/Message.java
rename to service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/Message.java
index 7a4e3bd..eda811f 100644
--- a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/Message.java
+++ b/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/Message.java
@@ -15,10 +15,9 @@
* limitations under the License.
*/
-package org.apache.servicecomb.zeroconfig;
+package org.apache.servicecomb.registry.lightweight;
-import org.apache.servicecomb.registry.lightweight.RegisterRequest;
-import org.apache.servicecomb.registry.lightweight.UnregisterRequest;
+import java.io.IOException;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
@@ -26,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+import io.vertx.core.json.jackson.DatabindCodec;
+
public class Message<T> {
public static <T> Message<T> of(MessageType type, T body) {
return new Message<T>()
@@ -61,4 +62,13 @@ public class Message<T> {
this.body = body;
return this;
}
+
+ public byte[] encode() throws IOException {
+ return DatabindCodec.mapper().writeValueAsBytes(this);
+ }
+
+ public static Message<?> decode(byte[] bytes, int length) throws IOException {
+ return DatabindCodec.mapper()
+ .readValue(bytes, 0, length, Message.class);
+ }
}
diff --git a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigServer.java b/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/MessageExecutor.java
similarity index 52%
rename from service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigServer.java
rename to service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/MessageExecutor.java
index 17ae8b2..9f5fa11 100644
--- a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigServer.java
+++ b/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/MessageExecutor.java
@@ -15,55 +15,47 @@
* limitations under the License.
*/
-package org.apache.servicecomb.zeroconfig;
+package org.apache.servicecomb.registry.lightweight;
-import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import org.apache.servicecomb.registry.lightweight.RegisterRequest;
-import org.apache.servicecomb.registry.lightweight.RegistryServerService;
-import org.apache.servicecomb.registry.lightweight.Self;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
-public class ZeroConfigServer {
- private static final Logger LOGGER = LoggerFactory.getLogger(ZeroConfigServer.class);
-
+public class MessageExecutor {
private final Self self;
- private final Multicast multicast;
-
- private final RegistryServerService registryServerService;
+ private final StoreService storeService;
private final Map<MessageType, Consumer<?>> messageProcessors = new HashMap<>();
- private final Executor taskExecutor = Executors
- .newSingleThreadExecutor(runnable -> new Thread(runnable, "zero-config-server-task"));
+ private final ScheduledExecutorService taskExecutor = Executors
+ .newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, "lightweight-message-executor"));
- public ZeroConfigServer(Self self, Multicast multicast, RegistryServerService registryServerService) {
+ public MessageExecutor(Self self, StoreService storeService) {
this.self = self;
- this.multicast = multicast;
- this.registryServerService = registryServerService;
+ this.storeService = storeService;
addMessageProcessor(MessageType.REGISTER, this::register);
- addMessageProcessor(MessageType.UNREGISTER, registryServerService::unregister);
+ addMessageProcessor(MessageType.UNREGISTER, storeService::unregister);
+ }
- Executors
- .newSingleThreadExecutor(runnable -> new Thread(runnable, "zero-config-server-recv"))
- .execute(this::recv);
+ public void startCheckDeadInstances(Duration interval) {
+ taskExecutor.scheduleAtFixedRate(
+ () -> storeService.deleteDeadInstances(interval),
+ 0, interval.getSeconds(), TimeUnit.SECONDS);
}
private void register(RegisterRequest request) {
if (request.isCrossApp() || Objects.equals(request.getAppId(), self.getAppId())) {
- registryServerService.register(request);
+ storeService.register(request);
}
}
@@ -71,21 +63,8 @@ public class ZeroConfigServer {
messageProcessors.put(messageType, messageProcessor);
}
- private void recv() {
- for (; ; ) {
- try {
- Message<?> message = multicast.recv();
- processMessage(message);
- } catch (SocketTimeoutException e) {
- registryServerService.deleteDeadInstances(Duration.ofSeconds(90));
- } catch (Exception e) {
- LOGGER.error("failed to receive or decode message.", e);
- }
- }
- }
-
@SuppressWarnings("unchecked")
- private <T> void processMessage(Message<T> message) {
+ public <T> void processMessage(Message<T> message) {
Consumer<T> consumer = (Consumer<T>) messageProcessors.get(message.getType());
taskExecutor.execute(() -> {
diff --git a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/MessageType.java b/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/MessageType.java
similarity index 91%
rename from service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/MessageType.java
rename to service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/MessageType.java
index 0df16ed..d3287ef 100644
--- a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/MessageType.java
+++ b/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/MessageType.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.zeroconfig;
+package org.apache.servicecomb.registry.lightweight;
public enum MessageType {
REGISTER,
diff --git a/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/MicroserviceInfo.java b/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/MicroserviceInfo.java
index b8d9577..9b62bef 100644
--- a/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/MicroserviceInfo.java
+++ b/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/MicroserviceInfo.java
@@ -25,10 +25,10 @@ import org.apache.servicecomb.registry.api.registry.MicroserviceInstance;
public class MicroserviceInfo {
private Microservice microservice;
- private Map<String, String> schemasById;
-
private MicroserviceInstance instance;
+ private Map<String, String> schemasById;
+
public Microservice getMicroservice() {
return microservice;
}
diff --git a/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/RegistryServerService.java b/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/StoreService.java
similarity index 94%
rename from service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/RegistryServerService.java
rename to service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/StoreService.java
index ce6ad28..173305b 100644
--- a/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/RegistryServerService.java
+++ b/service-registry/registry-lightweight/src/main/java/org/apache/servicecomb/registry/lightweight/StoreService.java
@@ -33,8 +33,8 @@ import org.springframework.stereotype.Component;
import com.google.common.eventbus.EventBus;
@Component
-public class RegistryServerService {
- private static final Logger LOGGER = LoggerFactory.getLogger(RegistryServerService.class);
+public class StoreService {
+ private static final Logger LOGGER = LoggerFactory.getLogger(StoreService.class);
private final EventBus eventBus;
@@ -42,7 +42,7 @@ public class RegistryServerService {
private final DiscoveryClient discoveryClient;
- public RegistryServerService(EventBus eventBus, Store store, DiscoveryClient discoveryClient) {
+ public StoreService(EventBus eventBus, Store store, DiscoveryClient discoveryClient) {
this.store = store;
this.discoveryClient = discoveryClient;
this.eventBus = eventBus;
diff --git a/service-registry/registry-zero-config/src/test/java/org/apache/servicecomb/zeroconfig/multicast/MessageTest.java b/service-registry/registry-lightweight/src/test/java/org/apache/servicecomb/registry/lightweight/MessageTest.java
similarity index 87%
rename from service-registry/registry-zero-config/src/test/java/org/apache/servicecomb/zeroconfig/multicast/MessageTest.java
rename to service-registry/registry-lightweight/src/test/java/org/apache/servicecomb/registry/lightweight/MessageTest.java
index 957977f..c91a14b 100644
--- a/service-registry/registry-zero-config/src/test/java/org/apache/servicecomb/zeroconfig/multicast/MessageTest.java
+++ b/service-registry/registry-lightweight/src/test/java/org/apache/servicecomb/registry/lightweight/MessageTest.java
@@ -15,14 +15,10 @@
* limitations under the License.
*/
-package org.apache.servicecomb.zeroconfig.multicast;
+package org.apache.servicecomb.registry.lightweight;
import static org.assertj.core.api.Assertions.assertThat;
-import org.apache.servicecomb.registry.lightweight.RegisterRequest;
-import org.apache.servicecomb.registry.lightweight.UnregisterRequest;
-import org.apache.servicecomb.zeroconfig.Message;
-import org.apache.servicecomb.zeroconfig.MessageType;
import org.junit.jupiter.api.Test;
import io.vertx.core.json.Json;
@@ -41,7 +37,9 @@ class MessageTest {
+ "{\n"
+ " \"type\" : \"REGISTER\",\n"
+ " \"body\" : {\n"
+ + " \"appId\" : null,\n"
+ " \"serviceId\" : null,\n"
+ + " \"crossApp\" : false,\n"
+ " \"schemasSummary\" : null,\n"
+ " \"instanceId\" : null,\n"
+ " \"status\" : null,\n"
diff --git a/service-registry/registry-lightweight/src/test/java/org/apache/servicecomb/registry/lightweight/RegistryServerServiceTest.java b/service-registry/registry-lightweight/src/test/java/org/apache/servicecomb/registry/lightweight/StoreServiceTest.java
similarity index 95%
rename from service-registry/registry-lightweight/src/test/java/org/apache/servicecomb/registry/lightweight/RegistryServerServiceTest.java
rename to service-registry/registry-lightweight/src/test/java/org/apache/servicecomb/registry/lightweight/StoreServiceTest.java
index c46da8b..7774dbb 100644
--- a/service-registry/registry-lightweight/src/test/java/org/apache/servicecomb/registry/lightweight/RegistryServerServiceTest.java
+++ b/service-registry/registry-lightweight/src/test/java/org/apache/servicecomb/registry/lightweight/StoreServiceTest.java
@@ -38,7 +38,7 @@ import com.google.common.eventbus.Subscribe;
import io.vertx.core.json.Json;
-class RegistryServerServiceTest extends TestBase {
+class StoreServiceTest extends TestBase {
EventBus eventBus = new EventBus();
MockTicker ticker = new MockTicker();
@@ -47,7 +47,7 @@ class RegistryServerServiceTest extends TestBase {
DiscoveryClient discoveryClient = Mockito.mock(DiscoveryClient.class);
- RegistryServerService service = new RegistryServerService(eventBus, store, discoveryClient);
+ StoreService service = new StoreService(eventBus, store, discoveryClient);
@BeforeEach
void setUp() {
diff --git a/service-registry/registry-local/src/main/java/org/apache/servicecomb/localregistry/RegistryBean.java b/service-registry/registry-local/src/main/java/org/apache/servicecomb/localregistry/RegistryBean.java
index 8e3ca43..9ea27f6 100644
--- a/service-registry/registry-local/src/main/java/org/apache/servicecomb/localregistry/RegistryBean.java
+++ b/service-registry/registry-local/src/main/java/org/apache/servicecomb/localregistry/RegistryBean.java
@@ -26,7 +26,6 @@ import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.config.BootStrapProperties;
-import org.apache.servicecomb.registry.api.registry.MicroserviceFactory;
/**
* Configuration bean for local services. Bean configuration is token
diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
index 940ecd6..ca0df9b 100644
--- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
+++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.servicecomb.serviceregistry.registry;
import java.util.List;
diff --git a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/AbstractZeroConfigRegistration.java b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/AbstractZeroConfigRegistration.java
new file mode 100644
index 0000000..6fa690f
--- /dev/null
+++ b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/AbstractZeroConfigRegistration.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.zeroconfig;
+
+import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.ORDER;
+
+import org.apache.servicecomb.registry.lightweight.AbstractLightweightRegistration;
+import org.springframework.beans.factory.annotation.Autowired;
+
+public abstract class AbstractZeroConfigRegistration extends AbstractLightweightRegistration {
+ protected Config config;
+
+ @Autowired
+ public AbstractZeroConfigRegistration setConfig(Config config) {
+ this.config = config;
+ return this;
+ }
+
+ @Override
+ public int getOrder() {
+ return ORDER;
+ }
+
+ @Override
+ public boolean enabled() {
+ return config.isEnabled();
+ }
+
+ @Override
+ public void run() {
+ // switch to registered status, before send register message
+ // otherwise send message maybe failed
+ postRegisteredEvent();
+ startRegister(config.getHeartbeatInterval());
+ }
+}
diff --git a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/Config.java b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/Config.java
index e82761b..9b69e04 100644
--- a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/Config.java
+++ b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/Config.java
@@ -14,23 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.servicecomb.zeroconfig;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_ADDRESS;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_ENABLED;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_GROUP;
+import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_HEARTBEAT_INTERVAL;
+import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_HEARTBEAT_LOST_TIMES;
+import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_PULL_INTERVAL;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.DEFAULT_ADDRESS;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.DEFAULT_GROUP;
+import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.DEFAULT_HEARTBEAT_INTERVAL;
+import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.DEFAULT_HEARTBEAT_LOST_TIMES;
+import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.DEFAULT_PULL_INTERVAL;
+
+import java.time.Duration;
import org.apache.servicecomb.config.DynamicProperties;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Config {
- private final DynamicProperties dynamicProperties;
+ private DynamicProperties dynamicProperties;
- public Config(DynamicProperties dynamicProperties) {
+ @Autowired
+ public Config setDynamicProperties(DynamicProperties dynamicProperties) {
this.dynamicProperties = dynamicProperties;
+ return this;
}
public boolean isEnabled() {
@@ -45,4 +57,23 @@ public class Config {
public String getGroup() {
return dynamicProperties.getStringProperty(CFG_GROUP, DEFAULT_GROUP);
}
+
+ public Duration getHeartbeatInterval() {
+ String interval = dynamicProperties.getStringProperty(CFG_HEARTBEAT_INTERVAL, DEFAULT_HEARTBEAT_INTERVAL);
+ return toDuration(interval);
+ }
+
+ public Duration getCheckDeadInstancesInterval() {
+ int lostTimes = dynamicProperties.getIntProperty(CFG_HEARTBEAT_LOST_TIMES, DEFAULT_HEARTBEAT_LOST_TIMES);
+ return getHeartbeatInterval().multipliedBy(lostTimes);
+ }
+
+ public Duration getPullInterval() {
+ String interval = dynamicProperties.getStringProperty(CFG_PULL_INTERVAL, DEFAULT_PULL_INTERVAL);
+ return toDuration(interval);
+ }
+
+ private Duration toDuration(String interval) {
+ return Duration.parse("PT" + interval);
+ }
}
diff --git a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigConst.java b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigConst.java
index 118e864..2aa13e2 100644
--- a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigConst.java
+++ b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigConst.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.servicecomb.zeroconfig;
public interface ZeroConfigConst {
@@ -21,15 +22,33 @@ public interface ZeroConfigConst {
String PREFIX = "servicecomb.service.zero-config.";
+ String CFG_MODE = PREFIX + "mode";
+
String CFG_ENABLED = PREFIX + "enabled";
- String CFG_GROUP = PREFIX + "group";
+ String CFG_GROUP = PREFIX + "multicast.group";
+
+ String CFG_ADDRESS = PREFIX + "multicast.address";
+
+ String CFG_HEARTBEAT_INTERVAL = PREFIX + "heartbeat.interval";
+
+ String CFG_HEARTBEAT_LOST_TIMES = PREFIX + "heartbeat.lost-times";
- String CFG_ADDRESS = PREFIX + "address";
+ String CFG_PULL_INTERVAL = PREFIX + "pull-interval";
String DEFAULT_GROUP = "225.6.7.8";
String DEFAULT_ADDRESS = "0.0.0.0:6666";
+ String DEFAULT_HEARTBEAT_INTERVAL = "30s";
+
+ int DEFAULT_HEARTBEAT_LOST_TIMES = 3;
+
+ String DEFAULT_PULL_INTERVAL = "3s";
+
+ String MODE_MULTICAST = "multicast";
+
+ String MODE_LOCAL = "local";
+
int ORDER = 101;
}
diff --git a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigDiscovery.java b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigDiscovery.java
index 605e854..0cf7e93 100644
--- a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigDiscovery.java
+++ b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigDiscovery.java
@@ -18,42 +18,27 @@ package org.apache.servicecomb.zeroconfig;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.ORDER;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.servicecomb.registry.api.registry.Microservice;
-import org.apache.servicecomb.registry.consumer.AppManager;
import org.apache.servicecomb.registry.lightweight.AbstractLightweightDiscovery;
-import org.apache.servicecomb.registry.lightweight.SchemaChangedEvent;
-import org.apache.servicecomb.registry.lightweight.store.Store;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-
@Component
public class ZeroConfigDiscovery extends AbstractLightweightDiscovery {
private static final String NAME = "zero-config discovery";
- private final Config config;
-
- private final AppManager appManager;
+ private Config config;
- public ZeroConfigDiscovery(Config config, Store store, EventBus eventBus, AppManager appManager) {
- super(store);
+ @Autowired
+ public ZeroConfigDiscovery setConfig(Config config) {
this.config = config;
- this.appManager = appManager;
-
- Executors
- .newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, "zero-config-discovery"))
- .scheduleAtFixedRate(appManager::safePullInstances, 0, 3, TimeUnit.SECONDS);
- eventBus.register(this);
+ return this;
}
- @Subscribe
- public void onSchemaChanged(SchemaChangedEvent event) {
- Microservice microservice = event.getMicroservice();
- appManager.markWaitingDelete(microservice.getAppId(), microservice.getServiceName());
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ super.afterPropertiesSet();
+
+ startPullInstances(config.getPullInterval());
}
@Override
diff --git a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigRegistration.java b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigRegistration.java
deleted file mode 100644
index ec9f057..0000000
--- a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigRegistration.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicecomb.zeroconfig;
-
-import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.ORDER;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.servicecomb.registry.api.event.MicroserviceInstanceRegisteredEvent;
-import org.apache.servicecomb.registry.lightweight.AbstractLightweightRegistration;
-import org.apache.servicecomb.registry.lightweight.RegisterInstanceEvent;
-import org.apache.servicecomb.registry.lightweight.Self;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-
-@Component
-public class ZeroConfigRegistration extends AbstractLightweightRegistration {
- private static final Logger LOGGER = LoggerFactory.getLogger(ZeroConfigRegistration.class);
-
- private static final String NAME = "zero-config registration";
-
- private final Config config;
-
- private final Multicast multicast;
-
- private final Self self;
-
- private final EventBus eventBus;
-
- private final ScheduledExecutorService executorService = Executors
- .newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, "zero-config-register"));
-
- public ZeroConfigRegistration(Config config, Multicast multicast, Self self, EventBus eventBus) {
- this.config = config;
- this.multicast = multicast;
- this.self = self;
- this.eventBus = eventBus;
-
- eventBus.register(this);
- }
-
- private void sendRegister() {
- try {
- multicast.send(MessageType.REGISTER, self.buildRegisterRequest());
- } catch (Exception e) {
- LOGGER.error("register failed.", e);
- }
- }
-
- @Subscribe
- public void onRegisterInstance(RegisterInstanceEvent event) {
- if (event.getInstance().getInstanceId().equals(self.getInstance().getInstanceId())) {
- return;
- }
-
- sendRegister();
- }
-
- @Override
- public boolean enabled() {
- return config.isEnabled();
- }
-
- @Override
- public void init() {
-
- }
-
- @Override
- public void run() {
- // switch to registered status, before send register message
- // otherwise send message maybe failed
- eventBus.post(new MicroserviceInstanceRegisteredEvent(
- NAME,
- self.getInstance().getInstanceId(),
- false
- ));
- executorService.scheduleAtFixedRate(this::sendRegister, 0, 10, TimeUnit.SECONDS);
- }
-
- @Override
- public void destroy() {
- try {
- multicast.send(MessageType.UNREGISTER, self.buildUnregisterRequest());
- } catch (Exception e) {
- LOGGER.error("unregister failed.", e);
- }
- }
-
- @Override
- public int getOrder() {
- return ORDER;
- }
-
- @Override
- public String name() {
- return NAME;
- }
-}
diff --git a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigConst.java b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/local/ConditionOnLocal.java
similarity index 54%
copy from service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigConst.java
copy to service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/local/ConditionOnLocal.java
index 118e864..7fc3ab8 100644
--- a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigConst.java
+++ b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/local/ConditionOnLocal.java
@@ -1,35 +1,33 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicecomb.zeroconfig;
-
-public interface ZeroConfigConst {
- int MAX_PACKET_SIZE = 1024;
-
- String PREFIX = "servicecomb.service.zero-config.";
-
- String CFG_ENABLED = PREFIX + "enabled";
-
- String CFG_GROUP = PREFIX + "group";
-
- String CFG_ADDRESS = PREFIX + "address";
-
- String DEFAULT_GROUP = "225.6.7.8";
-
- String DEFAULT_ADDRESS = "0.0.0.0:6666";
-
- int ORDER = 101;
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.zeroconfig.local;
+
+import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_MODE;
+import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.MODE_LOCAL;
+
+import org.springframework.context.annotation.Condition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+
+public class ConditionOnLocal implements Condition {
+ @Override
+ public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
+ String mode = context.getEnvironment().getProperty(CFG_MODE);
+ return MODE_LOCAL.equals(mode);
+ }
+}
diff --git a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/local/LocalRegistration.java b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/local/LocalRegistration.java
new file mode 100644
index 0000000..6607624
--- /dev/null
+++ b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/local/LocalRegistration.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.zeroconfig.local;
+
+import org.apache.servicecomb.registry.lightweight.Message;
+import org.apache.servicecomb.registry.lightweight.MessageExecutor;
+import org.apache.servicecomb.registry.lightweight.MessageType;
+import org.apache.servicecomb.registry.lightweight.RegisterRequest;
+import org.apache.servicecomb.registry.lightweight.UnregisterRequest;
+import org.apache.servicecomb.zeroconfig.AbstractZeroConfigRegistration;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.stereotype.Component;
+
+/**
+ * for single node environments
+ */
+@Component
+@Conditional(ConditionOnLocal.class)
+public class LocalRegistration extends AbstractZeroConfigRegistration {
+ private static final String NAME = "zero-config-local";
+
+ private MessageExecutor messageExecutor;
+
+ @Autowired
+ public LocalRegistration setMessageExecutor(MessageExecutor messageExecutor) {
+ this.messageExecutor = messageExecutor;
+ return this;
+ }
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ protected void doSendRegister() {
+ Message<RegisterRequest> message = Message.of(MessageType.REGISTER, self.buildRegisterRequest());
+ messageExecutor.processMessage(message);
+ }
+
+ @Override
+ protected void doSendUnregister() {
+ Message<UnregisterRequest> message = Message.of(MessageType.UNREGISTER, self.buildUnregisterRequest());
+ messageExecutor.processMessage(message);
+ }
+}
diff --git a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigConst.java b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/multicast/ConditionOnMulticast.java
similarity index 53%
copy from service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigConst.java
copy to service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/multicast/ConditionOnMulticast.java
index 118e864..656498c 100644
--- a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/ZeroConfigConst.java
+++ b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/multicast/ConditionOnMulticast.java
@@ -1,35 +1,33 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicecomb.zeroconfig;
-
-public interface ZeroConfigConst {
- int MAX_PACKET_SIZE = 1024;
-
- String PREFIX = "servicecomb.service.zero-config.";
-
- String CFG_ENABLED = PREFIX + "enabled";
-
- String CFG_GROUP = PREFIX + "group";
-
- String CFG_ADDRESS = PREFIX + "address";
-
- String DEFAULT_GROUP = "225.6.7.8";
-
- String DEFAULT_ADDRESS = "0.0.0.0:6666";
-
- int ORDER = 101;
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.zeroconfig.multicast;
+
+import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_MODE;
+import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.MODE_MULTICAST;
+
+import org.springframework.context.annotation.Condition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+
+public class ConditionOnMulticast implements Condition {
+ @Override
+ public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
+ String mode = context.getEnvironment().getProperty(CFG_MODE);
+ return mode == null || MODE_MULTICAST.equals(mode);
+ }
+}
diff --git a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/Multicast.java b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/multicast/Multicast.java
similarity index 82%
rename from service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/Multicast.java
rename to service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/multicast/Multicast.java
index f209e81..7d07b26 100644
--- a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/Multicast.java
+++ b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/multicast/Multicast.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.zeroconfig;
+package org.apache.servicecomb.zeroconfig.multicast;
import java.io.IOException;
import java.net.DatagramPacket;
@@ -25,15 +25,19 @@ import java.net.MulticastSocket;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
+import org.apache.servicecomb.registry.lightweight.Message;
+import org.apache.servicecomb.registry.lightweight.MessageType;
+import org.apache.servicecomb.zeroconfig.Config;
+import org.apache.servicecomb.zeroconfig.ZeroConfigConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Conditional;
import org.springframework.stereotype.Component;
import com.google.common.net.HostAndPort;
-import io.vertx.core.json.jackson.DatabindCodec;
-
@Component
+@Conditional(ConditionOnMulticast.class)
public class Multicast {
private static final Logger LOGGER = LoggerFactory.getLogger(Multicast.class);
@@ -59,6 +63,7 @@ public class Multicast {
this.multicastSocket.setSoTimeout((int) TimeUnit.SECONDS.toMillis(5));
}
+
@SuppressWarnings("UnstableApiUsage")
private InetSocketAddress initBindAddress(Config config) {
HostAndPort hostAndPort = HostAndPort.fromString(config.getAddress());
@@ -70,8 +75,7 @@ public class Multicast {
}
public <T> void send(MessageType type, T body) throws IOException {
- Message<T> message = Message.of(type, body);
- byte[] buffer = DatabindCodec.mapper().writeValueAsBytes(message);
+ byte[] buffer = Message.of(type, body).encode();
DatagramPacket packet = new DatagramPacket(buffer, buffer.length, group, bindAddress.getPort());
multicastSocket.send(packet);
}
@@ -79,7 +83,6 @@ public class Multicast {
public Message<?> recv() throws IOException {
multicastSocket.receive(recvPacket);
- return DatabindCodec.mapper()
- .readValue(recvPacket.getData(), 0, recvPacket.getLength(), Message.class);
+ return Message.decode(recvPacket.getData(), recvPacket.getLength());
}
}
diff --git a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/multicast/MulticastRegistration.java b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/multicast/MulticastRegistration.java
new file mode 100644
index 0000000..f2788bd
--- /dev/null
+++ b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/multicast/MulticastRegistration.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.zeroconfig.multicast;
+
+import java.io.IOException;
+
+import org.apache.servicecomb.registry.lightweight.MessageType;
+import org.apache.servicecomb.registry.lightweight.RegisterInstanceEvent;
+import org.apache.servicecomb.zeroconfig.AbstractZeroConfigRegistration;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.stereotype.Component;
+
+import com.google.common.eventbus.Subscribe;
+
+@Component
+@Conditional(ConditionOnMulticast.class)
+@SuppressWarnings("UnstableApiUsage")
+public class MulticastRegistration extends AbstractZeroConfigRegistration implements InitializingBean {
+ private static final String NAME = "zero-config-multicast";
+
+ private Multicast multicast;
+
+ @Autowired
+ public MulticastRegistration setMulticast(Multicast multicast) {
+ this.multicast = multicast;
+ return this;
+ }
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public void afterPropertiesSet() {
+ eventBus.register(this);
+ }
+
+ @Override
+ protected void doSendRegister() throws IOException {
+ multicast.send(MessageType.REGISTER, self.buildRegisterRequest());
+ }
+
+ @Override
+ protected void doSendUnregister() throws IOException {
+ multicast.send(MessageType.UNREGISTER, self.buildUnregisterRequest());
+ }
+
+ @SuppressWarnings("unused")
+ @Subscribe
+ public void onRegisterInstance(RegisterInstanceEvent event) {
+ if (event.getInstance().getInstanceId().equals(self.getInstance().getInstanceId())) {
+ return;
+ }
+
+ sendRegister();
+ }
+}
diff --git a/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/multicast/MulticastServer.java b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/multicast/MulticastServer.java
new file mode 100644
index 0000000..c0d04d1
--- /dev/null
+++ b/service-registry/registry-zero-config/src/main/java/org/apache/servicecomb/zeroconfig/multicast/MulticastServer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.zeroconfig.multicast;
+
+import java.net.SocketTimeoutException;
+import java.util.concurrent.Executors;
+
+import org.apache.servicecomb.registry.lightweight.Message;
+import org.apache.servicecomb.registry.lightweight.MessageExecutor;
+import org.apache.servicecomb.zeroconfig.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.stereotype.Component;
+
+@Component
+@Conditional(ConditionOnMulticast.class)
+@SuppressWarnings("unused")
+public class MulticastServer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MulticastServer.class);
+
+ private final Multicast multicast;
+
+ private final MessageExecutor messageExecutor;
+
+ public MulticastServer(Config config, Multicast multicast, MessageExecutor messageExecutor) {
+ this.multicast = multicast;
+ this.messageExecutor = messageExecutor;
+
+ Executors
+ .newSingleThreadExecutor(runnable -> new Thread(runnable, "multicast-server-recv"))
+ .execute(this::recv);
+ messageExecutor.startCheckDeadInstances(config.getCheckDeadInstancesInterval());
+ }
+
+ @SuppressWarnings("InfiniteLoopStatement")
+ private void recv() {
+ for (; ; ) {
+ Message<?> message = recvMsg();
+ if (message == null) {
+ continue;
+ }
+
+ messageExecutor.processMessage(message);
+ }
+ }
+
+ private Message<?> recvMsg() {
+ try {
+ return multicast.recv();
+ } catch (SocketTimeoutException ignore) {
+ return null;
+ } catch (Exception e) {
+ LOGGER.error("failed to receive or decode message.", e);
+ return null;
+ }
+ }
+}