You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/06/05 05:23:19 UTC
[31/67] [abbrv] kylin git commit: KYLIN-2619 refine Broadcaster
KYLIN-2619 refine Broadcaster
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/87d5d8db
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/87d5d8db
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/87d5d8db
Branch: refs/heads/master
Commit: 87d5d8db276b590c86c473db5997414285b2d689
Parents: 3fbf90a
Author: shaofengshi <sh...@apache.org>
Authored: Sat May 27 14:27:39 2017 +0800
Committer: hongbin ma <ma...@kyligence.io>
Committed: Sat May 27 16:20:07 2017 +0800
----------------------------------------------------------------------
.../kylin/common/restclient/RestClient.java | 10 +++----
.../kylin/metadata/cachesync/Broadcaster.java | 28 +++++++++++---------
2 files changed, 18 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/87d5d8db/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index fc34a6b..13490cb 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -33,7 +33,6 @@ import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
@@ -119,19 +118,16 @@ public class RestClient {
}
public void wipeCache(String entity, String event, String cacheKey) throws IOException {
- wipeCache(client, baseUrl, entity, event, cacheKey);
- }
-
- public static void wipeCache(HttpClient client, String baseUrl, String entity, String event, String cacheKey) throws IOException {
String url = baseUrl + "/cache/" + entity + "/" + cacheKey + "/" + event;
HttpPut request = new HttpPut(url);
try {
HttpResponse response = client.execute(request);
- String msg = EntityUtils.toString(response.getEntity());
- if (response.getStatusLine().getStatusCode() != 200)
+ if (response.getStatusLine().getStatusCode() != 200) {
+ String msg = EntityUtils.toString(response.getEntity());
throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with cache wipe url " + url + "\n" + msg);
+ }
} catch (Exception ex) {
throw new IOException(ex);
} finally {
http://git-wip-us.apache.org/repos/asf/kylin/blob/87d5d8db/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 35d2f42..4a8c6d3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -29,14 +29,12 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
-import org.apache.http.client.HttpClient;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.params.BasicHttpParams;
-import org.apache.http.params.HttpConnectionParams;
-import org.apache.http.params.HttpParams;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.restclient.RestClient;
import org.apache.kylin.common.util.DaemonThreadFactory;
@@ -115,23 +113,27 @@ public class Broadcaster {
Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
@Override
public void run() {
- final HttpParams httpParams = new BasicHttpParams();
- HttpConnectionParams.setConnectionTimeout(httpParams, 3000);
+ final Map<String, RestClient> restClientMap = Maps.newHashMap();
+ final ExecutorService wipingCachePool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- final HttpClient client = new DefaultHttpClient(httpParams);
-
- final ExecutorService wipingCachePool = Executors.newFixedThreadPool(3, new DaemonThreadFactory());
while (true) {
try {
final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
- logger.debug("Servers in the cluster: " + Arrays.toString(config.getRestServers()));
+ String[] restServers = config.getRestServers();
+ logger.info("Servers in the cluster: " + Arrays.toString(restServers));
+ for (final String node : restServers) {
+ if (restClientMap.containsKey(node) == false) {
+ restClientMap.put(node, new RestClient(node));
+ }
+ }
+
logger.info("Announcing new broadcast event: " + broadcastEvent);
- for (final String address : config.getRestServers()) {
+ for (final String node : restServers) {
wipingCachePool.execute(new Runnable() {
@Override
public void run() {
try {
- RestClient.wipeCache(client, RestClient.SCHEME_HTTP + address + RestClient.KYLIN_API_PATH, broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
+ restClientMap.get(node).wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
} catch (IOException e) {
logger.warn("Thread failed during wipe cache at " + broadcastEvent, e);
}