Posted to commits@ambari.apache.org by ja...@apache.org on 2016/02/22 23:43:40 UTC

ambari git commit: AMBARI-15061: PXF Service checks fail with timeout (bhuvnesh2703 via jaoki)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.2 990f19b8a -> 8430fbd39


AMBARI-15061: PXF Service checks fail with timeout (bhuvnesh2703 via jaoki)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8430fbd3
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8430fbd3
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8430fbd3

Branch: refs/heads/branch-2.2
Commit: 8430fbd3929383a6d40dc17b7a58e63a4f780e3c
Parents: 990f19b
Author: Jun Aoki <ja...@apache.org>
Authored: Mon Feb 22 14:43:33 2016 -0800
Committer: Jun Aoki <ja...@apache.org>
Committed: Mon Feb 22 14:43:33 2016 -0800

----------------------------------------------------------------------
 .../PXF/3.0.0/package/scripts/params.py         |   1 +
 .../PXF/3.0.0/package/scripts/pxf_constants.py  |   3 +
 .../PXF/3.0.0/package/scripts/service_check.py  | 167 ++++++++++---------
 3 files changed, 96 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
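
At a glance, the patch restructures the PXF smoke tests in three ways: each
engine's test now wraps its own work in try/finally so its cleanup always runs
(previously a single outer finally did all the cleanup and logged-and-swallowed
its own failures), the HBase commands are staged in generated script files
under the agent tmp dir instead of being echoed through a pipe into
"hbase shell", and the Hive check connects to HiveServer2 via beeline rather
than shelling out to the hive CLI. The committed HDFS routine, condensed here,
shows the per-test pattern:

  def run_hdfs_tests(self):
    self.__check_if_client_exists("Hadoop-HDFS")
    self.__cleanup_hdfs_data()        # drop leftovers from an earlier run
    try:
      self.__write_hdfs_data()
      self.__check_pxf_hdfs_read()
      self.__check_pxf_hdfs_write()
    finally:
      self.__cleanup_hdfs_data()      # runs even when a check above fails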


http://git-wip-us.apache.org/repos/asf/ambari/blob/8430fbd3/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py
index b3e85e4..1dbed45 100644
--- a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py
@@ -42,6 +42,7 @@ tomcat_group = "tomcat"
 # Directories
 pxf_conf_dir = "/etc/pxf/conf"
 pxf_instance_dir = "/var/pxf"
+exec_tmp_dir = Script.get_tmp_dir()
 
 # Java home path
 java_home = config["hostLevelParams"]["java_home"] if "java_home" in config["hostLevelParams"] else None
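
Script.get_tmp_dir() resolves to the temporary directory the Ambari agent
hands to every command script (commonly /var/lib/ambari-agent/tmp, though the
exact path is agent configuration rather than anything this patch guarantees).
service_check.py uses exec_tmp_dir as the staging area for the generated HBase
and Hive command files, along the lines of:

  # illustrative; mirrors the os.path.join() calls in service_check.py below
  script_path = os.path.join(params.exec_tmp_dir,
                             pxf_constants.hbase_populate_data_script)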

http://git-wip-us.apache.org/repos/asf/ambari/blob/8430fbd3/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_constants.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_constants.py b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_constants.py
index 9d93a38..1d88893 100644
--- a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_constants.py
+++ b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/pxf_constants.py
@@ -24,4 +24,7 @@ pxf_hdfs_test_dir = "/pxf_hdfs_smoke_test"
 pxf_hdfs_read_test_file = pxf_hdfs_test_dir + "/pxf_smoke_test_read_data"
 pxf_hdfs_write_test_file = pxf_hdfs_test_dir + "/pxf_smoke_test_write_data"
 pxf_hbase_test_table = "pxf_hbase_smoke_test_table"
+hbase_populate_data_script = "hbase-populate-data.sh"
+hbase_cleanup_data_script = "hbase-cleanup-data.sh"
 pxf_hive_test_table = "pxf_hive_smoke_test_table"
+hive_populate_data_script = "hive-populate-data.hql"
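
These constants name the command files that service_check.py now writes at
check time. A minimal sketch of how one pair is used, condensed from the
service_check.py hunks below (when security is enabled a kinit command is
prepended to the shell invocation):

  cleanup_script = os.path.join(params.exec_tmp_dir,
                                pxf_constants.hbase_cleanup_data_script)
  File(cleanup_script,
       content=InlineTemplate("disable '{0}'\ndrop '{0}'".format(
           pxf_constants.pxf_hbase_test_table)))
  Execute("hbase shell {0}".format(cleanup_script),
          user=params.hbase_user, logoutput=True)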

http://git-wip-us.apache.org/repos/asf/ambari/blob/8430fbd3/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py
index 21b7c5d..6f60661 100644
--- a/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py
+++ b/ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py
@@ -16,18 +16,20 @@ See the License for the specific language governing permissions and
 limitations under the License.
 """
 import json
+import os
 
 from resource_management.libraries.script import Script
 from resource_management.core.exceptions import Fail
 from resource_management.core.logger import Logger
 from resource_management.core.system import System
-from resource_management.core.resources.system import Execute
+from resource_management.core.resources.system import Execute, File
 from resource_management.core.environment import Environment
 from resource_management.libraries.functions.curl_krb_request import curl_krb_request
+from resource_management.core.source import InlineTemplate
+from resource_management.libraries.functions.default import default
 from pxf_utils import makeHTTPCall, runLocalCmd
 import pxf_constants
 
-
 class PXFServiceCheck(Script):
   """
   Runs a set of simple PXF tests to verify if the service has been setup correctly
@@ -46,13 +48,12 @@ class PXFServiceCheck(Script):
     "X-GP-URL-HOST": pxf_constants.service_check_hostname
   }
 
-
   def service_check(self, env):
     """
     Runs the service check for PXF
     """
     import params
-    Logger.info("Starting PXF service checks..")
+    Logger.info("Starting PXF service checks")
     try:
       # Get delegation token if security is enabled
       if params.security_enabled:
@@ -66,33 +67,13 @@ class PXFServiceCheck(Script):
         self.run_hbase_tests()
       if params.is_hive_installed:
         self.run_hive_tests()
-    except:
-      msg = "PXF service check failed"
-      Logger.error(msg)
-      raise Fail(msg)
-    finally:
-      try:
-        self.cleanup_test_data()
-      except Exception as e:
-        Logger.error(e)
+    except Exception, ex:
+      Logger.error("Exception received during service check execution:\n{0}".format(ex))
+      Logger.error("PXF service check failed.")
+      raise
 
     Logger.info("Service check completed successfully")
 
-
-  def cleanup_test_data(self):
-    """
-    Cleans up the temporary test data generated for service check
-    """
-    Logger.info("Cleaning up PXF smoke check temporary data")
-
-    import params
-    self.__cleanup_hdfs_data()
-    if params.is_hbase_installed:
-      self.__cleanup_hbase_data()
-    if params.is_hive_installed:
-      self.__cleanup_hive_data()
-
-
   def __get_pxf_protocol_version(self):
     """
     Gets the pxf protocol version number
@@ -113,7 +94,6 @@ class PXFServiceCheck(Script):
     Logger.error(msg)
     raise Fail(msg)
 
-
   def __check_pxf_read(self, headers):
     """
     Performs a generic PXF read
@@ -122,12 +102,13 @@ class PXFServiceCheck(Script):
     try:
       response = makeHTTPCall(url, headers)
       if not "PXFFragments" in response:
-        Logger.error("Unable to find PXFFragments in the response")
+        Logger.error("Unable to find PXFFragments in the response. Response received from the server:\n{0}".format(response))
         raise
     except:
       msg = "PXF data read failed"
       Logger.error(msg)
       raise Fail(msg)
+    Logger.info("PXF data read successful")
 
 
   def __get_delegation_token(self, user, keytab, principal, kinit_path):
@@ -153,12 +134,15 @@ class PXFServiceCheck(Script):
     """
     Runs a set of PXF HDFS checks
     """
-    Logger.info("Running PXF HDFS checks")
+    Logger.info("Running PXF HDFS service checks")
     self.__check_if_client_exists("Hadoop-HDFS")
     self.__cleanup_hdfs_data()
-    self.__write_hdfs_data()
-    self.__check_pxf_hdfs_read()
-    self.__check_pxf_hdfs_write()
+    try:
+      self.__write_hdfs_data()
+      self.__check_pxf_hdfs_read()
+      self.__check_pxf_hdfs_write()
+    finally:
+      self.__cleanup_hdfs_data()
 
   def __write_hdfs_data(self):
     """
@@ -235,32 +219,52 @@ class PXFServiceCheck(Script):
     )
     params.HdfsResource(None, action="execute")
 
-
   # HBase Routines
   def run_hbase_tests(self):
     """
     Runs a set of PXF HBase checks
     """
     import params
-    Logger.info("Running PXF HBase checks")
-    if params.security_enabled:
-      Execute("{0} -kt {1} {2}".format(params.kinit_path_local, params.hbase_user_keytab, params.hbase_principal_name),
-              user = params.hbase_user)
-    self.__cleanup_hbase_data()
+    Logger.info("Running PXF HBase service checks")
     self.__check_if_client_exists("HBase")
-    self.__write_hbase_data()
-    self.__check_pxf_hbase_read()
+    self.__create_hbase_scripts()
+    kinit_cmd = "{0} -kt {1} {2};".format(params.kinit_path_local, params.hbase_user_keytab, params.hbase_principal_name) if params.security_enabled else ""
+    try:
+      message = "Creating temporary HBase smoke test table with data"
+      self.__run_hbase_script(pxf_constants.hbase_populate_data_script, kinit_cmd, message)
+      self.__check_pxf_hbase_read()
+    finally:
+      message = "Cleaning up HBase smoke test table"
+      self.__run_hbase_script(pxf_constants.hbase_cleanup_data_script, kinit_cmd, message)
 
-  def __write_hbase_data(self):
+  def __create_hbase_scripts(self):
     """
-    Creates a temporary HBase table for the service checks
+    Create file holding hbase commands
     """
     import params
-    Logger.info("Creating temporary HBase test data")
-    cmd = "echo \"create '{0}', 'cf'\" | hbase shell".format(pxf_constants.pxf_hbase_test_table)
-    Execute(cmd, logoutput = True, user = params.hbase_user)
-    cmd = "echo \"put '{0}', 'row1', 'cf:a', 'value1'; put '{0}', 'row1', 'cf:b', 'value2'\" | hbase shell".format(pxf_constants.pxf_hbase_test_table)
-    Execute(cmd, logoutput = True, user = params.hbase_user)
+    hbase_populate_data_cmds = "disable '{0}'\n" \
+                               "drop '{0}'\n" \
+                               "create '{0}', 'cf'\n" \
+                               "put '{0}', 'row1', 'cf:a', 'value1'\n" \
+                               "put '{0}', 'row1', 'cf:b', 'value2'".format(pxf_constants.pxf_hbase_test_table)
+
+    File("{0}".format(os.path.join(params.exec_tmp_dir, pxf_constants.hbase_populate_data_script)),
+         content=InlineTemplate("{0}".format(hbase_populate_data_cmds)))
+
+    hbase_cleanup_data_cmds = "disable '{0}'\n" \
+                              "drop '{0}'".format(pxf_constants.pxf_hbase_test_table)
+
+    File("{0}".format(os.path.join(params.exec_tmp_dir, pxf_constants.hbase_cleanup_data_script)),
+         content=InlineTemplate("{0}".format(hbase_cleanup_data_cmds)))
+
+  def __run_hbase_script(self, script, kinit_cmd, message):
+    """
+    Executes hbase shell command
+    """
+    import params
+    Logger.info(message)
+    hbase_shell_cmd = "{0} hbase shell {1}".format(kinit_cmd, os.path.join(params.exec_tmp_dir, script))
+    Execute(hbase_shell_cmd, user=params.hbase_user, logoutput=True)
 
   def __check_pxf_hbase_read(self):
     """
@@ -274,37 +278,53 @@ class PXFServiceCheck(Script):
     headers.update(self.commonPXFHeaders)
     self.__check_pxf_read(headers)
 
-  def __cleanup_hbase_data(self):
-    """
-    Cleans up the test HBase data
-    """
-    import params
-    Logger.info("Cleaning up HBase test data")
-    cmd = "echo \"disable '{0}'\" | hbase shell > /dev/null 2>&1".format(pxf_constants.pxf_hbase_test_table)
-    Execute(cmd, logoutput = True, user = params.hbase_user)
-    cmd = "echo \"drop '{0}'\" | hbase shell > /dev/null 2>&1".format(pxf_constants.pxf_hbase_test_table)
-    Execute(cmd, logoutput = True, user = params.hbase_user)
-
-
   # Hive Routines
   def run_hive_tests(self):
     """
     Runs a set of PXF Hive checks
     """
-    Logger.info("Running PXF Hive checks")
+    import params
+    Logger.info("Running PXF Hive service checks")
     self.__check_if_client_exists("Hive")
-    self.__cleanup_hive_data()
-    self.__write_hive_data()
-    self.__check_pxf_hive_read()
 
-  def __write_hive_data(self):
+    # Create file holding hive query commands
+    hive_populate_data_cmds = "DROP TABLE IF EXISTS {0};\n" \
+                         "CREATE TABLE {0} (id INT);\n" \
+                         "INSERT INTO {0} VALUES (1);".format(pxf_constants.pxf_hive_test_table)
+    File("{0}/{1}".format(params.exec_tmp_dir, pxf_constants.hive_populate_data_script),
+         content=InlineTemplate("{0}".format(hive_populate_data_cmds)))
+
+    # Get the parameters required to create jdbc url for beeline
+    hive_server_port = default("/configurations/hive-site/hive.server2.thrift.port", None)
+    hive_server_host = default("/clusterHostInfo/hive_server_host", None)
+    if hive_server_host is None or hive_server_port is None:
+      raise Fail("Input parameters are invalid for beeline connection string, both hive_server_host and " \
+            "hive.server2.thrift.port should be not None. Current values are:\nhive_server_host={0}\n" \
+            "hive.server2.thrift.port={1}".format(hive_server_host, hive_server_port))
+    jdbc_url = "jdbc:hive2://{0}:{1}/default".format(hive_server_host[0], hive_server_port)
+    beeline_conn_cmd = "beeline -u '{0}'".format(jdbc_url)
+
+    if params.security_enabled:
+      hive_server_principal = default('/configurations/hive-site/hive.server2.authentication.kerberos.principal', None)
+      if hive_server_principal is None:
+        raise Fail("Input parameter invalid for beeline connection string, hive.server2.authentication.kerberos.principal " \
+              "should be not None")
+      beeline_conn_cmd = "beeline -u '{0};principal={1}'".format(jdbc_url, hive_server_principal)
+
+    try:
+      self.__write_hive_data(beeline_conn_cmd)
+      self.__check_pxf_hive_read()
+    finally:
+      self.__cleanup_hive_data(beeline_conn_cmd)
+
+  def __write_hive_data(self, beeline_conn_cmd):
     """
     Creates a temporary Hive table for the service checks
     """
     import params
-    Logger.info("Creating temporary Hive test data")
-    cmd = "hive -e 'CREATE TABLE IF NOT EXISTS {0} (id INT); INSERT INTO {0} VALUES (1);'".format(pxf_constants.pxf_hive_test_table)
-    Execute(cmd, logoutput = True, user = params.hdfs_user)
+    Logger.info("Creating temporary Hive smoke test table with data")
+    cmd = "{0} -f {1}".format(beeline_conn_cmd, os.path.join(params.exec_tmp_dir, pxf_constants.hive_populate_data_script))
+    Execute(cmd, logoutput=True, user=params.hdfs_user)
 
   def __check_pxf_hive_read(self):
     """
@@ -318,15 +338,14 @@ class PXFServiceCheck(Script):
     headers.update(self.commonPXFHeaders)
     self.__check_pxf_read(headers)
 
-  def __cleanup_hive_data(self):
+  def __cleanup_hive_data(self, beeline_conn_cmd):
     """
     Cleans up the test Hive data
     """
     import params
     Logger.info("Cleaning up Hive test data")
-    cmd = "hive -e 'DROP TABLE IF EXISTS {0};'".format(pxf_constants.pxf_hive_test_table)
-    Execute(cmd, logoutput = True, user = params.hdfs_user)
-
+    cmd = "{0} -e 'DROP TABLE IF EXISTS {1};'".format(beeline_conn_cmd, pxf_constants.pxf_hive_test_table)
+    Execute(cmd, logoutput=True, user=params.hdfs_user)
 
   # Package Routines
   def __package_exists(self, pkg):
@@ -338,7 +357,6 @@ class PXFServiceCheck(Script):
     else:
       return not runLocalCmd("yum list installed | egrep -i ^" + pkg)
 
-
   def __check_if_client_exists(self, serviceName):
     Logger.info("Checking if " + serviceName + " client libraries exist")
     if not self.__package_exists(serviceName):
@@ -349,4 +367,3 @@ class PXFServiceCheck(Script):
 
 if __name__ == "__main__":
   PXFServiceCheck().execute()
-
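
For completeness, the shape of the beeline invocation the Hive check now
issues, condensed from the run_hive_tests hunk above (host and port come from
clusterHostInfo and hive-site; the hive_script path is illustrative):

  jdbc_url = "jdbc:hive2://{0}:{1}/default".format(hive_server_host[0],
                                                   hive_server_port)
  if params.security_enabled:
    jdbc_url += ";principal={0}".format(hive_server_principal)
  # populate data via the generated HQL file, read it back through PXF,
  # then drop the table in a finally block:
  Execute("beeline -u '{0}' -f {1}".format(jdbc_url, hive_script),
          user=params.hdfs_user, logoutput=True)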