You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2018/04/27 19:29:54 UTC

[14/50] [abbrv] metron git commit: METRON-1491: The indexing topology restart logic is wrong (cstella via mmiklavc) closes apache/metron#964

METRON-1491: The indexing topology restart logic is wrong (cstella via mmiklavc) closes apache/metron#964


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/5ed9631a
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/5ed9631a
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/5ed9631a

Branch: refs/heads/feature/METRON-1416-upgrade-solr
Commit: 5ed9631a2936ec60d0ea6557ca4396cffdadc688
Parents: 3083b47
Author: cstella <ce...@gmail.com>
Authored: Tue Mar 20 16:08:02 2018 -0600
Committer: Michael Miklavcic <mi...@gmail.com>
Committed: Tue Mar 20 16:08:02 2018 -0600

----------------------------------------------------------------------
 .../package/scripts/indexing_commands.py        | 43 ++++++++++++++++----
 1 file changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/5ed9631a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
index 4c862f0..fd78119 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
@@ -181,7 +181,7 @@ class IndexingCommands:
     def start_batch_indexing_topology(self, env):
         Logger.info('Starting ' + self.__batch_indexing_topology)
 
-        if not self.is_topology_active(env):
+        if not self.is_batch_topology_active(env):
             if self.__params.security_enabled:
                 metron_security.kinit(self.__params.kinit_path_local,
                                       self.__params.metron_keytab_path,
@@ -200,7 +200,7 @@ class IndexingCommands:
     def start_random_access_indexing_topology(self, env):
         Logger.info('Starting ' + self.__random_access_indexing_topology)
 
-        if not self.is_topology_active(env):
+        if not self.is_random_access_topology_active(env):
             if self.__params.security_enabled:
                 metron_security.kinit(self.__params.kinit_path_local,
                                       self.__params.metron_keytab_path,
@@ -263,21 +263,48 @@ class IndexingCommands:
 
     def restart_indexing_topology(self, env):
         Logger.info('Restarting the indexing topologies')
-        self.stop_indexing_topology(env)
+        self.restart_batch_indexing_topology(env)
+        self.restart_random_access_indexing_topology(env)
+
+    def restart_batch_indexing_topology(self, env):
+        Logger.info('Restarting the batch indexing topology')
+        self.stop_batch_indexing_topology(env)
+
+        # Wait for old topology to be cleaned up by Storm, before starting again.
+        retries = 0
+        topology_active = self.is_batch_topology_active(env)
+        while topology_active and retries < 3:
+            Logger.info('Existing batch topology still active. Will wait and retry')
+            time.sleep(10)
+            retries += 1
+            topology_active = self.is_batch_topology_active(env)
+
+        if not topology_active:
+            Logger.info('Waiting for storm kill to complete')
+            time.sleep(30)
+            self.start_batch_indexing_topology(env)
+            Logger.info('Done restarting the batch indexing topology')
+        else:
+            Logger.warning('Retries exhausted. Existing topology not cleaned up.  Aborting topology start.')
+
+    def restart_random_access_indexing_topology(self, env):
+        Logger.info('Restarting the random access indexing topology')
+        self.stop_random_access_indexing_topology(env)
 
         # Wait for old topology to be cleaned up by Storm, before starting again.
         retries = 0
-        topology_active = self.is_topology_active(env)
-        while self.is_topology_active(env) and retries < 3:
-            Logger.info('Existing topology still active. Will wait and retry')
+        topology_active = self.is_random_access_topology_active(env)
+        while topology_active and retries < 3:
+            Logger.info('Existing random access topology still active. Will wait and retry')
             time.sleep(10)
             retries += 1
+            topology_active = self.is_random_access_topology_active(env)
 
         if not topology_active:
             Logger.info('Waiting for storm kill to complete')
             time.sleep(30)
-            self.start_indexing_topology(env)
-            Logger.info('Done restarting the indexing topologies')
+            self.start_random_access_indexing_topology(env)
+            Logger.info('Done restarting the random access indexing topology')
         else:
             Logger.warning('Retries exhausted. Existing topology not cleaned up.  Aborting topology start.')