You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2017/10/16 21:53:54 UTC
metron git commit: METRON-1249 Improve Metron MPack Service Checks
(nickwallen) closes apache/metron#799
Repository: metron
Updated Branches:
refs/heads/master 4c908b95b -> fef8833c1
METRON-1249 Improve Metron MPack Service Checks (nickwallen) closes apache/metron#799
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/fef8833c
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/fef8833c
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/fef8833c
Branch: refs/heads/master
Commit: fef8833c153fabef597084f4aace8303d9f7116e
Parents: 4c908b9
Author: nickwallen <ni...@nickallen.org>
Authored: Mon Oct 16 17:53:20 2017 -0400
Committer: nickallen <ni...@apache.org>
Committed: Mon Oct 16 17:53:20 2017 -0400
----------------------------------------------------------------------
.../package/scripts/enrichment_commands.py | 62 +++-
.../package/scripts/indexing_commands.py | 41 ++-
.../package/scripts/management_ui_commands.py | 26 ++
.../package/scripts/management_ui_master.py | 12 +-
.../CURRENT/package/scripts/metron_service.py | 281 ++++++++++++++++---
.../CURRENT/package/scripts/parser_commands.py | 66 +++--
.../package/scripts/profiler_commands.py | 44 ++-
.../CURRENT/package/scripts/rest_commands.py | 29 +-
.../CURRENT/package/scripts/service_check.py | 49 +++-
9 files changed, 520 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
index 794b6a5..90a690e 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
@@ -18,6 +18,7 @@ limitations under the License.
import os
import time
from datetime import datetime
+from resource_management.core.exceptions import Fail
from resource_management.core.logger import Logger
from resource_management.core.resources.system import Execute, File
@@ -47,6 +48,12 @@ class EnrichmentCommands:
self.__hbase_acl_configured = os.path.isfile(self.__params.enrichment_hbase_acl_configured_flag_file)
self.__geo_configured = os.path.isfile(self.__params.enrichment_geo_configured_flag_file)
+ def __get_topics(self):  # topics created, ACL'd and service-checked for Enrichment
+ return [self.__enrichment_topic, self.__params.enrichment_error_topic]  # all errors go to the error topic, so it is created here too if missing
+
+ def __get_kafka_acl_groups(self):  # consumer groups that receive Kafka ACLs
+ return [self.__enrichment_topic]  # the enrichment topic name doubles as the consumer group name
+
def is_kafka_configured(self):
return self.__kafka_configured
@@ -105,15 +112,15 @@ class EnrichmentCommands:
def init_kafka_topics(self):
Logger.info('Creating Kafka topics for enrichment')
# All errors go to indexing topics, so create it here if it's not already
- metron_service.init_kafka_topics(self.__params, [self.__enrichment_topic, self.__params.enrichment_error_topic])
+ metron_service.init_kafka_topics(self.__params, self.__get_topics())  # same list the service check verifies
self.set_kafka_configured()
def init_kafka_acls(self):
Logger.info('Creating Kafka ACls for enrichment')
+ metron_service.init_kafka_acls(self.__params, self.__get_topics())  # topic ACLs for metron_user
+ # consumer-group ACLs are granted separately below
# Enrichment topic names matches group
- metron_service.init_kafka_acls(self.__params,
- [self.__enrichment_topic, self.__params.enrichment_error_topic],
- [self.__enrichment_topic])
+ metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())  # group ACLs for metron_user
self.set_kafka_acl_configured()
@@ -182,6 +189,7 @@ class EnrichmentCommands:
self.__params.hbase_keytab_path,
self.__params.hbase_principal_name,
execute_user=self.__params.hbase_user)
+
cmd = "echo \"create '{0}','{1}'\" | hbase shell -n"
add_enrichment_cmd = cmd.format(self.__params.enrichment_hbase_table, self.__params.enrichment_hbase_cf)
Execute(add_enrichment_cmd,
@@ -211,6 +219,7 @@ class EnrichmentCommands:
self.__params.hbase_keytab_path,
self.__params.hbase_principal_name,
execute_user=self.__params.hbase_user)
+
cmd = "echo \"grant '{0}', 'RW', '{1}'\" | hbase shell -n"
add_enrichment_acl_cmd = cmd.format(self.__params.metron_user, self.__params.enrichment_hbase_table)
Execute(add_enrichment_acl_cmd,
@@ -232,3 +241,48 @@ class EnrichmentCommands:
Logger.info("Done setting HBase ACLs")
self.set_hbase_acl_configured()
+
+ def service_check(self, env):
+ """
+ Performs a service check for Enrichment: the GeoIP database in HDFS, Kafka topics, the Enrichment and Threat Intel HBase tables and column families, Kafka and HBase ACLs when security is enabled, and the running topology. Raises Fail on the first check that does not pass.
+ :param env: Environment (passed through to is_topology_active)
+ """
+ Logger.info("Checking for Geo database")
+ metron_service.check_hdfs_file_exists(self.__params, self.__params.geoip_hdfs_dir + "/GeoLite2-City.mmdb.gz")
+
+ Logger.info('Checking Kafka topics for Enrichment')
+ metron_service.check_kafka_topics(self.__params, self.__get_topics())
+
+ Logger.info("Checking HBase for Enrichment")
+ metron_service.check_hbase_table(
+ self.__params,
+ self.__params.enrichment_hbase_table)
+ metron_service.check_hbase_column_family(
+ self.__params,
+ self.__params.enrichment_hbase_table,
+ self.__params.enrichment_hbase_cf)
+
+ Logger.info("Checking HBase for Threat Intel")
+ metron_service.check_hbase_table(
+ self.__params,
+ self.__params.threatintel_hbase_table)
+ metron_service.check_hbase_column_family(
+ self.__params,
+ self.__params.threatintel_hbase_table,
+ self.__params.threatintel_hbase_cf)
+
+ if self.__params.security_enabled:
+ # ACL checks only run when security is enabled
+ Logger.info('Checking Kafka ACLs for Enrichment')
+ metron_service.check_kafka_acls(self.__params, self.__get_topics())
+ metron_service.check_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())
+
+ Logger.info("Checking HBase ACLs for Enrichment")
+ metron_service.check_hbase_acls(self.__params, self.__params.enrichment_hbase_table)
+ metron_service.check_hbase_acls(self.__params, self.__params.threatintel_hbase_table)
+
+ Logger.info("Checking for Enrichment topology")
+ if not self.is_topology_active(env):
+ raise Fail("Enrichment topology not running")
+
+ Logger.info("Enrichment service check completed successfully")
http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
index 50457d0..17374eb 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
@@ -19,6 +19,7 @@ import os
import time
from datetime import datetime
+from resource_management.core.exceptions import Fail
from resource_management.core.logger import Logger
from resource_management.core.resources.system import Execute, File
@@ -49,6 +50,13 @@ class IndexingCommands:
self.__hbase_acl_configured = os.path.isfile(self.__params.indexing_hbase_acl_configured_flag_file)
self.__hdfs_perm_configured = os.path.isfile(self.__params.indexing_hdfs_perm_configured_flag_file)
+ def __get_topics(self):  # topics created, ACL'd and service-checked for Indexing
+ return [self.__indexing_topic]
+
+ def __get_kafka_acl_groups(self):  # consumer groups that receive Kafka ACLs
+ # The indexing topic name doubles as the consumer group name
+ return [self.__indexing_topic]
+
def is_configured(self):
return self.__configured
@@ -121,12 +129,12 @@ class IndexingCommands:
def init_kafka_topics(self):
Logger.info('Creating Kafka topics for indexing')
- metron_service.init_kafka_topics(self.__params, [self.__indexing_topic])
+ metron_service.init_kafka_topics(self.__params, self.__get_topics())  # same list the service check verifies
def init_kafka_acls(self):
Logger.info('Creating Kafka ACLs for indexing')
- # Indexed topic names matches the group
- metron_service.init_kafka_acls(self.__params, [self.__indexing_topic], [self.__indexing_topic])
+ metron_service.init_kafka_acls(self.__params, self.__get_topics())  # topic ACLs for metron_user
+ metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())  # group ACLs for metron_user
def init_hdfs_dir(self):
Logger.info('Setting up HDFS indexing directory')
@@ -213,3 +221,30 @@ class IndexingCommands:
is_running = topologies[self.__indexing_topology] in ['ACTIVE', 'REBALANCING']
active &= is_running
return active
+
+ def service_check(self, env):
+ """
+ Performs a service check for Indexing: Kafka topics, the update HBase table and column family, Kafka and HBase ACLs when security is enabled, and the running topology. Raises Fail on the first check that does not pass.
+ :param env: Environment (passed through to is_topology_active)
+ """
+ Logger.info('Checking Kafka topics for Indexing')
+ metron_service.check_kafka_topics(self.__params, self.__get_topics())
+
+ Logger.info("Checking HBase for Indexing")
+ metron_service.check_hbase_table(self.__params, self.__params.update_hbase_table)
+ metron_service.check_hbase_column_family(self.__params, self.__params.update_hbase_table, self.__params.update_hbase_cf)
+
+ if self.__params.security_enabled:
+ # ACL checks only run when security is enabled
+ Logger.info('Checking Kafka ACLs for Indexing')
+ metron_service.check_kafka_acls(self.__params, self.__get_topics())
+ metron_service.check_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())
+
+ Logger.info("Checking HBase ACLs for Indexing")
+ metron_service.check_hbase_acls(self.__params, self.__params.update_hbase_table)
+
+ Logger.info("Checking for Indexing topology")
+ if not self.is_topology_active(env):
+ raise Fail("Indexing topology not running")
+
+ Logger.info("Indexing service check completed successfully")
http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_commands.py
index de67f64..7427046 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_commands.py
@@ -20,6 +20,9 @@ limitations under the License.
from resource_management.core.logger import Logger
from resource_management.core.resources.system import Execute, File
+from resource_management.core.exceptions import ComponentIsNotRunning
+from resource_management.core.exceptions import ExecutionFailed
+from resource_management.libraries.functions.get_user_call_output import get_user_call_output
# Wrap major operations and functionality in this class
class ManagementUICommands:
@@ -44,3 +47,26 @@ class ManagementUICommands:
Logger.info('Restarting the Management UI')
Execute('service metron-management-ui restart')
Logger.info('Done restarting the Management UI')
+
+ def check_status(self, env):  # raises ComponentIsNotRunning if the UI does not answer curl
+ Logger.info('Status check the Management UI')
+ cmd = "curl --max-time 3 {0}:{1}"  # each attempt is capped at 3 seconds
+ try:
+ Execute(
+ cmd.format(self.__params.hostname, self.__params.metron_management_ui_port),
+ tries=3,
+ try_sleep=5,
+ logoutput=False,
+ user=self.__params.metron_user)
+ except Exception:  # unlike a bare except, lets KeyboardInterrupt/SystemExit propagate
+ raise ComponentIsNotRunning()
+
+ def service_check(self, env):
+ """
+ Performs a service check for the Management UI; raises ComponentIsNotRunning if it does not respond.
+ :param env: Environment
+ """
+ from params import status_params
+ env.set_params(status_params)  # NOTE(review): check_status reads self.__params, not status_params - confirm intended
+ self.check_status(env)
+ Logger.info("Management UI service check completed successfully")
http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_master.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_master.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_master.py
index 54e91aa..15bcd94 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_master.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_master.py
@@ -17,16 +17,12 @@ limitations under the License.
"""
-from resource_management.core.exceptions import ComponentIsNotRunning
-from resource_management.core.exceptions import ExecutionFailed
from resource_management.core.resources.system import Directory
from resource_management.core.resources.system import File
from resource_management.core.source import Template
from resource_management.libraries.functions.format import format
-from resource_management.libraries.functions.get_user_call_output import get_user_call_output
from resource_management.libraries.script import Script
from resource_management.core.resources.system import Execute
-
from resource_management.core.logger import Logger
from management_ui_commands import ManagementUICommands
@@ -40,7 +36,6 @@ class ManagementUIMaster(Script):
self.install_packages(env)
def configure(self, env, upgrade_type=None, config_dir=None):
- print 'configure managment_ui'
from params import params
env.set_params(params)
File(format("/etc/default/metron"),
@@ -77,11 +72,8 @@ class ManagementUIMaster(Script):
def status(self, env):
from params import status_params
env.set_params(status_params)
- cmd = format('curl --max-time 3 {hostname}:{metron_management_ui_port}')
- try:
- get_user_call_output(cmd, user=status_params.metron_user)
- except ExecutionFailed:
- raise ComponentIsNotRunning()
+ commands = ManagementUICommands(status_params)  # shares the status logic with the service check
+ commands.check_status(env)  # raises ComponentIsNotRunning; NOTE(review): check_status retries 3x with a 5s sleep, so a down UI now reports slower than the single curl it replaced
def restart(self, env):
from params import params
http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
index 84dc805..2ae0b08 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
@@ -20,12 +20,12 @@ import subprocess
from datetime import datetime
from resource_management.core.logger import Logger
+from resource_management.core.exceptions import Fail
from resource_management.core.resources.system import Directory, File
from resource_management.core.resources.system import Execute
from resource_management.core.source import InlineTemplate
from resource_management.libraries.functions import format as ambari_format
-from resource_management.libraries.functions.get_user_call_output import \
- get_user_call_output
+from resource_management.libraries.functions.get_user_call_output import get_user_call_output
from metron_security import kinit
@@ -134,7 +134,6 @@ def refresh_configs(params):
def get_running_topologies(params):
Logger.info('Getting Running Storm Topologies from Storm REST Server')
-
Logger.info('Security enabled? ' + str(params.security_enabled))
# Want to sudo to the metron user and kinit as them so we aren't polluting root with Metron's Kerberos tickets.
@@ -201,36 +200,246 @@ def init_kafka_topics(params, topics):
user=params.kafka_user, tries=3, try_sleep=5, logoutput=True)
Logger.info("Done creating Kafka topics")
-def init_kafka_acls(params, topics, groups):
- Logger.info('Creating Kafka ACLs')
-
- acl_template = """{0}/kafka-acls.sh \
- --authorizer kafka.security.auth.SimpleAclAuthorizer \
- --authorizer-properties zookeeper.connect={1} \
- --add \
- --allow-principal User:{2} \
- --topic {3}"""
-
- for topic in topics:
- Logger.info("Creating ACL for topic '{0}'".format(topic))
- Execute(acl_template.format(params.kafka_bin_dir,
- params.zookeeper_quorum,
- params.metron_user,
- topic),
- user=params.kafka_user, tries=3, try_sleep=5, logoutput=True)
-
- acl_template = """{0}/kafka-acls.sh \
- --authorizer kafka.security.auth.SimpleAclAuthorizer \
- --authorizer-properties zookeeper.connect={1} \
- --add \
- --allow-principal User:{2} \
- --group {3}"""
-
- for group in groups:
- Logger.info("Creating ACL for group '{0}'".format(group))
- Execute(acl_template.format(params.kafka_bin_dir,
- params.zookeeper_quorum,
- params.metron_user,
- group),
- user=params.kafka_user, tries=3, try_sleep=5, logoutput=True)
- Logger.info("Done creating Kafka ACLs")
+def init_kafka_acls(params, topics):  # grants params.metron_user access to each topic, running kafka-acls.sh as kafka_user
+ Logger.info('Creating Kafka topic ACLs')
+ acl_template = """{0}/kafka-acls.sh \
+ --authorizer kafka.security.auth.SimpleAclAuthorizer \
+ --authorizer-properties zookeeper.connect={1} \
+ --add \
+ --allow-principal User:{2} \
+ --topic {3}"""
+
+ for topic in topics:
+ Logger.info("Creating ACL for topic '{0}'".format(topic))
+ Execute(acl_template.format(params.kafka_bin_dir,
+ params.zookeeper_quorum,
+ params.metron_user,
+ topic),
+ user=params.kafka_user, tries=3, try_sleep=5, logoutput=True)
+
+def init_kafka_acl_groups(params, groups):  # grants params.metron_user access to each consumer group, running kafka-acls.sh as kafka_user
+ Logger.info('Creating Kafka group ACLs')
+ acl_template = """{0}/kafka-acls.sh \
+ --authorizer kafka.security.auth.SimpleAclAuthorizer \
+ --authorizer-properties zookeeper.connect={1} \
+ --add \
+ --allow-principal User:{2} \
+ --group {3}"""
+
+ for group in groups:
+ Logger.info("Creating ACL for group '{0}'".format(group))
+ Execute(acl_template.format(params.kafka_bin_dir,
+ params.zookeeper_quorum,
+ params.metron_user,
+ group),
+ user=params.kafka_user, tries=3, try_sleep=5, logoutput=True)
+
+def execute(cmd, user, err_msg=None, tries=3, try_sleep=5, logoutput=True, path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin'):
+ """
+ Executes a command and raises Fail with a descriptive message if the
+ command does not succeed within the allowed attempts.
+ :param cmd: The command to execute.
+ :param user: The user to execute the command as.
+ :param err_msg: The Fail message to use; defaults to one naming cmd and user.
+ :param tries: The number of attempts to execute the command.
+ :param try_sleep: The time to wait between attempts.
+ :param logoutput: If true, log the command output.
+ :param path: The PATH to use when running the command.
+ :raises Fail: If the command does not succeed.
+ """
+ try:
+ Execute(cmd, tries=tries, try_sleep=try_sleep, logoutput=logoutput, user=user, path=path)
+ except Exception:  # unlike a bare except, lets KeyboardInterrupt/SystemExit propagate
+ if err_msg is None:
+ err_msg = "Execution failed: cmd={0}, user={1}".format(cmd, user)
+ raise Fail(err_msg)
+
+def check_kafka_topics(params, topics):
+ """
+ Validates that the Kafka topics exist. Fail is raised (via execute) if any
+ of the topics do not exist.
+ :param params: Config params (kafka_bin_dir, zookeeper_quorum, kafka_user, security settings).
+ :param topics: A list of topic names; each must equal a listed topic exactly, not merely be a substring of one.
+ """
+
+ # if needed kinit as 'metron' (NOTE(review): the check below runs as kafka_user, so this ticket may go unused)
+ if params.security_enabled:
+ kinit(params.kinit_path_local,
+ params.metron_keytab_path,
+ params.metron_principal_name,
+ execute_user=params.metron_user)
+
+ template = """{0}/kafka-topics.sh \
+ --zookeeper {1} \
+ --list | \
+ awk 'BEGIN {{cnt=0;}} $0 == "{2}" {{cnt++}} END {{if (cnt > 0) {{exit 0}} else {{exit 1}}}}'"""
+
+ for topic in topics:
+ Logger.info("Checking existence of Kafka topic '{0}'".format(topic))
+ cmd = template.format(params.kafka_bin_dir, params.zookeeper_quorum, topic)
+ err_msg = "Missing Kafka topic; topic={0}".format(topic)
+ execute(cmd, user=params.kafka_user, err_msg=err_msg)
+
+def check_hbase_table(params, table):
+ """
+ Validates that an HBase table exists. Fail is raised (via execute) if the table
+ does not exist.
+ :param params: Config params (hbase_user, security and HBase keytab settings).
+ :param table: The name of the HBase table.
+ """
+ Logger.info("Checking HBase table '{0}'".format(table))
+
+ # if needed kinit as 'hbase'
+ if params.security_enabled:
+ kinit(params.kinit_path_local,
+ params.hbase_keytab_path,
+ params.hbase_principal_name,
+ execute_user=params.hbase_user)
+
+ template = "echo \"exists '{0}'\" | hbase shell -n | grep 'Table {1} does exist'"  # grep fails unless the shell reports the table exists
+ cmd = template.format(table, table)
+ err_msg = "Missing HBase table; table={0}".format(table)
+ execute(cmd, user=params.hbase_user, err_msg=err_msg)
+
+def check_hbase_column_family(params, table, column_family):
+ """
+ Validates that an HBase column family exists. Fail is raised (via execute) if the
+ column family does not exist.
+ :param params: Config params (hbase_user, security and HBase keytab settings).
+ :param table: The name of the HBase table.
+ :param column_family: The name of the HBase column family.
+ """
+ Logger.info("Checking column family '{0}:{1}'".format(table, column_family))
+
+ # if needed kinit as 'hbase'
+ if params.security_enabled:
+ kinit(params.kinit_path_local,
+ params.hbase_keytab_path,
+ params.hbase_principal_name,
+ execute_user=params.hbase_user)
+
+ template = "echo \"desc '{0}'\" | hbase shell -n | grep \"NAME => '{1}'\""  # grep fails unless the table description names the column family
+ cmd = template.format(table, column_family)
+ err_msg = "Missing HBase column family; table={0}, cf={1}".format(table, column_family)
+ execute(cmd, user=params.hbase_user, err_msg=err_msg)
+
+def check_hbase_acls(params, table, user=None, permissions="READ,WRITE"):
+ """
+ Validates that HBase table permissions exist for a user. Fail is raised
+ (via execute) if the permissions do not exist.
+ :param params: Config params (metron_user, hbase_user, security and HBase keytab settings).
+ :param table: The name of the HBase table.
+ :param user: The name of the user; defaults to params.metron_user.
+ :param permissions: The expected 'actions=' string from user_permission; matched as a substring.
+ """
+ if user is None:
+ user = params.metron_user
+ Logger.info("Checking HBase ACLs; table={0}, user={1}, permissions={2}".format(table, user, permissions))
+
+ # if needed kinit as 'hbase'
+ if params.security_enabled:
+ kinit(params.kinit_path_local,
+ params.hbase_keytab_path,
+ params.hbase_principal_name,
+ execute_user=params.hbase_user)
+
+ # Find the user_permission row for this user (matched as a space-delimited word)
+ # and require that its actions contain the expected permissions string.
+ template = """echo "user_permission '{0}'" | \
+ hbase shell -n | \
+ grep " {1} " | \
+ grep "actions={2}"
+ """
+ cmd = template.format(table, user, permissions)
+ err_msg = "Missing HBase access; table={0}, user={1}, permissions={2}".format(table, user, permissions)
+ execute(cmd, user=params.hbase_user, err_msg=err_msg)
+
+def check_hdfs_dir_exists(params, path, user=None):
+ """
+ Validate that a directory exists in HDFS; Fail is raised (via execute) if it does not.
+ :param params: Config params (hadoop_bin_dir, metron_user, security and Metron keytab settings).
+ :param path: The directory path in HDFS.
+ :param user: The user to execute the check under; defaults to metron_user (only metron_user is kinit'd).
+ """
+ if user is None:
+ user = params.metron_user
+ Logger.info("Checking HDFS; directory={0} user={1}".format(path, user))
+
+ # if needed kinit as 'metron'
+ if params.security_enabled:
+ kinit(params.kinit_path_local,
+ params.metron_keytab_path,
+ params.metron_principal_name,
+ execute_user=params.metron_user)
+
+ template = "{0}/hdfs dfs -test -d {1}"
+ cmd = template.format(params.hadoop_bin_dir, path)
+ err_msg = "Missing directory in HDFS: directory={0} user={1}".format(path, user)
+ execute(cmd, user=user, err_msg=err_msg)  # honor the requested user, as check_hdfs_file_exists does
+
+def check_hdfs_file_exists(params, path, user=None):
+ """
+ Validate that a file exists in HDFS; Fail is raised (via execute) if it does not.
+ :param params: Config params (hadoop_bin_dir, metron_user, security and Metron keytab settings).
+ :param path: The file path in HDFS.
+ :param user: The user to execute the check under; defaults to metron_user. NOTE(review): only metron_user is kinit'd, so another user may lack a ticket when security is enabled.
+ """
+ if user is None:
+ user = params.metron_user
+ Logger.info("Checking HDFS; file={0}, user={1}".format(path, user))
+
+ # if needed kinit as 'metron'
+ if params.security_enabled:
+ kinit(params.kinit_path_local,
+ params.metron_keytab_path,
+ params.metron_principal_name,
+ execute_user=params.metron_user)
+
+ template = "{0}/hdfs dfs -test -f {1}"
+ cmd = template.format(params.hadoop_bin_dir, path)
+ err_msg = "Missing file in HDFS; file={0}".format(path)
+ execute(cmd, user=user, err_msg=err_msg)
+
+def check_kafka_acls(params, topics, user=None):
+ """
+ Validate that permissions have been granted for a list of Kafka topics; Fail is raised (via execute) otherwise.
+ :param params: Config params (kafka_bin_dir, zookeeper_quorum, kafka_user, metron_user).
+ :param topics: A list of topic names.
+ :param user: The user whose access is checked; defaults to params.metron_user.
+ """
+ if user is None:
+ user = params.metron_user
+
+ template = """{0}/kafka-acls.sh \
+ --authorizer kafka.security.auth.SimpleAclAuthorizer \
+ --authorizer-properties zookeeper.connect={1} \
+ --topic {2} \
+ --list | grep 'User:{3}'"""
+
+ for topic in topics:
+ Logger.info("Checking ACL; topic={0}, user={1}".format(topic, user))
+ cmd = template.format(params.kafka_bin_dir, params.zookeeper_quorum, topic, user)
+ err_msg = "Missing Kafka access; topic={0}, user={1}".format(topic, user)
+ execute(cmd, user=params.kafka_user, err_msg=err_msg)
+
+def check_kafka_acl_groups(params, groups, user=None):
+ """
+ Validate that Kafka consumer-group permissions have been granted; Fail is raised (via execute) otherwise.
+ :param params: Config params (kafka_bin_dir, zookeeper_quorum, kafka_user, metron_user).
+ :param groups: A list of group names.
+ :param user: The user whose access is checked; defaults to params.metron_user.
+ """
+ if user is None:
+ user = params.metron_user
+
+ template = """{0}/kafka-acls.sh \
+ --authorizer kafka.security.auth.SimpleAclAuthorizer \
+ --authorizer-properties zookeeper.connect={1} \
+ --group {2} \
+ --list | grep 'User:{3}'"""
+
+ for group in groups:
+ Logger.info("Checking group ACL; group={0}, user={1}".format(group, user))
+ cmd = template.format(params.kafka_bin_dir, params.zookeeper_quorum, group, user)
+ err_msg = "Missing Kafka group access; group={0}, user={1}".format(group, user)
+ execute(cmd, user=params.kafka_user, err_msg=err_msg)
http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
index 9483498..274306a 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
@@ -24,6 +24,7 @@ import subprocess
import time
from datetime import datetime
+from resource_management.core.exceptions import Fail
from resource_management.core.logger import Logger
from resource_management.core.resources.system import Execute, File
@@ -50,6 +51,17 @@ class ParserCommands:
def __get_parsers(self, params):
return params.parsers.replace(' ', '').split(',')
+ def __get_topics(self):  # parser topics plus the shared error topic
+ # All errors go to the indexing error topic, so include it alongside the parser topics
+ # Copy the parser list so that appending the error topic does not mutate it
+ topics = list(self.get_parser_list())
+ topics.append(self.__params.enrichment_error_topic)
+ return topics
+
+ def __get_kafka_acl_groups(self):  # consumer groups that receive Kafka ACLs
+ # Parser group is the parser name + '_parser'
+ return [parser + '_parser' for parser in self.get_parser_list()]
+
def is_configured(self):
return self.__configured
@@ -63,9 +75,13 @@ class ParserCommands:
metron_service.set_configured(self.__params.metron_user, self.__params.parsers_acl_configured_flag_file, "Setting Parsers ACL configured to true")
def init_parsers(self):
- Logger.info(
- "Copying grok patterns from local directory '{0}' to HDFS '{1}'".format(self.__params.local_grok_patterns_dir,
- self.__params.hdfs_grok_patterns_dir))
+ self.init_grok_patterns()  # currently the only parser initialization step
+ Logger.info("Done initializing parser configuration")
+
+ def init_grok_patterns(self):
+ Logger.info("Copying grok patterns from local directory '{0}' to HDFS '{1}'"
+ .format(self.__params.local_grok_patterns_dir,
+ self.__params.hdfs_grok_patterns_dir))
self.__params.HdfsResource(self.__params.hdfs_grok_patterns_dir,
type="directory",
@@ -75,29 +91,17 @@ class ParserCommands:
source=self.__params.local_grok_patterns_dir,
recursive_chown = True)
- Logger.info("Done initializing parser configuration")
-
def get_parser_list(self):
return self.__parser_list
def init_kafka_topics(self):
- Logger.info('Creating Kafka topics for parsers')
- # All errors go to indexing topics, so create it here if it's not already
- # Getting topics this way is a bit awkward, but I don't want to append to actual list, so copy it
- topics = list(self.get_parser_list())
- topics.append(self.__params.enrichment_error_topic)
- metron_service.init_kafka_topics(self.__params, topics)
+ Logger.info('Creating Kafka topics for Parsers')
+ metron_service.init_kafka_topics(self.__params, self.__get_topics())  # parser topics plus the error topic
def init_kafka_acls(self):
- Logger.info('Creating Kafka ACLs for parsers')
-
- # Getting topics this way is a bit awkward, but I don't want to modify the actual list, so copy it
- topics = list(self.get_parser_list())
- topics.append(self.__params.enrichment_error_topic)
- # Parser group is the parser name + '_parser'
- metron_service.init_kafka_acls(self.__params,
- topics,
- [parser + '_parser' for parser in self.get_parser_list()])
+ Logger.info('Creating Kafka ACLs for Parsers')
+ metron_service.init_kafka_acls(self.__params, self.__get_topics())  # topic ACLs for metron_user
+ metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())  # '<parser>_parser' group ACLs
def start_parser_topologies(self, env):
Logger.info("Starting Metron parser topologies: {0}".format(self.get_parser_list()))
@@ -212,3 +216,25 @@ class ParserCommands:
def __is_running(self, status):
return status in ['ACTIVE', 'REBALANCING']
+
+ def service_check(self, env):
+ """
+ Performs a service check for the Parsers: grok patterns in HDFS, Kafka topics, Kafka ACLs when security is enabled, and the running topologies. Raises Fail on the first check that does not pass.
+ :param env: Environment (passed through to topologies_running)
+ """
+ Logger.info("Checking for grok patterns in HDFS for Parsers")
+ metron_service.check_hdfs_dir_exists(self.__params, self.__params.hdfs_grok_patterns_dir)
+
+ Logger.info('Checking Kafka topics for Parsers')
+ metron_service.check_kafka_topics(self.__params, self.__get_topics())
+
+ if self.__params.security_enabled:  # ACL checks only run when security is enabled
+ Logger.info('Checking Kafka ACLs for Parsers')
+ metron_service.check_kafka_acls(self.__params, self.__get_topics())
+ metron_service.check_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())
+
+ Logger.info("Checking for Parser topologies")
+ if not self.topologies_running(env):
+ raise Fail("Parser topologies not running")
+
+ Logger.info("Parser service check completed successfully")
http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py
index 21c1225..41cab06 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py
@@ -19,6 +19,7 @@ import os
import time
from datetime import datetime
+from resource_management.core.exceptions import Fail
from resource_management.core.logger import Logger
from resource_management.core.resources.system import Execute, File
@@ -47,6 +48,12 @@ class ProfilerCommands:
self.__hbase_configured = os.path.isfile(self.__params.profiler_hbase_configured_flag_file)
self.__hbase_acl_configured = os.path.isfile(self.__params.profiler_hbase_acl_configured_flag_file)
+ def __get_topics(self):
+ return [self.__profiler_topic]  # Kafka topics needing ACLs for the Profiler; shared by init_kafka_acls and service_check
+
+ def __get_kafka_acl_groups(self):
+ return ['profiler']  # Kafka groups granted ACLs for the Profiler; shared by init_kafka_acls and service_check
+
def is_configured(self):
return self.__configured
@@ -72,7 +79,7 @@ class ProfilerCommands:
metron_service.set_configured(self.__params.metron_user, self.__params.profiler_hbase_acl_configured_flag_file, "Setting HBase ACL configured to True for profiler")
def create_hbase_tables(self):
- Logger.info("Creating HBase Tables for profiler")
+ Logger.info("Creating HBase table '{0}' for profiler".format(self.__params.profiler_hbase_table))
if self.__params.security_enabled:
metron_security.kinit(self.__params.kinit_path_local,
self.__params.hbase_keytab_path,
@@ -88,12 +95,13 @@ class ProfilerCommands:
user=self.__params.hbase_user
)
- Logger.info("Done creating HBase Tables for profiler")
self.set_hbase_configured()
+ Logger.info("Done creating HBase Tables for profiler")
def init_kafka_acls(self):
Logger.info('Creating Kafka ACls for profiler')
- metron_service.init_kafka_acls(self.__params, [self.__profiler_topic], ['profiler'])
+ metron_service.init_kafka_acls(self.__params, self.__get_topics())  # topic ACLs; the same list is verified by service_check
+ metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())  # group ACLs are now created by a separate call
def set_hbase_acls(self):
Logger.info("Setting HBase ACLs for profiler")
@@ -102,6 +110,7 @@ class ProfilerCommands:
self.__params.hbase_keytab_path,
self.__params.hbase_principal_name,
execute_user=self.__params.hbase_user)
+
cmd = "echo \"grant '{0}', 'RW', '{1}'\" | hbase shell -n"
add_table_acl_cmd = cmd.format(self.__params.metron_user, self.__params.profiler_hbase_table)
Execute(add_table_acl_cmd,
@@ -112,8 +121,8 @@ class ProfilerCommands:
user=self.__params.hbase_user
)
- Logger.info("Done setting HBase ACLs for profiler")
self.set_hbase_acl_configured()
+ Logger.info("Done setting HBase ACLs for profiler")
def start_profiler_topology(self, env):
Logger.info('Starting ' + self.__profiler_topology)
@@ -182,3 +191,30 @@ class ProfilerCommands:
is_running = topologies[self.__profiler_topology] in ['ACTIVE', 'REBALANCING']
active &= is_running
return active
+
+ def service_check(self, env):
+ """
+ Performs a service check for the Profiler.
+ :param env: Environment
+ """
+ Logger.info('Checking Kafka topics for Profiler')
+ metron_service.check_kafka_topics(self.__params, [self.__params.profiler_input_topic])
+
+ Logger.info("Checking HBase table for profiler")
+ metron_service.check_hbase_table(self.__params, self.__params.profiler_hbase_table)
+ metron_service.check_hbase_column_family(self.__params, self.__params.profiler_hbase_table, self.__params.profiler_hbase_cf)
+
+ if self.__params.security_enabled:
+
+ Logger.info('Checking Kafka ACLs for Profiler')
+ metron_service.check_kafka_acls(self.__params, self.__get_topics())
+ metron_service.check_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())
+
+ Logger.info('Checking Kafka ACLs for Profiler')
+ metron_service.check_hbase_acls(self.__params, self.__params.profiler_hbase_table)
+
+ Logger.info("Checking for Profiler topology")
+ if not self.is_topology_active(env):
+ raise Fail("Profiler topology not running")
+
+ Logger.info("Profiler service check completed successfully")
http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
index 09d7106..542fc08 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
@@ -59,16 +59,22 @@ class RestCommands:
def set_acl_configured(self):
metron_service.set_configured(self.__params.metron_user, self.__params.rest_acl_configured_flag_file, "Setting REST ACL configured to true")
+ def __get_topics(self):
+ return [self.__params.metron_escalation_topic]  # Kafka topics for the REST application; reused by init_kafka_topics, init_kafka_acls and service_check
+
def init_kafka_topics(self):
Logger.info('Creating Kafka topics for rest')
- topics = [self.__params.metron_escalation_topic]
- metron_service.init_kafka_topics(self.__params, topics)
+ metron_service.init_kafka_topics(self.__params, self.__get_topics())  # single source of truth for the REST topic list
def init_kafka_acls(self):
Logger.info('Creating Kafka ACLs for rest')
+
# The following topics must be permissioned for the rest application list operation
- topics = [self.__params.ambari_kafka_service_check_topic, self.__params.consumer_offsets_topic, self.__params.metron_escalation_topic]
- metron_service.init_kafka_acls(self.__params, topics, ['metron-rest'])
+ topics = self.__get_topics() + [self.__params.ambari_kafka_service_check_topic, self.__params.consumer_offsets_topic]
+ metron_service.init_kafka_acls(self.__params, topics)
+ # Group ACLs are now created by a separate call instead of being passed to init_kafka_acls
+ groups = ['metron-rest']
+ metron_service.init_kafka_acl_groups(self.__params, groups)
def start_rest_application(self):
Logger.info('Starting REST application')
@@ -151,3 +157,18 @@ class RestCommands:
self.stop_rest_application()
self.start_rest_application()
Logger.info('Done restarting the REST application')
+
+ def service_check(self, env):
+ """
+ Performs a service check for the Metron API.
+ :param env: Environment
+ """
+ Logger.info('Checking Kafka topics for the REST application')
+ metron_service.check_kafka_topics(self.__params, self.__get_topics())
+
+ if self.__params.security_enabled:
+
+ Logger.info('Checking Kafka topic ACL for the REST application')
+ metron_service.check_kafka_acls(self.__params, self.__get_topics())
+
+ Logger.info("REST application service check completed successfully")
http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/service_check.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/service_check.py
index 7dd9dfb..1aebecb 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/service_check.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/service_check.py
@@ -19,22 +19,53 @@ limitations under the License.
"""
from __future__ import print_function
+from resource_management.core.logger import Logger
from resource_management.libraries.script import Script
-from indexing_commands import IndexingCommands
from parser_commands import ParserCommands
-
+from enrichment_commands import EnrichmentCommands
+from indexing_commands import IndexingCommands
+from profiler_commands import ProfilerCommands
+from rest_commands import RestCommands
+from management_ui_commands import ManagementUICommands
class ServiceCheck(Script):
+ """Ambari service check for Metron: runs each Metron component's service_check in turn."""
def service_check(self, env):
from params import params
- parsercommands = ParserCommands(params)
- indexingcommands = IndexingCommands(params)
- all_found = parsercommands.topologies_running(env) and indexingcommands.is_topology_active(env)
- if all_found:
- exit(0)
- else:
- exit(1)
+ # Components are checked sequentially: Parsers, Enrichment, Indexing, Profiler, REST application, Management UI
+ # check the parsers
+ Logger.info("Performing Parser service check")
+ parser_cmds = ParserCommands(params)
+ parser_cmds.service_check(env)
+
+ # check enrichment
+ Logger.info("Performing Enrichment service check")
+ enrichment_cmds = EnrichmentCommands(params)
+ enrichment_cmds.service_check(env)
+
+ # check indexing
+ Logger.info("Performing Indexing service check")
+ indexing_cmds = IndexingCommands(params)
+ indexing_cmds.service_check(env)
+
+ # check the profiler
+ Logger.info("Performing Profiler service check")
+ profiler_cmds = ProfilerCommands(params)
+ profiler_cmds.service_check(env)
+
+ # check the rest api
+ Logger.info("Performing REST application service check")
+ rest_cmds = RestCommands(params)
+ rest_cmds.service_check(env)
+
+ # check the management UI
+ Logger.info("Performing Management UI service check")
+ mgmt_cmds = ManagementUICommands(params)
+ mgmt_cmds.service_check(env)
+ # Reached only if none of the component checks above raised
+ Logger.info("Metron service check completed successfully")
+ exit(0)  # NOTE(review): exit() is injected by the site module; sys.exit(0) is the portable equivalent
if __name__ == "__main__":