You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/06/05 05:23:19 UTC

[31/67] [abbrv] kylin git commit: KYLIN-2619 refine Broadcaster

KYLIN-2619 refine Broadcaster


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/87d5d8db
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/87d5d8db
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/87d5d8db

Branch: refs/heads/master
Commit: 87d5d8db276b590c86c473db5997414285b2d689
Parents: 3fbf90a
Author: shaofengshi <sh...@apache.org>
Authored: Sat May 27 14:27:39 2017 +0800
Committer: hongbin ma <ma...@kyligence.io>
Committed: Sat May 27 16:20:07 2017 +0800

----------------------------------------------------------------------
 .../kylin/common/restclient/RestClient.java     | 10 +++----
 .../kylin/metadata/cachesync/Broadcaster.java   | 28 +++++++++++---------
 2 files changed, 18 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/87d5d8db/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index fc34a6b..13490cb 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -33,7 +33,6 @@ import org.apache.http.HttpResponse;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
@@ -119,19 +118,16 @@ public class RestClient {
     }
 
     public void wipeCache(String entity, String event, String cacheKey) throws IOException {
-        wipeCache(client, baseUrl, entity, event, cacheKey);
-    }
-
-    public static void wipeCache(HttpClient client, String baseUrl, String entity, String event, String cacheKey) throws IOException {
         String url = baseUrl + "/cache/" + entity + "/" + cacheKey + "/" + event;
         HttpPut request = new HttpPut(url);
 
         try {
             HttpResponse response = client.execute(request);
-            String msg = EntityUtils.toString(response.getEntity());
 
-            if (response.getStatusLine().getStatusCode() != 200)
+            if (response.getStatusLine().getStatusCode() != 200) {
+                String msg = EntityUtils.toString(response.getEntity());
                 throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with cache wipe url " + url + "\n" + msg);
+            }
         } catch (Exception ex) {
             throw new IOException(ex);
         } finally {

http://git-wip-us.apache.org/repos/asf/kylin/blob/87d5d8db/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 35d2f42..4a8c6d3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -29,14 +29,12 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.http.client.HttpClient;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.params.BasicHttpParams;
-import org.apache.http.params.HttpConnectionParams;
-import org.apache.http.params.HttpParams;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.restclient.RestClient;
 import org.apache.kylin.common.util.DaemonThreadFactory;
@@ -115,23 +113,27 @@ public class Broadcaster {
         Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
             @Override
             public void run() {
-                final HttpParams httpParams = new BasicHttpParams();
-                HttpConnectionParams.setConnectionTimeout(httpParams, 3000);
+                final Map<String, RestClient> restClientMap = Maps.newHashMap();
+                final ExecutorService wipingCachePool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
 
-                final HttpClient client = new DefaultHttpClient(httpParams);
-
-                final ExecutorService wipingCachePool = Executors.newFixedThreadPool(3, new DaemonThreadFactory());
                 while (true) {
                     try {
                         final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
-                        logger.debug("Servers in the cluster: " + Arrays.toString(config.getRestServers()));
+                        String[] restServers = config.getRestServers();
+                        logger.info("Servers in the cluster: " + Arrays.toString(restServers));
+                        for (final String node : restServers) {
+                            if (restClientMap.containsKey(node) == false) {
+                                restClientMap.put(node, new RestClient(node));
+                            }
+                        }
+
                         logger.info("Announcing new broadcast event: " + broadcastEvent);
-                        for (final String address : config.getRestServers()) {
+                        for (final String node : restServers) {
                             wipingCachePool.execute(new Runnable() {
                                 @Override
                                 public void run() {
                                     try {
-                                        RestClient.wipeCache(client, RestClient.SCHEME_HTTP + address + RestClient.KYLIN_API_PATH,  broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
+                                        restClientMap.get(node).wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
                                     } catch (IOException e) {
                                         logger.warn("Thread failed during wipe cache at " + broadcastEvent, e);
                                     }