You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/10/21 00:34:30 UTC

atlas git commit: ATLAS-2220: Active state change listener order made predictable

Repository: atlas
Updated Branches:
  refs/heads/master 5add1aeb7 -> 3959b318e


ATLAS-2220: Active state change listener order made predictable


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/3959b318
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/3959b318
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/3959b318

Branch: refs/heads/master
Commit: 3959b318e4cc1e8df26f03a4632cfb6ecd8cb357
Parents: 5add1ae
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Fri Oct 20 14:51:40 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Oct 20 16:54:11 2017 -0700

----------------------------------------------------------------------
 .../audit/HBaseBasedAuditRepository.java        |  5 ++++
 .../graph/GraphBackedSearchIndexer.java         |  5 ++++
 .../bootstrap/AtlasTypeDefStoreInitializer.java |  5 ++++
 .../atlas/services/DefaultMetadataService.java  |  5 ++++
 .../listener/ActiveStateChangeHandler.java      | 25 ++++++++++++++++++++
 .../notification/NotificationHookConsumer.java  |  5 ++++
 .../service/ActiveInstanceElectorService.java   | 21 ++++++++++++----
 7 files changed, 67 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/3959b318/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
index 5a5a2c1..774934c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
@@ -406,4 +406,9 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
     public void instanceIsPassive() {
         LOG.info("Reacting to passive: No action for now.");
     }
+
+    @Override
+    public int getHandlerOrder() {
+        return HandlerOrder.HBASE_AUDIT_REPOSITORY.getOrder();
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/3959b318/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 9cd2991..6eee24b 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -698,6 +698,11 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
     }
 
     @Override
+    public int getHandlerOrder() {
+        return HandlerOrder.GRAPH_BACKED_SEARCH_INDEXER.getOrder();
+    }
+
+    @Override
     public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Processing changed typedefs {}", changedTypeDefs);

http://git-wip-us.apache.org/repos/asf/atlas/blob/3959b318/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index d394810..5f07623 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -341,6 +341,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
         LOG.info("<== AtlasTypeDefStoreInitializer.instanceIsPassive()");
     }
 
+    @Override
+    public int getHandlerOrder() {
+        return HandlerOrder.TYPEDEF_STORE_INITIALIZER.getOrder();
+    }
+
     private static boolean updateTypeAttributes(AtlasStructDef oldStructDef, AtlasStructDef newStructDef, boolean checkTypeVersion) {
         boolean ret = isTypeUpdateApplicable(oldStructDef, newStructDef, checkTypeVersion);
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/3959b318/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
index 99d2107..9eb695c 100755
--- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
@@ -797,6 +797,11 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
     }
 
     @Override
+    public int getHandlerOrder() {
+        return HandlerOrder.DEFAULT_METADATA_SERVICE.getOrder();
+    }
+
+    @Override
     public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException {
         // All we need here is a restore of the type-system
         LOG.info("TypeSystem reset invoked by TypeRegistry changes");

http://git-wip-us.apache.org/repos/asf/atlas/blob/3959b318/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
index 87a69ef..7d282b3 100644
--- a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
+++ b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
@@ -26,6 +26,22 @@ import org.apache.atlas.AtlasException;
  * The two state transitions we handle are (1) becoming active and (2) becoming passive.
  */
 public interface ActiveStateChangeHandler {
+    public enum HandlerOrder {
+        HBASE_AUDIT_REPOSITORY(0),
+        GRAPH_BACKED_SEARCH_INDEXER(1),
+        TYPEDEF_STORE_INITIALIZER(2),
+        DEFAULT_METADATA_SERVICE(3),
+        NOTIFICATION_HOOK_CONSUMER(4);
+
+
+        private final int order;
+
+        private HandlerOrder(int order) {
+            this.order = order;
+        }
+
+        public int getOrder() { return order; }
+    }
 
     /**
      * Callback that is invoked on an implementor when this instance of Atlas server is declared the leader.
@@ -46,4 +62,13 @@ public interface ActiveStateChangeHandler {
      * @throws {@link AtlasException} if anything is wrong on shutdown
      */
     void instanceIsPassive() throws AtlasException;
+
+
+    /**
+     * Defines the order in which the handler should be called.
+     *   When state becomes active, the handler will be called from low order to high
+     *   When state becomes passive, the handler will be called from high order to low
+     *
+     */
+    int getHandlerOrder();
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/3959b318/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 6790512..4646bff 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -220,6 +220,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         stop();
     }
 
+    @Override
+    public int getHandlerOrder() {
+        return HandlerOrder.NOTIFICATION_HOOK_CONSUMER.getOrder();
+    }
+
     static class Timer {
         public void sleep(int interval) throws InterruptedException {
             Thread.sleep(interval);

http://git-wip-us.apache.org/repos/asf/atlas/blob/3959b318/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
index 5071204..ad0cb84 100644
--- a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
@@ -34,7 +34,9 @@ import org.springframework.stereotype.Component;
 import javax.inject.Inject;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -58,7 +60,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
     private final ServiceState serviceState;
     private final ActiveInstanceState activeInstanceState;
     private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders;
-    private Collection<ActiveStateChangeHandler> activeStateChangeHandlers;
+    private List<ActiveStateChangeHandler> activeStateChangeHandlers;
     private CuratorFactory curatorFactory;
     private LeaderLatch leaderLatch;
     private String serverId;
@@ -158,6 +160,17 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
     private void cacheActiveStateChangeHandlers() {
         if (activeStateChangeHandlers.size()==0) {
             activeStateChangeHandlers.addAll(activeStateChangeHandlerProviders);
+
+            LOG.info("activeStateChangeHandlers(): before reorder: " + activeStateChangeHandlers);
+
+            Collections.sort(activeStateChangeHandlers, new Comparator<ActiveStateChangeHandler>() {
+                @Override
+                public int compare(ActiveStateChangeHandler lhs, ActiveStateChangeHandler rhs) {
+                    return Integer.compare(lhs.getHandlerOrder(), rhs.getHandlerOrder());
+                }
+            });
+
+            LOG.info("activeStateChangeHandlers(): after reorder: " + activeStateChangeHandlers);
         }
     }
 
@@ -177,9 +190,9 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
     public void notLeader() {
         LOG.warn("Server instance with server id {} is removed as leader", serverId);
         serviceState.becomingPassive();
-        for (ActiveStateChangeHandler handler: activeStateChangeHandlers) {
+        for (int idx = activeStateChangeHandlers.size() - 1; idx >= 0; idx--) {
             try {
-                handler.instanceIsPassive();
+                activeStateChangeHandlers.get(idx).instanceIsPassive();
             } catch (AtlasException e) {
                 LOG.error("Error while reacting to passive state.", e);
             }