You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/10/21 00:34:30 UTC
atlas git commit: ATLAS-2220: Active state change listener order made
predictable
Repository: atlas
Updated Branches:
refs/heads/master 5add1aeb7 -> 3959b318e
ATLAS-2220: Active state change listener order made predictable
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/3959b318
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/3959b318
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/3959b318
Branch: refs/heads/master
Commit: 3959b318e4cc1e8df26f03a4632cfb6ecd8cb357
Parents: 5add1ae
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Fri Oct 20 14:51:40 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Oct 20 16:54:11 2017 -0700
----------------------------------------------------------------------
.../audit/HBaseBasedAuditRepository.java | 5 ++++
.../graph/GraphBackedSearchIndexer.java | 5 ++++
.../bootstrap/AtlasTypeDefStoreInitializer.java | 5 ++++
.../atlas/services/DefaultMetadataService.java | 5 ++++
.../listener/ActiveStateChangeHandler.java | 25 ++++++++++++++++++++
.../notification/NotificationHookConsumer.java | 5 ++++
.../service/ActiveInstanceElectorService.java | 21 ++++++++++++----
7 files changed, 67 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/3959b318/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
index 5a5a2c1..774934c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
@@ -406,4 +406,9 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
public void instanceIsPassive() {
LOG.info("Reacting to passive: No action for now.");
}
+
+ @Override
+ public int getHandlerOrder() {
+ return HandlerOrder.HBASE_AUDIT_REPOSITORY.getOrder();
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/3959b318/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 9cd2991..6eee24b 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -698,6 +698,11 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
}
@Override
+ public int getHandlerOrder() {
+ return HandlerOrder.GRAPH_BACKED_SEARCH_INDEXER.getOrder();
+ }
+
+ @Override
public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing changed typedefs {}", changedTypeDefs);
http://git-wip-us.apache.org/repos/asf/atlas/blob/3959b318/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index d394810..5f07623 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -341,6 +341,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
LOG.info("<== AtlasTypeDefStoreInitializer.instanceIsPassive()");
}
+ @Override
+ public int getHandlerOrder() {
+ return HandlerOrder.TYPEDEF_STORE_INITIALIZER.getOrder();
+ }
+
private static boolean updateTypeAttributes(AtlasStructDef oldStructDef, AtlasStructDef newStructDef, boolean checkTypeVersion) {
boolean ret = isTypeUpdateApplicable(oldStructDef, newStructDef, checkTypeVersion);
http://git-wip-us.apache.org/repos/asf/atlas/blob/3959b318/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
index 99d2107..9eb695c 100755
--- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
@@ -797,6 +797,11 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
}
@Override
+ public int getHandlerOrder() {
+ return HandlerOrder.DEFAULT_METADATA_SERVICE.getOrder();
+ }
+
+ @Override
public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException {
// All we need here is a restore of the type-system
LOG.info("TypeSystem reset invoked by TypeRegistry changes");
http://git-wip-us.apache.org/repos/asf/atlas/blob/3959b318/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
index 87a69ef..7d282b3 100644
--- a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
+++ b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
@@ -26,6 +26,22 @@ import org.apache.atlas.AtlasException;
* The two state transitions we handle are (1) becoming active and (2) becoming passive.
*/
public interface ActiveStateChangeHandler {
+ public enum HandlerOrder {
+ HBASE_AUDIT_REPOSITORY(0),
+ GRAPH_BACKED_SEARCH_INDEXER(1),
+ TYPEDEF_STORE_INITIALIZER(2),
+ DEFAULT_METADATA_SERVICE(3),
+ NOTIFICATION_HOOK_CONSUMER(4);
+
+
+ private final int order;
+
+ private HandlerOrder(int order) {
+ this.order = order;
+ }
+
+ public int getOrder() { return order; }
+ }
/**
* Callback that is invoked on an implementor when this instance of Atlas server is declared the leader.
@@ -46,4 +62,13 @@ public interface ActiveStateChangeHandler {
* @throws {@link AtlasException} if anything is wrong on shutdown
*/
void instanceIsPassive() throws AtlasException;
+
+
+ /**
+ * Defines the order in which the handler should be called.
+ * When state becomes active, the handler will be called from low order to high
+ * When state becomes passive, the handler will be called from high order to low
+ *
+ */
+ int getHandlerOrder();
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/3959b318/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 6790512..4646bff 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -220,6 +220,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
stop();
}
+ @Override
+ public int getHandlerOrder() {
+ return HandlerOrder.NOTIFICATION_HOOK_CONSUMER.getOrder();
+ }
+
static class Timer {
public void sleep(int interval) throws InterruptedException {
Thread.sleep(interval);
http://git-wip-us.apache.org/repos/asf/atlas/blob/3959b318/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
index 5071204..ad0cb84 100644
--- a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
@@ -34,7 +34,9 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
import java.util.Set;
/**
@@ -58,7 +60,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
private final ServiceState serviceState;
private final ActiveInstanceState activeInstanceState;
private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders;
- private Collection<ActiveStateChangeHandler> activeStateChangeHandlers;
+ private List<ActiveStateChangeHandler> activeStateChangeHandlers;
private CuratorFactory curatorFactory;
private LeaderLatch leaderLatch;
private String serverId;
@@ -158,6 +160,17 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
private void cacheActiveStateChangeHandlers() {
if (activeStateChangeHandlers.size()==0) {
activeStateChangeHandlers.addAll(activeStateChangeHandlerProviders);
+
+ LOG.info("activeStateChangeHandlers(): before reorder: " + activeStateChangeHandlers);
+
+ Collections.sort(activeStateChangeHandlers, new Comparator<ActiveStateChangeHandler>() {
+ @Override
+ public int compare(ActiveStateChangeHandler lhs, ActiveStateChangeHandler rhs) {
+ return Integer.compare(lhs.getHandlerOrder(), rhs.getHandlerOrder());
+ }
+ });
+
+ LOG.info("activeStateChangeHandlers(): after reorder: " + activeStateChangeHandlers);
}
}
@@ -177,9 +190,9 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
public void notLeader() {
LOG.warn("Server instance with server id {} is removed as leader", serverId);
serviceState.becomingPassive();
- for (ActiveStateChangeHandler handler: activeStateChangeHandlers) {
+ for (int idx = activeStateChangeHandlers.size() - 1; idx >= 0; idx--) {
try {
- handler.instanceIsPassive();
+ activeStateChangeHandlers.get(idx).instanceIsPassive();
} catch (AtlasException e) {
LOG.error("Error while reacting to passive state.", e);
}