You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/02/04 00:51:32 UTC

[29/50] [abbrv] kylin git commit: KYLIN-3158 Retry the failed metadata sync event on the failed node only

KYLIN-3158 Retry the failed metadata sync event on the failed node only


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/baf7133d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/baf7133d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/baf7133d

Branch: refs/heads/sync
Commit: baf7133dabd8f1e53f5235b61da2bc5da334d870
Parents: 0fce61c
Author: Li Yang <li...@apache.org>
Authored: Fri Jan 12 21:46:47 2018 +0800
Committer: Li Yang <li...@apache.org>
Committed: Fri Jan 26 22:54:58 2018 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   4 +
 .../kylin/metadata/cachesync/Broadcaster.java   | 127 ++++++++++++++-----
 .../metadata/cachesync/SingleValueCache.java    |   6 +-
 .../metadata/cachesync/BroadcasterTest.java     |  43 +++++++
 .../kylin/rest/init/InitialTaskManager.java     |   2 +-
 .../apache/kylin/rest/service/CacheService.java |   4 +-
 6 files changed, 151 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index b053daa..5045b2f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -303,6 +303,14 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.metadata.sync-retries", "3"));
     }
 
+    /**
+     * Class name of the Broadcaster.SyncErrorHandler used when a metadata sync announcement fails.
+     * Blank or unset means Broadcaster falls back to its DefaultSyncErrorHandler.
+     */
+    public String getCacheSyncErrorHandler() {
+        return getOptional("kylin.metadata.sync-error-handler");
+    }
+
     // for test only
     public void setMetadataUrl(String metadataUrl) {
         setProperty("kylin.metadata.url", metadataUrl);

http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 05910ea..6462a27 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DaemonThreadFactory;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.slf4j.Logger;
@@ -68,7 +69,7 @@ public class Broadcaster {
     public static Broadcaster getInstance(KylinConfig config) {
         return config.getManager(Broadcaster.class);
     }
-    
+
     // called by reflection
     static Broadcaster newInstance(KylinConfig config) {
         return new Broadcaster(config);
@@ -79,13 +80,21 @@ public class Broadcaster {
     static final Map<String, List<Listener>> staticListenerMap = Maps.newConcurrentMap();
 
     private KylinConfig config;
+    private ExecutorService announceMainLoop;
+    private ExecutorService announceThreadPool;
+    private SyncErrorHandler syncErrorHandler;
     private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<>();
     private Map<String, List<Listener>> listenerMap = Maps.newConcurrentMap();
-    private AtomicLong counter = new AtomicLong();
+    private AtomicLong counter = new AtomicLong(); // a counter for testing purpose
 
     private Broadcaster(final KylinConfig config) {
         this.config = config;
-        final int retryLimitTimes = config.getCacheSyncRetrys();
+        this.syncErrorHandler = getSyncErrorHandler(config);
+        this.announceMainLoop = Executors.newSingleThreadExecutor(new DaemonThreadFactory());
+        // NOTE: with an unbounded work queue this pool never grows past its core size (1),
+        // so wipe-cache calls run one at a time, in submission order.
+        this.announceThreadPool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory());
 
         final String[] nodes = config.getRestServers();
         if (nodes == null || nodes.length < 1) {
@@ -93,21 +102,14 @@ public class Broadcaster {
         }
         logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
 
-        Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
+        announceMainLoop.execute(new Runnable() {
             @Override
             public void run() {
                 final Map<String, RestClient> restClientMap = Maps.newHashMap();
-                final ExecutorService wipingCachePool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS,
-                        new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory());
 
-                while (true) {
+                while (!announceThreadPool.isShutdown()) {
                     try {
                         final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
-                        broadcastEvent.setRetryTime(broadcastEvent.getRetryTime() + 1);
-                        if (broadcastEvent.getRetryTime() > retryLimitTimes) {
-                            logger.info("broadcastEvent retry up to limit times, broadcastEvent:{}", broadcastEvent);
-                            continue;
-                        }
 
                         String[] restServers = config.getRestServers();
                         logger.debug("Servers in the cluster: " + Arrays.toString(restServers));
@@ -117,25 +119,27 @@ public class Broadcaster {
                             }
                         }
 
-                        logger.debug("Announcing new broadcast event: " + broadcastEvent);
+                        String toWhere = broadcastEvent.getTargetNode();
+                        if (toWhere == null)
+                            toWhere = "all";
+                        logger.debug("Announcing new broadcast to " + toWhere + ": " + broadcastEvent);
+
                         for (final String node : restServers) {
-                            wipingCachePool.execute(new Runnable() {
+                            if (!(toWhere.equals("all") || toWhere.equals(node)))
+                                continue;
+
+                            announceThreadPool.execute(new Runnable() {
                                 @Override
                                 public void run() {
+                                    RestClient restClient = restClientMap.get(node);
                                     try {
-                                        restClientMap.get(node).wipeCache(broadcastEvent.getEntity(),
-                                                broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
+                                        restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(),
+                                                broadcastEvent.getCacheKey());
                                     } catch (IOException e) {
-                                        logger.warn("Thread failed during wipe cache at {}, error msg: {}",
-                                                broadcastEvent, e);
-                                        // when sync failed, put back to queue
-                                        try {
-                                            broadcastEvents.putLast(broadcastEvent);
-                                        } catch (InterruptedException ex) {
-                                            logger.warn(
-                                                    "error reentry failed broadcastEvent to queue, broacastEvent:{}, error: {} ",
-                                                    broadcastEvent, ex);
-                                        }
+                                        logger.error(
+                                                "Announce broadcast event failed, targetNode {} broadcastEvent {}",
+                                                node, broadcastEvent, e);
+                                        syncErrorHandler.handleAnnounceError(node, restClient, broadcastEvent);
                                     }
                                 }
                             });
@@ -148,6 +152,25 @@ public class Broadcaster {
         });
     }
 
+    private SyncErrorHandler getSyncErrorHandler(KylinConfig config) {
+        String clzName = config.getCacheSyncErrorHandler();
+        if (StringUtils.isEmpty(clzName)) {
+            clzName = DefaultSyncErrorHandler.class.getName();
+        }
+        SyncErrorHandler handler = (SyncErrorHandler) ClassUtil.newInstance(clzName);
+        handler.init(this); // without this, DefaultSyncErrorHandler has no broadcaster and a retry limit of 0
+        return handler;
+    }
+
+    public KylinConfig getConfig() {
+        return config;
+    }
+
+    public void stopAnnounce() {
+        announceThreadPool.shutdown();
+        announceMainLoop.shutdownNow(); // also interrupts the main loop if it is blocked in takeFirst()
+    }
+
     // static listener survives cache wipe and goes after normal listeners
     public void registerStaticListener(Listener listener, String... entities) {
         doRegisterListener(staticListenerMap, listener, entities);
@@ -263,15 +286,19 @@ public class Broadcaster {
     }
 
     /**
-     * Broadcast an event out
+     * Announce an event out to peer kylin servers
      */
-    public void queue(String entity, String event, String key) {
+    public void announce(String entity, String event, String key) {
+        announce(new BroadcastEvent(entity, event, key));
+    }
+
+    public void announce(BroadcastEvent event) {
         if (broadcastEvents == null)
             return;
 
         try {
             counter.incrementAndGet();
-            broadcastEvents.putLast(new BroadcastEvent(entity, event, key));
+            broadcastEvents.putLast(event);
         } catch (Exception e) {
             counter.decrementAndGet();
             logger.error("error putting BroadcastEvent", e);
@@ -282,6 +309,44 @@ public class Broadcaster {
         return counter.getAndSet(0);
     }
 
+    // ============================================================================
+
+    public static class DefaultSyncErrorHandler implements SyncErrorHandler {
+        Broadcaster broadcaster;
+        int maxRetryTimes;
+
+        @Override
+        public void init(Broadcaster broadcaster) {
+            this.maxRetryTimes = broadcaster.getConfig().getCacheSyncRetrys();
+            this.broadcaster = broadcaster;
+        }
+
+        @Override
+        public void handleAnnounceError(String targetNode, RestClient restClient, BroadcastEvent event) {
+            int retry = event.getRetryTime() + 1;
+
+            // when sync failed, put back to queue to retry
+            if (retry < maxRetryTimes) {
+                // retry on a copy: the same event object is shared by every node it was announced to,
+                // so mutating it would let one failed node's retry overwrite another's
+                BroadcastEvent retryEvent = new BroadcastEvent(event.getEntity(), event.getEvent(),
+                        event.getCacheKey());
+                retryEvent.setRetryTime(retry);
+                retryEvent.setTargetNode(targetNode);
+                broadcaster.announce(retryEvent);
+            } else {
+                logger.error("Announce broadcast event exceeds retry limit, abandon targetNode {} broadcastEvent {}",
+                        targetNode, event);
+            }
+        }
+    }
+
+    public interface SyncErrorHandler {
+        void init(Broadcaster broadcaster);
+
+        void handleAnnounceError(String targetNode, RestClient restClient, BroadcastEvent event);
+    }
+
     public enum Event {
 
         CREATE("create"), UPDATE("update"), DROP("drop");
@@ -326,6 +391,8 @@ public class Broadcaster {
 
     public static class BroadcastEvent {
         private int retryTime;
+        private String targetNode; // NULL means to all
+
         private String entity;
         private String event;
         private String cacheKey;
@@ -345,6 +412,14 @@ public class Broadcaster {
             this.retryTime = retryTime;
         }
 
+        public String getTargetNode() {
+            return targetNode;
+        }
+
+        public void setTargetNode(String targetNode) {
+            this.targetNode = targetNode;
+        }
+
         public String getEntity() {
             return entity;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java
index 4bfaeae..f803c8b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java
@@ -49,9 +49,9 @@ public abstract class SingleValueCache<K, V> extends AbstractCache<K, V> {
         innerCache.put(key, value);
 
         if (!exists) {
-            getBroadcaster().queue(syncEntity, Broadcaster.Event.CREATE.getType(), key.toString());
+            getBroadcaster().announce(syncEntity, Broadcaster.Event.CREATE.getType(), key.toString());
         } else {
-            getBroadcaster().queue(syncEntity, Broadcaster.Event.UPDATE.getType(), key.toString());
+            getBroadcaster().announce(syncEntity, Broadcaster.Event.UPDATE.getType(), key.toString());
         }
     }
 
@@ -65,7 +65,7 @@ public abstract class SingleValueCache<K, V> extends AbstractCache<K, V> {
         innerCache.remove(key);
 
         if (exists) {
-            getBroadcaster().queue(syncEntity, Broadcaster.Event.DROP.getType(), key.toString());
+            getBroadcaster().announce(syncEntity, Broadcaster.Event.DROP.getType(), key.toString());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java
index 80f26f9..762512f 100644
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java
@@ -21,9 +21,12 @@ package org.apache.kylin.metadata.cachesync;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.kylin.common.restclient.RestClient;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.cachesync.Broadcaster.BroadcastEvent;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Listener;
+import org.apache.kylin.metadata.cachesync.Broadcaster.SyncErrorHandler;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -64,6 +67,7 @@ public class BroadcasterTest extends LocalFileMetadataTestCase {
 
         broadcaster.notifyListener("test", Event.UPDATE, "");
 
+        broadcaster.stopAnnounce();
         Broadcaster.staticListenerMap.clear();
     }
 
@@ -90,6 +94,48 @@ public class BroadcasterTest extends LocalFileMetadataTestCase {
 
         broadcaster.notifyNonStaticListener("test", Event.UPDATE, "");
 
+        broadcaster.stopAnnounce();
         Broadcaster.staticListenerMap.clear();
     }
+
+    @Test
+    public void testAnnounceErrorHandler() throws IOException, InterruptedException {
+        System.setProperty("kylin.server.cluster-servers", "localhost:717");
+        System.setProperty("kylin.metadata.sync-error-handler", MockupErrHandler.class.getName());
+        try {
+            Broadcaster broadcaster = Broadcaster.getInstance(getTestConfig());
+
+            broadcaster.announce("all", "update", "all");
+
+            for (int i = 0; i < 30 && MockupErrHandler.atom.get() == 0; i++) {
+                Thread.sleep(1000);
+            }
+
+            broadcaster.stopAnnounce();
+            Broadcaster.staticListenerMap.clear();
+        } finally {
+            System.clearProperty("kylin.server.cluster-servers");
+            System.clearProperty("kylin.metadata.sync-error-handler");
+        }
+
+        Assert.assertTrue(MockupErrHandler.atom.get() > 0);
+        Assert.assertEquals("localhost:717", MockupErrHandler.lastTargetNode);
+    }
+
+    public static class MockupErrHandler implements SyncErrorHandler {
+        static AtomicInteger atom = new AtomicInteger();
+        // volatile: written on the announce pool thread, read by the test thread
+        static volatile String lastTargetNode;
+
+        @Override
+        public void init(Broadcaster broadcaster) {
+        }
+
+        @Override
+        public void handleAnnounceError(String targetNode, RestClient restClient, BroadcastEvent event) {
+            // only record here: an assertion failing on the pool thread would never reach JUnit
+            lastTargetNode = targetNode;
+            atom.incrementAndGet();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
index 14052ce..467ef82 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
@@ -53,7 +53,7 @@ public class InitialTaskManager implements InitializingBean {
             for (String taskClass : taskClasses) {
                 try {
                     InitialTask task = (InitialTask) Class.forName(taskClass).newInstance();
-                    logger.info("Running task: " + taskClass);
+                    logger.info("Running initial task: " + taskClass);
                     task.execute();
                 } catch (Throwable e) {
                     logger.error("Initial task failed: " + taskClass, e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 98e06e0..b61309e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
@@ -49,6 +50,7 @@ public class CacheService extends BasicService implements InitializingBean {
         @Override
         public void onClearAll(Broadcaster broadcaster) throws IOException {
             cleanAllDataCache();
+            HBaseConnection.clearConnCache(); // take the chance to clear HBase connection cache as well
         }
 
         @Override
@@ -104,7 +106,7 @@ public class CacheService extends BasicService implements InitializingBean {
 
     public void annouceWipeCache(String entity, String event, String cacheKey) {
         Broadcaster broadcaster = Broadcaster.getInstance(getConfig());
-        broadcaster.queue(entity, event, cacheKey);
+        broadcaster.announce(entity, event, cacheKey); // only queues the event; peers are notified asynchronously
     }
 
     public void notifyMetadataChange(String entity, Event event, String cacheKey) throws IOException {