You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@kylin.apache.org by li...@apache.org on 2018/02/04 00:51:32 UTC
[29/50] [abbrv] kylin git commit: KYLIN-3158 Retry the failed metadata sync event on the failed node only
KYLIN-3158 Retry the failed metadata sync event on the failed node only
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/baf7133d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/baf7133d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/baf7133d
Branch: refs/heads/sync
Commit: baf7133dabd8f1e53f5235b61da2bc5da334d870
Parents: 0fce61c
Author: Li Yang <li...@apache.org>
Authored: Fri Jan 12 21:46:47 2018 +0800
Committer: Li Yang <li...@apache.org>
Committed: Fri Jan 26 22:54:58 2018 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 4 +
.../kylin/metadata/cachesync/Broadcaster.java | 127 ++++++++++++++-----
.../metadata/cachesync/SingleValueCache.java | 6 +-
.../metadata/cachesync/BroadcasterTest.java | 43 +++++++
.../kylin/rest/init/InitialTaskManager.java | 2 +-
.../apache/kylin/rest/service/CacheService.java | 4 +-
6 files changed, 151 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index b053daa..5045b2f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -303,6 +303,10 @@ abstract public class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.metadata.sync-retries", "3"));
}
+ public String getCacheSyncErrorHandler() {
+ return getOptional("kylin.metadata.sync-error-handler");
+ }
+
// for test only
public void setMetadataUrl(String metadataUrl) {
setProperty("kylin.metadata.url", metadataUrl);
http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 05910ea..6462a27 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DaemonThreadFactory;
import org.apache.kylin.metadata.project.ProjectManager;
import org.slf4j.Logger;
@@ -68,7 +69,7 @@ public class Broadcaster {
public static Broadcaster getInstance(KylinConfig config) {
return config.getManager(Broadcaster.class);
}
-
+
// called by reflection
static Broadcaster newInstance(KylinConfig config) {
return new Broadcaster(config);
@@ -79,13 +80,19 @@ public class Broadcaster {
static final Map<String, List<Listener>> staticListenerMap = Maps.newConcurrentMap();
private KylinConfig config;
+ private ExecutorService announceMainLoop;
+ private ExecutorService announceThreadPool;
+ private SyncErrorHandler syncErrorHandler;
private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<>();
private Map<String, List<Listener>> listenerMap = Maps.newConcurrentMap();
- private AtomicLong counter = new AtomicLong();
-
+ private AtomicLong counter = new AtomicLong(); // a counter for testing purpose
+
private Broadcaster(final KylinConfig config) {
this.config = config;
- final int retryLimitTimes = config.getCacheSyncRetrys();
+ this.syncErrorHandler = getSyncErrorHandler(config);
+ this.announceMainLoop = Executors.newSingleThreadExecutor(new DaemonThreadFactory());
+ this.announceThreadPool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory());
final String[] nodes = config.getRestServers();
if (nodes == null || nodes.length < 1) {
@@ -93,21 +100,14 @@ public class Broadcaster {
}
logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
- Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
+ announceMainLoop.execute(new Runnable() {
@Override
public void run() {
final Map<String, RestClient> restClientMap = Maps.newHashMap();
- final ExecutorService wipingCachePool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory());
- while (true) {
+ while (!announceThreadPool.isShutdown()) {
try {
final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
- broadcastEvent.setRetryTime(broadcastEvent.getRetryTime() + 1);
- if (broadcastEvent.getRetryTime() > retryLimitTimes) {
- logger.info("broadcastEvent retry up to limit times, broadcastEvent:{}", broadcastEvent);
- continue;
- }
String[] restServers = config.getRestServers();
logger.debug("Servers in the cluster: " + Arrays.toString(restServers));
@@ -117,25 +117,27 @@ public class Broadcaster {
}
}
- logger.debug("Announcing new broadcast event: " + broadcastEvent);
+ String toWhere = broadcastEvent.getTargetNode();
+ if (toWhere == null)
+ toWhere = "all";
+ logger.debug("Announcing new broadcast to " + toWhere + ": " + broadcastEvent);
+
for (final String node : restServers) {
- wipingCachePool.execute(new Runnable() {
+ if (!(toWhere.equals("all") || toWhere.equals(node)))
+ continue;
+
+ announceThreadPool.execute(new Runnable() {
@Override
public void run() {
+ RestClient restClient = restClientMap.get(node);
try {
- restClientMap.get(node).wipeCache(broadcastEvent.getEntity(),
- broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
+ restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(),
+ broadcastEvent.getCacheKey());
} catch (IOException e) {
- logger.warn("Thread failed during wipe cache at {}, error msg: {}",
- broadcastEvent, e);
- // when sync failed, put back to queue
- try {
- broadcastEvents.putLast(broadcastEvent);
- } catch (InterruptedException ex) {
- logger.warn(
- "error reentry failed broadcastEvent to queue, broacastEvent:{}, error: {} ",
- broadcastEvent, ex);
- }
+ logger.error(
+ "Announce broadcast event failed, targetNode {} broadcastEvent {}, error msg: {}",
+ node, broadcastEvent, e);
+ syncErrorHandler.handleAnnounceError(node, restClient, broadcastEvent);
}
}
});
@@ -148,6 +150,23 @@ public class Broadcaster {
});
}
+ private SyncErrorHandler getSyncErrorHandler(KylinConfig config) {
+ String clzName = config.getCacheSyncErrorHandler();
+ if (StringUtils.isEmpty(clzName)) {
+ clzName = DefaultSyncErrorHandler.class.getName();
+ }
+ return (SyncErrorHandler) ClassUtil.newInstance(clzName);
+ }
+
+ public KylinConfig getConfig() {
+ return config;
+ }
+
+ public void stopAnnounce() {
+ announceThreadPool.shutdown();
+ announceMainLoop.shutdown();
+ }
+
// static listener survives cache wipe and goes after normal listeners
public void registerStaticListener(Listener listener, String... entities) {
doRegisterListener(staticListenerMap, listener, entities);
@@ -263,15 +282,19 @@ public class Broadcaster {
}
/**
- * Broadcast an event out
+ * Announce an event out to peer kylin servers
*/
- public void queue(String entity, String event, String key) {
+ public void announce(String entity, String event, String key) {
+ announce(new BroadcastEvent(entity, event, key));
+ }
+
+ public void announce(BroadcastEvent event) {
if (broadcastEvents == null)
return;
try {
counter.incrementAndGet();
- broadcastEvents.putLast(new BroadcastEvent(entity, event, key));
+ broadcastEvents.putLast(event);
} catch (Exception e) {
counter.decrementAndGet();
logger.error("error putting BroadcastEvent", e);
@@ -282,6 +305,40 @@ public class Broadcaster {
return counter.getAndSet(0);
}
+ // ============================================================================
+
+ public static class DefaultSyncErrorHandler implements SyncErrorHandler {
+ Broadcaster broadcaster;
+ int maxRetryTimes;
+
+ @Override
+ public void init(Broadcaster broadcaster) {
+ this.maxRetryTimes = broadcaster.getConfig().getCacheSyncRetrys();
+ this.broadcaster = broadcaster;
+ }
+
+ @Override
+ public void handleAnnounceError(String targetNode, RestClient restClient, BroadcastEvent event) {
+ int retry = event.getRetryTime() + 1;
+
+ // when sync failed, put back to queue to retry
+ if (retry < maxRetryTimes) {
+ event.setRetryTime(retry);
+ event.setTargetNode(targetNode);
+ broadcaster.announce(event);
+ } else {
+ logger.error("Announce broadcast event exceeds retry limit, abandon targetNode {} broadcastEvent {}",
+ targetNode, event);
+ }
+ }
+ }
+
+ public interface SyncErrorHandler {
+ void init(Broadcaster broadcaster);
+
+ void handleAnnounceError(String targetNode, RestClient restClient, BroadcastEvent event);
+ }
+
public enum Event {
CREATE("create"), UPDATE("update"), DROP("drop");
@@ -326,6 +383,8 @@ public class Broadcaster {
public static class BroadcastEvent {
private int retryTime;
+ private String targetNode; // NULL means to all
+
private String entity;
private String event;
private String cacheKey;
@@ -345,6 +404,14 @@ public class Broadcaster {
this.retryTime = retryTime;
}
+ public String getTargetNode() {
+ return targetNode;
+ }
+
+ public void setTargetNode(String targetNode) {
+ this.targetNode = targetNode;
+ }
+
public String getEntity() {
return entity;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java
index 4bfaeae..f803c8b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java
@@ -49,9 +49,9 @@ public abstract class SingleValueCache<K, V> extends AbstractCache<K, V> {
innerCache.put(key, value);
if (!exists) {
- getBroadcaster().queue(syncEntity, Broadcaster.Event.CREATE.getType(), key.toString());
+ getBroadcaster().announce(syncEntity, Broadcaster.Event.CREATE.getType(), key.toString());
} else {
- getBroadcaster().queue(syncEntity, Broadcaster.Event.UPDATE.getType(), key.toString());
+ getBroadcaster().announce(syncEntity, Broadcaster.Event.UPDATE.getType(), key.toString());
}
}
@@ -65,7 +65,7 @@ public abstract class SingleValueCache<K, V> extends AbstractCache<K, V> {
innerCache.remove(key);
if (exists) {
- getBroadcaster().queue(syncEntity, Broadcaster.Event.DROP.getType(), key.toString());
+ getBroadcaster().announce(syncEntity, Broadcaster.Event.DROP.getType(), key.toString());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java
index 80f26f9..762512f 100644
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java
@@ -21,9 +21,12 @@ package org.apache.kylin.metadata.cachesync;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.kylin.common.restclient.RestClient;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.cachesync.Broadcaster.BroadcastEvent;
import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
import org.apache.kylin.metadata.cachesync.Broadcaster.Listener;
+import org.apache.kylin.metadata.cachesync.Broadcaster.SyncErrorHandler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -64,6 +67,7 @@ public class BroadcasterTest extends LocalFileMetadataTestCase {
broadcaster.notifyListener("test", Event.UPDATE, "");
+ broadcaster.stopAnnounce();
Broadcaster.staticListenerMap.clear();
}
@@ -90,6 +94,45 @@ public class BroadcasterTest extends LocalFileMetadataTestCase {
broadcaster.notifyNonStaticListener("test", Event.UPDATE, "");
+ broadcaster.stopAnnounce();
Broadcaster.staticListenerMap.clear();
}
+
+ @Test
+ public void testAnnounceErrorHandler() throws IOException, InterruptedException {
+ System.setProperty("kylin.server.cluster-servers", "localhost:717");
+ System.setProperty("kylin.metadata.sync-error-handler", MockupErrHandler.class.getName());
+ try {
+ Broadcaster broadcaster = Broadcaster.getInstance(getTestConfig());
+
+ broadcaster.announce("all", "update", "all");
+
+ for (int i = 0; i < 30 && MockupErrHandler.atom.get() == 0; i++) {
+ Thread.sleep(1000);
+ }
+
+ broadcaster.stopAnnounce();
+ Broadcaster.staticListenerMap.clear();
+ } finally {
+ System.clearProperty("kylin.server.cluster-servers");
+ System.clearProperty("kylin.metadata.sync-error-handler");
+ }
+
+ Assert.assertTrue(MockupErrHandler.atom.get() > 0);
+ }
+
+ public static class MockupErrHandler implements SyncErrorHandler {
+ static AtomicInteger atom = new AtomicInteger();
+
+ @Override
+ public void init(Broadcaster broadcaster) {
+ }
+
+ @Override
+ public void handleAnnounceError(String targetNode, RestClient restClient, BroadcastEvent event) {
+ Assert.assertEquals("localhost:717", targetNode);
+ atom.incrementAndGet();
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
index 14052ce..467ef82 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
@@ -53,7 +53,7 @@ public class InitialTaskManager implements InitializingBean {
for (String taskClass : taskClasses) {
try {
InitialTask task = (InitialTask) Class.forName(taskClass).newInstance();
- logger.info("Running task: " + taskClass);
+ logger.info("Running initial task: " + taskClass);
task.execute();
} catch (Throwable e) {
logger.error("Initial task failed: " + taskClass, e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 98e06e0..b61309e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.kylin.metadata.cachesync.Broadcaster;
import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -49,6 +50,7 @@ public class CacheService extends BasicService implements InitializingBean {
@Override
public void onClearAll(Broadcaster broadcaster) throws IOException {
cleanAllDataCache();
+ HBaseConnection.clearConnCache(); // take the chance to clear HBase connection cache as well
}
@Override
@@ -104,7 +106,7 @@ public class CacheService extends BasicService implements InitializingBean {
public void annouceWipeCache(String entity, String event, String cacheKey) {
Broadcaster broadcaster = Broadcaster.getInstance(getConfig());
- broadcaster.queue(entity, event, cacheKey);
+ broadcaster.announce(entity, event, cacheKey);
}
public void notifyMetadataChange(String entity, Event event, String cacheKey) throws IOException {