Posted to commits@ambari.apache.org by ja...@apache.org on 2016/02/10 00:33:17 UTC

ambari git commit: AMBARI-14959: Implement service check for secured PXF service (lavjain via jaoki)

Repository: ambari
Updated Branches:
  refs/heads/trunk 73fbe14c2 -> feb50e3a3


AMBARI-14959: Implement service check for secured PXF service (lavjain via jaoki)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/feb50e3a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/feb50e3a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/feb50e3a

Branch: refs/heads/trunk
Commit: feb50e3a3f5edb1105780d06254bb9538d19063e
Parents: 73fbe14
Author: Jun Aoki <ja...@apache.org>
Authored: Tue Feb 9 15:33:08 2016 -0800
Committer: Jun Aoki <ja...@apache.org>
Committed: Tue Feb 9 15:33:08 2016 -0800

----------------------------------------------------------------------
 .../PXF/3.0.0/package/scripts/params.py         | 17 +++-
 .../PXF/3.0.0/package/scripts/service_check.py  | 81 +++++++++++++++-----
 2 files changed, 76 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
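In short: when security is enabled, the PXF service check now fetches an HDFS
delegation token from the active NameNode over WebHDFS and attaches it to
every PXF REST call via the X-GP-TOKEN header; the HBase checks kinit and run
as the hbase user; and params.py resolves the NameNode HTTP address (covering
the HA case via get_active_namenode) and reads keytab/principal properties
with default() so their absence on non-kerberized clusters is tolerated.

A minimal sketch of the token handshake, not taken from the commit: the real
code routes the request through curl_krb_request(), which handles the
kinit/SPNEGO negotiation, so plain urllib2 is used here only to show the REST
shapes involved, and the NameNode address is an assumed placeholder.

    import json
    import urllib2

    # Assumed address; params.py derives the real one from hdfs-site.
    namenode_http = "http://nn.example.com:50070"

    # WebHDFS answers GETDELEGATIONTOKEN with {"Token": {"urlString": "<token>"}}.
    # On a kerberized cluster this call must itself be SPNEGO-authenticated.
    response = urllib2.urlopen(namenode_http + "/webhdfs/v1/?op=GETDELEGATIONTOKEN")
    token = json.loads(response.read())["Token"]["urlString"]

    # The token then rides along on the PXF requests the check issues:
    headers = {"X-GP-TOKEN": token,
               "X-GP-DATA-DIR": "/tmp/pxf_smoke_test",   # illustrative value
               "X-GP-profile": "HdfsTextSimple"}
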


http://git-wip-us.apache.org/repos/asf/ambari/blob/feb50e3a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py
index 7749de7..b3e85e4 100644
--- a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py
@@ -22,6 +22,7 @@ from resource_management import Script
 from resource_management.libraries.functions.default import default
 from resource_management.libraries.functions import get_kinit_path
 from resource_management.libraries.resources.hdfs_resource import HdfsResource
+from resource_management.libraries.functions.namenode_ha_utils import get_active_namenode
 
 config = Script.get_config()
 
@@ -31,9 +32,10 @@ stack_name = str(config["hostLevelParams"]["stack_name"])
 # Users and Groups
 pxf_user = "pxf"
 pxf_group = pxf_user
-hdfs_superuser = config['configurations']['hadoop-env']['hdfs_user']
+hdfs_user = config['configurations']['hadoop-env']['hdfs_user']
 hdfs_superuser_group = config["configurations"]["hdfs-site"]["dfs.permissions.superusergroup"]
 user_group = config["configurations"]["cluster-env"]["user_group"]
+hbase_user = default('configurations/hbase-env/hbase_user', None)
 hive_user = default('configurations/hive-env/hive_user', None)
 tomcat_group = "tomcat"
 
@@ -60,14 +62,21 @@ is_hive_installed = default("/clusterHostInfo/hive_server_host", None) is not No
 # HDFS
 hdfs_site = config['configurations']['hdfs-site']
 default_fs = config['configurations']['core-site']['fs.defaultFS']
+namenode_path =  default('/configurations/hdfs-site/dfs.namenode.http-address', None)
+dfs_nameservice = default('/configurations/hdfs-site/dfs.nameservices', None)
+if dfs_nameservice:
+  namenode_path =  get_active_namenode(hdfs_site, security_enabled, hdfs_user)[1]
 
-hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab']
+# keytabs and principals
 kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
-hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name']
+hdfs_user_keytab = default('configurations/hadoop-env/hdfs_user_keytab', None)
+hdfs_principal_name = default('configurations/hadoop-env/hdfs_principal_name', None)
+hbase_user_keytab = default('configurations/hbase-env/hbase_user_keytab', None)
+hbase_principal_name = default('configurations/hbase-env/hbase_principal_name', None)
 
 # HDFSResource partial function
 HdfsResource = functools.partial(HdfsResource,
-    user=hdfs_superuser,
+    user=hdfs_user,
     security_enabled=security_enabled,
     keytab=hdfs_user_keytab,
     kinit_path_local=kinit_path_local,

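Two idioms carry the params.py hunk above. default() returns its fallback
instead of raising KeyError when a property is missing, which is why the
keytab/principal lookups switch to it: those properties only exist once
Kerberos is enabled. And functools.partial pre-binds the keyword arguments
shared by every HdfsResource call, so the call sites in service_check.py only
name what differs. A self-contained sketch, with a simplified stand-in for
the real default() helper and a dummy resource in place of HdfsResource:

    import functools

    config = {"configurations": {"hadoop-env": {"hdfs_user": "hdfs"}}}

    def default(path, fallback):
        # Simplified stand-in: walk the config dict along the path and
        # return the fallback as soon as a key is missing.
        node = config
        for key in path.strip("/").split("/"):
            if not isinstance(node, dict) or key not in node:
                return fallback
            node = node[key]
        return node

    # Absent on a non-kerberized cluster -> None rather than a KeyError.
    assert default("configurations/hadoop-env/hdfs_user_keytab", None) is None

    def hdfs_resource(name, user=None, keytab=None, **kwargs):
        return (name, user, keytab, kwargs)   # dummy body for illustration

    # Pre-bind the shared keywords once; call sites stay short.
    HdfsResource = functools.partial(hdfs_resource, user="hdfs")
    print HdfsResource("/tmp/pxf_smoke_test", type="directory",
                       action="create_on_execute")
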
http://git-wip-us.apache.org/repos/asf/ambari/blob/feb50e3a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py
index 064be04..21b7c5d 100644
--- a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py
+++ b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py
@@ -15,15 +15,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+import json
+
 from resource_management.libraries.script import Script
 from resource_management.core.exceptions import Fail
 from resource_management.core.logger import Logger
 from resource_management.core.system import System
 from resource_management.core.resources.system import Execute
-
+from resource_management.core.environment import Environment
+from resource_management.libraries.functions.curl_krb_request import curl_krb_request
 from pxf_utils import makeHTTPCall, runLocalCmd
 import pxf_constants
 
+
 class PXFServiceCheck(Script):
   """
   Runs a set of simple PXF tests to verify if the service has been setup correctly
@@ -44,11 +48,19 @@ class PXFServiceCheck(Script):
 
 
   def service_check(self, env):
-    Logger.info("Starting PXF service checks..")
-
+    """
+    Runs the service check for PXF
+    """
     import params
-    self.pxf_version = self.__get_pxf_protocol_version()
+    Logger.info("Starting PXF service checks..")
     try:
+      # Get delegation token if security is enabled
+      if params.security_enabled:
+        token = self.__get_delegation_token(params.hdfs_user, params.hdfs_user_keytab,
+                                            params.hdfs_principal_name, params.kinit_path_local)
+        self.commonPXFHeaders.update({"X-GP-TOKEN": token})
+
+      self.pxf_version = self.__get_pxf_protocol_version()
       self.run_hdfs_tests()
       if params.is_hbase_installed:
         self.run_hbase_tests()
@@ -59,7 +71,10 @@ class PXFServiceCheck(Script):
       Logger.error(msg)
       raise Fail(msg)
     finally:
-      self.cleanup_test_data()
+      try:
+        self.cleanup_test_data()
+      except Exception as e:
+        Logger.error(e)
 
     Logger.info("Service check completed successfully")
 
@@ -111,9 +126,28 @@ class PXFServiceCheck(Script):
         raise
     except:
       msg = "PXF data read failed"
+      Logger.error(msg)
       raise Fail(msg)
 
 
+  def __get_delegation_token(self, user, keytab, principal, kinit_path):
+    """
+    Gets the kerberos delegation token from name node
+    """
+    import params
+    url = params.namenode_path + "/webhdfs/v1/?op=GETDELEGATIONTOKEN"
+    Logger.info("Getting delegation token from {0}".format(url))
+    response, _, _  = curl_krb_request(Environment.get_instance().tmp_dir, keytab, principal,
+        url, "get_delegation_token", kinit_path, False, "Delegation Token", user)
+    json_response = json.loads(response)
+    if json_response['Token'] and json_response['Token']['urlString']:
+      return json_response['Token']['urlString']
+
+    msg = "Unable to get delegation token"
+    Logger.error(msg)
+    raise Fail(msg)
+
+
   # HDFS Routines
   def run_hdfs_tests(self):
     """
@@ -136,20 +170,20 @@ class PXFServiceCheck(Script):
         type="directory",
         action="create_on_execute",
         mode=0777
-        )
-
+    )
     params.HdfsResource(pxf_constants.pxf_hdfs_read_test_file,
         type="file",
         source="/etc/passwd",
         action="create_on_execute"
-        )
+    )
+    params.HdfsResource(None, action="execute")
 
   def __check_pxf_hdfs_read(self):
     """
     Reads the test HDFS data through PXF
     """
     Logger.info("Testing PXF HDFS read")
-    headers = { 
+    headers = {
         "X-GP-DATA-DIR": pxf_constants.pxf_hdfs_test_dir,
         "X-GP-profile": "HdfsTextSimple",
         }
@@ -182,6 +216,7 @@ class PXFServiceCheck(Script):
         raise 
     except:
       msg = "PXF HDFS data write test failed"
+      Logger.error(msg)
       raise Fail(msg)
 
   def __cleanup_hdfs_data(self):
@@ -193,11 +228,12 @@ class PXFServiceCheck(Script):
     params.HdfsResource(pxf_constants.pxf_hdfs_read_test_file,
         type="file",
         action="delete_on_execute"
-        )
+    )
     params.HdfsResource(pxf_constants.pxf_hdfs_test_dir,
         type="directory",
         action="delete_on_execute"
-        )
+    )
+    params.HdfsResource(None, action="execute")
 
 
   # HBase Routines
@@ -205,7 +241,11 @@ class PXFServiceCheck(Script):
     """
     Runs a set of PXF HBase checks
     """
+    import params
     Logger.info("Running PXF HBase checks")
+    if params.security_enabled:
+      Execute("{0} -kt {1} {2}".format(params.kinit_path_local, params.hbase_user_keytab, params.hbase_principal_name),
+              user = params.hbase_user)
     self.__cleanup_hbase_data()
     self.__check_if_client_exists("HBase")
     self.__write_hbase_data()
@@ -215,9 +255,12 @@ class PXFServiceCheck(Script):
     """
     Creates a temporary HBase table for the service checks
     """
+    import params
     Logger.info("Creating temporary HBase test data")
-    Execute("echo \"create '" + pxf_constants.pxf_hbase_test_table + "', 'cf'\"|hbase shell", logoutput = True)
-    Execute("echo \"put '" + pxf_constants.pxf_hbase_test_table + "', 'row1', 'cf:a', 'value1'; put '" + pxf_constants.pxf_hbase_test_table + "', 'row1', 'cf:b', 'value2'\" | hbase shell", logoutput = True)
+    cmd = "echo \"create '{0}', 'cf'\" | hbase shell".format(pxf_constants.pxf_hbase_test_table)
+    Execute(cmd, logoutput = True, user = params.hbase_user)
+    cmd = "echo \"put '{0}', 'row1', 'cf:a', 'value1'; put '{0}', 'row1', 'cf:b', 'value2'\" | hbase shell".format(pxf_constants.pxf_hbase_test_table)
+    Execute(cmd, logoutput = True, user = params.hbase_user)
 
   def __check_pxf_hbase_read(self):
     """
@@ -229,16 +272,18 @@ class PXFServiceCheck(Script):
         "X-GP-profile": "HBase",
         }
     headers.update(self.commonPXFHeaders)
-
     self.__check_pxf_read(headers)
 
   def __cleanup_hbase_data(self):
     """
     Cleans up the test HBase data
     """
+    import params
     Logger.info("Cleaning up HBase test data")
-    Execute("echo \"disable '" + pxf_constants.pxf_hbase_test_table + "'\"|hbase shell > /dev/null 2>&1", logoutput = True)
-    Execute("echo \"drop '" + pxf_constants.pxf_hbase_test_table + "'\"|hbase shell > /dev/null 2>&1", logoutput = True)
+    cmd = "echo \"disable '{0}'\" | hbase shell > /dev/null 2>&1".format(pxf_constants.pxf_hbase_test_table)
+    Execute(cmd, logoutput = True, user = params.hbase_user)
+    cmd = "echo \"drop '{0}'\" | hbase shell > /dev/null 2>&1".format(pxf_constants.pxf_hbase_test_table)
+    Execute(cmd, logoutput = True, user = params.hbase_user)
 
 
   # Hive Routines
@@ -259,7 +304,7 @@ class PXFServiceCheck(Script):
     import params
     Logger.info("Creating temporary Hive test data")
     cmd = "hive -e 'CREATE TABLE IF NOT EXISTS {0} (id INT); INSERT INTO {0} VALUES (1);'".format(pxf_constants.pxf_hive_test_table)
-    Execute(cmd, logoutput = True, user = params.hive_user)
+    Execute(cmd, logoutput = True, user = params.hdfs_user)
 
   def __check_pxf_hive_read(self):
     """
@@ -280,7 +325,7 @@ class PXFServiceCheck(Script):
     import params
     Logger.info("Cleaning up Hive test data")
     cmd = "hive -e 'DROP TABLE IF EXISTS {0};'".format(pxf_constants.pxf_hive_test_table)
-    Execute(cmd, logoutput = True, user = params.hive_user)
+    Execute(cmd, logoutput = True, user = params.hdfs_user)
 
 
   # Package Routines
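
The HBase changes above all follow one run-as pattern: kinit once with the
hbase headless keytab at the start of run_hbase_tests(), then execute each
hbase shell command as the hbase user instead of root. A minimal sketch as it
would run inside an Ambari script; the keytab path and principal are assumed
placeholders (the real values come from hbase-env via params.py):

    from resource_management.core.resources.system import Execute

    kinit_path = "/usr/bin/kinit"   # params.kinit_path_local in the commit
    keytab = "/etc/security/keytabs/hbase.headless.keytab"   # assumed
    principal = "hbase@EXAMPLE.COM"                          # assumed

    # Acquire HBase credentials, then run the shell command as that identity.
    Execute("{0} -kt {1} {2}".format(kinit_path, keytab, principal),
            user="hbase")
    Execute("echo \"create 'pxf_hbase_smoke', 'cf'\" | hbase shell",
            logoutput=True, user="hbase")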