You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by GitBox <gi...@apache.org> on 2018/08/01 10:01:19 UTC

[GitHub] heyile closed pull request #842: [SCB-782]support revision check when use pull mode with config center

heyile closed pull request #842: [SCB-782]support revision check when use pull mode with config center
URL: https://github.com/apache/incubator-servicecomb-java-chassis/pull/842
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java
index a58de4da2..8ae181f15 100644
--- a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java
+++ b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java
@@ -136,7 +136,7 @@ public void connectServer() {
     }
     refreshMembers(memberDiscovery);
     ConfigRefresh refreshTask = new ConfigRefresh(parseConfigUtils, memberDiscovery);
-    refreshTask.run(true);
+    refreshTask.run();
     executor.scheduleWithFixedDelay(refreshTask,
         firstRefreshInterval,
         refreshInterval,
@@ -160,13 +160,14 @@ private void refreshMembers(MemberDiscovery memberDiscovery) {
       String configCenter = memberDiscovery.getConfigServer();
       IpPort ipPort = NetUtils.parseIpPortFromURI(configCenter);
       clientMgr.findThreadBindClientPool().runOnContext(client -> {
-        HttpClientRequest request = client.get(ipPort.getPort(), ipPort.getHostOrIp(), uriConst.MEMBERS, rsp -> {
-          if (rsp.statusCode() == HttpResponseStatus.OK.code()) {
-            rsp.bodyHandler(buf -> {
-              memberDiscovery.refreshMembers(buf.toJsonObject());
+        HttpClientRequest request =
+            client.get(ipPort.getPort(), ipPort.getHostOrIp(), uriConst.MEMBERS, rsp -> {
+              if (rsp.statusCode() == HttpResponseStatus.OK.code()) {
+                rsp.bodyHandler(buf -> {
+                  memberDiscovery.refreshMembers(buf.toJsonObject());
+                });
+              }
             });
-          }
-        });
         SignRequest signReq = createSignRequest(request.method().toString(),
             configCenter + uriConst.MEMBERS,
             new HashMap<>(),
@@ -174,7 +175,8 @@ private void refreshMembers(MemberDiscovery memberDiscovery) {
         if (ConfigCenterConfig.INSTANCE.getToken() != null) {
           request.headers().add("X-Auth-Token", ConfigCenterConfig.INSTANCE.getToken());
         }
-        authHeaderProviders.forEach(provider -> request.headers().addAll(provider.getSignAuthHeaders(signReq)));
+        authHeaderProviders.forEach(provider -> request.headers()
+            .addAll(provider.getSignAuthHeaders(signReq)));
         request.exceptionHandler(e -> {
           LOGGER.error("Fetch member from {} failed. Error message is [{}].", configCenter, e.getMessage());
         });
@@ -235,12 +237,12 @@ private HttpClientOptions createHttpClientOptions() {
     }
 
     public void run(boolean wait) {
-      // this will be single threaded, so we don't care about concurrent
-      // staffs
       try {
         String configCenter = memberdis.getConfigServer();
         if (refreshMode == 1) {
-          refreshConfig(configCenter, wait);
+          //make sure only one thread is invoking refreshConfig at a time,
+          //hence we always pass wait=true here instead of the caller's value
+          refreshConfig(configCenter, true);
         } else if (!isWatching) {
           // 重新监听时需要先加载,避免在断开期间丢失变更
           refreshConfig(configCenter, wait);
@@ -302,6 +304,7 @@ public void doWatch(String configCenter)
                 LOGGER.info("watching config recieved {}", action);
                 Map<String, Object> mAction = action.toJsonObject().getMap();
                 if ("CREATE".equals(mAction.get("action"))) {
+                  //the event loop must not be blocked, so push mode still refreshes without waiting
                   refreshConfig(configCenter, false);
                 } else if ("MEMBER_CHANGE".equals(mAction.get("action"))) {
                   refreshMembers(memberdis);
@@ -314,7 +317,8 @@ public void doWatch(String configCenter)
               waiter.countDown();
             },
             e -> {
-              LOGGER.error("watcher connect to config center {} refresh port {} failed. Error message is [{}]",
+              LOGGER.error(
+                  "watcher connect to config center {} refresh port {} failed. Error message is [{}]",
                   configCenter,
                   refreshPort,
                   e.getMessage());
@@ -352,12 +356,14 @@ public void refreshConfig(String configcenter, boolean wait) {
       CountDownLatch latch = new CountDownLatch(1);
       String encodeServiceName = "";
       try {
-        encodeServiceName = URLEncoder.encode(StringUtils.deleteWhitespace(serviceName), StandardCharsets.UTF_8.name());
+        encodeServiceName =
+            URLEncoder.encode(StringUtils.deleteWhitespace(serviceName), StandardCharsets.UTF_8.name());
       } catch (UnsupportedEncodingException e) {
         LOGGER.error("encode failed. Error message: {}", e.getMessage());
         encodeServiceName = StringUtils.deleteWhitespace(serviceName);
       }
-      String path = uriConst.ITEMS + "?dimensionsInfo=" + encodeServiceName;
+      String path = uriConst.ITEMS + "?dimensionsInfo=" + encodeServiceName + "&revision="
+          + ParseConfigUtils.CURRENT_VERSION_INFO;
       clientMgr.findThreadBindClientPool().runOnContext(client -> {
         IpPort ipPort = NetUtils.parseIpPortFromURI(configcenter);
         HttpClientRequest request = client.get(ipPort.getPort(), ipPort.getHostOrIp(), path, rsp -> {
@@ -370,11 +376,17 @@ public void refreshConfig(String configcenter, boolean wait) {
                         }));
                 EventManager.post(new ConnSuccEvent());
               } catch (IOException e) {
-                EventManager.post(new ConnFailEvent("config refresh result parse fail " + e.getMessage()));
-                LOGGER.error("Config refresh from {} failed. Error message is [{}].", configcenter, e.getMessage());
+                EventManager.post(new ConnFailEvent(
+                    "config refresh result parse fail " + e.getMessage()));
+                LOGGER.error("Config refresh from {} failed. Error message is [{}].",
+                    configcenter,
+                    e.getMessage());
               }
               latch.countDown();
             });
+          } else if (rsp.statusCode() == HttpResponseStatus.NOT_MODIFIED.code()) {
+            //nothing changed
+            latch.countDown();
           } else {
             rsp.bodyHandler(buf -> {
               LOGGER.error("Server error message is [{}].", buf);
@@ -383,7 +395,7 @@ public void refreshConfig(String configcenter, boolean wait) {
             EventManager.post(new ConnFailEvent("fetch config fail"));
             LOGGER.error("Config refresh from {} failed.", configcenter);
           }
-        });
+        }).setTimeout((BOOTUP_WAIT_TIME - 1) * 1000);
         Map<String, String> headers = new HashMap<>();
         headers.put("x-domain-name", tenantName);
         if (ConfigCenterConfig.INSTANCE.getToken() != null) {
@@ -398,7 +410,9 @@ public void refreshConfig(String configcenter, boolean wait) {
                 null))));
         request.exceptionHandler(e -> {
           EventManager.post(new ConnFailEvent("fetch config fail"));
-          LOGGER.error("Config refresh from {} failed. Error message is [{}].", configcenter, e.getMessage());
+          LOGGER.error("Config refresh from {} failed. Error message is [{}].",
+              configcenter,
+              e.getMessage());
           latch.countDown();
         });
         request.end();
diff --git a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java
index a891088fe..566764397 100644
--- a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java
+++ b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java
@@ -40,6 +40,8 @@
 
   public static final Map<String, Object> flatItems = new HashMap<>();
 
+  public static String CURRENT_VERSION_INFO = "default";
+
   private UpdateHandler updateHandler;
 
   public ParseConfigUtils(UpdateHandler updateHandler) {
@@ -47,6 +49,12 @@ public ParseConfigUtils(UpdateHandler updateHandler) {
   }
 
   public void refreshConfigItems(Map<String, Map<String, Object>> remoteItems) {
+
+    CURRENT_VERSION_INFO =
+        remoteItems.getOrDefault("revision", new HashMap<>()).getOrDefault("version", "default").toString();
+    //make sure the CURRENT_VERSION_INFO != ""
+    CURRENT_VERSION_INFO = CURRENT_VERSION_INFO.equals("") ? "default" : CURRENT_VERSION_INFO;
+    remoteItems.remove("revision");//the key revision is not the config setting
     multiDimensionItems.clear();
     multiDimensionItems.putAll(remoteItems);
     doRefreshItems();
diff --git a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java
index 83398f107..cc81d3eda 100644
--- a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java
+++ b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java
@@ -47,6 +47,7 @@
 import io.vertx.core.http.HttpMethod;
 import io.vertx.core.http.WebSocket;
 import io.vertx.core.json.JsonObject;
+
 import mockit.Deencapsulation;
 import mockit.Expectations;
 import mockit.Mock;
@@ -115,7 +116,77 @@ public void testConnectRefreshModeTwo() {
 
   @SuppressWarnings("unchecked")
   @Test
-  public void testConfigRefresh(@Mocked ClientPoolManager<HttpClientWithContext> clientMgr,
+  public void testConfigRefreshModeOne(@Mocked ClientPoolManager<HttpClientWithContext> clientMgr,
+      @Mocked HttpClientWithContext httpClientWithContext) {
+    String version1 = refreshAndGetCurrentRevision(clientMgr, httpClientWithContext, 200, "huawei");
+    //verify that the SDK reads the revision from a 200 response and stores it
+    Assert.assertEquals("huawei", version1);
+    String version2 = refreshAndGetCurrentRevision(clientMgr, httpClientWithContext, 304, "rkd");
+    //verify that when the return code is 304, the SDK does not change the stored revision
+    Assert.assertNotEquals("rkd", version2);
+    String version3 = refreshAndGetCurrentRevision(clientMgr, httpClientWithContext, 200, "");
+    //make sure the current version is not ""
+    Assert.assertNotEquals("", version3);
+  }
+
+  @SuppressWarnings("unchecked")
+  private String refreshAndGetCurrentRevision(ClientPoolManager<HttpClientWithContext> clientMgr,
+      HttpClientWithContext httpClientWithContext, int statusCode, String version) {
+
+    ConfigCenterConfigurationSourceImpl impl = new ConfigCenterConfigurationSourceImpl();
+    UpdateHandler updateHandler = impl.new UpdateHandler();
+    HttpClientRequest request = Mockito.mock(HttpClientRequest.class);
+    Mockito.when(request.headers()).thenReturn(MultiMap.caseInsensitiveMultiMap());
+    Buffer rsp = Mockito.mock(Buffer.class);
+    Mockito.when(rsp.toString())
+        .thenReturn(String.format(
+            "{\"application\":{\"3\":\"2\",\"aa\":\"1\"},\"vmalledge\":{\"aa\":\"3\"},\"revision\": { \"version\": \"%s\"} }",
+            version));
+
+    HttpClientResponse httpClientResponse = Mockito.mock(HttpClientResponse.class);
+    Mockito.when(httpClientResponse.bodyHandler(Mockito.any(Handler.class))).then(invocation -> {
+      Handler<Buffer> handler = invocation.getArgumentAt(0, Handler.class);
+      handler.handle(rsp);
+      return null;
+    });
+    Mockito.when(httpClientResponse.statusCode()).thenReturn(statusCode);
+
+    HttpClient httpClient = Mockito.mock(HttpClient.class);
+    Mockito.when(
+        httpClient.get(Mockito.anyInt(), Mockito.anyString(), Mockito.anyString(), Mockito.any(Handler.class)))
+        .then(invocation -> {
+          Handler<HttpClientResponse> handler = invocation.getArgumentAt(3, Handler.class);
+          handler.handle(httpClientResponse);
+          return request;
+        });
+
+    new MockUp<HttpClientWithContext>() {
+      @Mock
+      public void runOnContext(RunHandler handler) {
+        handler.run(httpClient);
+      }
+    };
+    new Expectations() {
+      {
+        clientMgr.findThreadBindClientPool();
+        result = httpClientWithContext;
+      }
+    };
+
+    ConfigCenterClient cc = new ConfigCenterClient(updateHandler);
+    Deencapsulation.setField(cc, "clientMgr", clientMgr);
+    ParseConfigUtils parseConfigUtils = new ParseConfigUtils(updateHandler);
+    MemberDiscovery memberdis = new MemberDiscovery(Arrays.asList("http://configcentertest:30103"));
+    ConfigRefresh refresh = cc.new ConfigRefresh(parseConfigUtils, memberdis);
+    Deencapsulation.setField(cc, "refreshMode", 1);
+    refresh.run();
+    String currentVersionInfo = Deencapsulation.getField(parseConfigUtils, "CURRENT_VERSION_INFO").toString();
+    return currentVersionInfo;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testConfigRefreshModeZero(@Mocked ClientPoolManager<HttpClientWithContext> clientMgr,
       @Mocked HttpClientWithContext httpClientWithContext) {
     ConfigCenterConfigurationSourceImpl impl = new ConfigCenterConfigurationSourceImpl();
     UpdateHandler updateHandler = impl.new UpdateHandler();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services