You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/06/05 05:23:20 UTC
[32/67] [abbrv] kylin git commit: Broadcaster should allow dynamic
rest server list
Broadcaster should allow dynamic rest server list
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3fbf90ae
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3fbf90ae
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3fbf90ae
Branch: refs/heads/master
Commit: 3fbf90aed1bd78c1f44f9fd1f37fc34ffa704762
Parents: eafbe73
Author: shaofengshi <sh...@apache.org>
Authored: Fri May 26 17:55:36 2017 +0800
Committer: hongbin ma <ma...@kyligence.io>
Committed: Sat May 27 16:20:07 2017 +0800
----------------------------------------------------------------------
.../kylin/common/restclient/RestClient.java | 11 +++++++++-
.../kylin/metadata/cachesync/Broadcaster.java | 23 ++++++++++++--------
2 files changed, 24 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fbf90ae/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index 33a4e7a..fc34a6b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -33,6 +33,7 @@ import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
@@ -65,6 +66,10 @@ public class RestClient {
private static final int HTTP_CONNECTION_TIMEOUT_MS = 30000;
private static final int HTTP_SOCKET_TIMEOUT_MS = 120000;
+ public static final String SCHEME_HTTP = "http://";
+
+ public static final String KYLIN_API_PATH = "/kylin/api";
+
public static boolean matchFullRestPattern(String uri) {
Matcher m = fullRestPattern.matcher(uri);
return m.matches();
@@ -97,7 +102,7 @@ public class RestClient {
this.port = port;
this.userName = userName;
this.password = password;
- this.baseUrl = "http://" + host + ":" + port + "/kylin/api";
+ this.baseUrl = SCHEME_HTTP + host + ":" + port + KYLIN_API_PATH;
final HttpParams httpParams = new BasicHttpParams();
HttpConnectionParams.setSoTimeout(httpParams, HTTP_SOCKET_TIMEOUT_MS);
@@ -114,6 +119,10 @@ public class RestClient {
}
public void wipeCache(String entity, String event, String cacheKey) throws IOException {
+ wipeCache(client, baseUrl, entity, event, cacheKey);
+ }
+
+ public static void wipeCache(HttpClient client, String baseUrl, String entity, String event, String cacheKey) throws IOException {
String url = baseUrl + "/cache/" + entity + "/" + cacheKey + "/" + event;
HttpPut request = new HttpPut(url);
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fbf90ae/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 1394f7b..35d2f42 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -32,6 +32,11 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
+import org.apache.http.client.HttpClient;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.restclient.RestClient;
import org.apache.kylin.common.util.DaemonThreadFactory;
@@ -104,29 +109,29 @@ public class Broadcaster {
final String[] nodes = config.getRestServers();
if (nodes == null || nodes.length < 1) {
logger.warn("There is no available rest server; check the 'kylin.server.cluster-servers' config");
- broadcastEvents = null; // disable the broadcaster
- return;
}
logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
@Override
public void run() {
- final List<RestClient> restClients = Lists.newArrayList();
- for (String node : config.getRestServers()) {
- restClients.add(new RestClient(node));
- }
- final ExecutorService wipingCachePool = Executors.newFixedThreadPool(restClients.size(), new DaemonThreadFactory());
+ final HttpParams httpParams = new BasicHttpParams();
+ HttpConnectionParams.setConnectionTimeout(httpParams, 3000);
+
+ final HttpClient client = new DefaultHttpClient(httpParams);
+
+ final ExecutorService wipingCachePool = Executors.newFixedThreadPool(3, new DaemonThreadFactory());
while (true) {
try {
final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
+ logger.debug("Servers in the cluster: " + Arrays.toString(config.getRestServers()));
logger.info("Announcing new broadcast event: " + broadcastEvent);
- for (final RestClient restClient : restClients) {
+ for (final String address : config.getRestServers()) {
wipingCachePool.execute(new Runnable() {
@Override
public void run() {
try {
- restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
+ RestClient.wipeCache(client, RestClient.SCHEME_HTTP + address + RestClient.KYLIN_API_PATH, broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
} catch (IOException e) {
logger.warn("Thread failed during wipe cache at " + broadcastEvent, e);
}