You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/06/05 05:23:20 UTC

[32/67] [abbrv] kylin git commit: Broadcaster should allow dynamic rest server list

Broadcaster should allow dynamic rest server list


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3fbf90ae
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3fbf90ae
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3fbf90ae

Branch: refs/heads/master
Commit: 3fbf90aed1bd78c1f44f9fd1f37fc34ffa704762
Parents: eafbe73
Author: shaofengshi <sh...@apache.org>
Authored: Fri May 26 17:55:36 2017 +0800
Committer: hongbin ma <ma...@kyligence.io>
Committed: Sat May 27 16:20:07 2017 +0800

----------------------------------------------------------------------
 .../kylin/common/restclient/RestClient.java     | 11 +++++++++-
 .../kylin/metadata/cachesync/Broadcaster.java   | 23 ++++++++++++--------
 2 files changed, 24 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/3fbf90ae/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index 33a4e7a..fc34a6b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -33,6 +33,7 @@ import org.apache.http.HttpResponse;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
@@ -65,6 +66,10 @@ public class RestClient {
     private static final int HTTP_CONNECTION_TIMEOUT_MS = 30000;
     private static final int HTTP_SOCKET_TIMEOUT_MS = 120000;
 
+    public static final String SCHEME_HTTP = "http://";
+
+    public static final String KYLIN_API_PATH = "/kylin/api";
+
     public static boolean matchFullRestPattern(String uri) {
         Matcher m = fullRestPattern.matcher(uri);
         return m.matches();
@@ -97,7 +102,7 @@ public class RestClient {
         this.port = port;
         this.userName = userName;
         this.password = password;
-        this.baseUrl = "http://" + host + ":" + port + "/kylin/api";
+        this.baseUrl = SCHEME_HTTP + host + ":" + port + KYLIN_API_PATH;
 
         final HttpParams httpParams = new BasicHttpParams();
         HttpConnectionParams.setSoTimeout(httpParams, HTTP_SOCKET_TIMEOUT_MS);
@@ -114,6 +119,10 @@ public class RestClient {
     }
 
     public void wipeCache(String entity, String event, String cacheKey) throws IOException {
+        wipeCache(client, baseUrl, entity, event, cacheKey);
+    }
+
+    public static void wipeCache(HttpClient client, String baseUrl, String entity, String event, String cacheKey) throws IOException {
         String url = baseUrl + "/cache/" + entity + "/" + cacheKey + "/" + event;
         HttpPut request = new HttpPut(url);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fbf90ae/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 1394f7b..35d2f42 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -32,6 +32,11 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.http.client.HttpClient;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.restclient.RestClient;
 import org.apache.kylin.common.util.DaemonThreadFactory;
@@ -104,29 +109,29 @@ public class Broadcaster {
         final String[] nodes = config.getRestServers();
         if (nodes == null || nodes.length < 1) {
             logger.warn("There is no available rest server; check the 'kylin.server.cluster-servers' config");
-            broadcastEvents = null; // disable the broadcaster
-            return;
         }
         logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
 
         Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
             @Override
             public void run() {
-                final List<RestClient> restClients = Lists.newArrayList();
-                for (String node : config.getRestServers()) {
-                    restClients.add(new RestClient(node));
-                }
-                final ExecutorService wipingCachePool = Executors.newFixedThreadPool(restClients.size(), new DaemonThreadFactory());
+                final HttpParams httpParams = new BasicHttpParams();
+                HttpConnectionParams.setConnectionTimeout(httpParams, 3000);
+
+                final HttpClient client = new DefaultHttpClient(httpParams);
+
+                final ExecutorService wipingCachePool = Executors.newFixedThreadPool(3, new DaemonThreadFactory());
                 while (true) {
                     try {
                         final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
+                        logger.debug("Servers in the cluster: " + Arrays.toString(config.getRestServers()));
                         logger.info("Announcing new broadcast event: " + broadcastEvent);
-                        for (final RestClient restClient : restClients) {
+                        for (final String address : config.getRestServers()) {
                             wipingCachePool.execute(new Runnable() {
                                 @Override
                                 public void run() {
                                     try {
-                                        restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
+                                        RestClient.wipeCache(client, RestClient.SCHEME_HTTP + address + RestClient.KYLIN_API_PATH,  broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
                                     } catch (IOException e) {
                                         logger.warn("Thread failed during wipe cache at " + broadcastEvent, e);
                                     }