You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/12/06 08:21:00 UTC

[jira] [Commented] (AIRFLOW-3322) Qubole Hook: Change hook to fetch command args dynamically from qds_sdk

    [ https://issues.apache.org/jira/browse/AIRFLOW-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711109#comment-16711109 ] 

ASF GitHub Bot commented on AIRFLOW-3322:
-----------------------------------------

msumit closed pull request #4165: [AIRFLOW-3322] Update qubole_hook to fetch command args dynamically from qds_sdk
URL: https://github.com/apache/incubator-airflow/pull/4165
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index 3df77d3a1f..1c98f26afc 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -34,7 +34,6 @@
     PigCommand, ShellCommand, SparkCommand, DbTapQueryCommand, DbExportCommand, \
     DbImportCommand
 
-
 COMMAND_CLASSES = {
     "hivecmd": HiveCommand,
     "prestocmd": PrestoCommand,
@@ -47,35 +46,52 @@
     "dbimportcmd": DbImportCommand
 }
 
-HYPHEN_ARGS = ['cluster_label', 'app_id', 'note_id']
-
-POSITIONAL_ARGS = ['sub_command', 'parameters']
-
-COMMAND_ARGS = {
-    "hivecmd": ['query', 'script_location', 'macros', 'tags', 'sample_size',
-                'cluster_label', 'name'],
-    'prestocmd': ['query', 'script_location', 'macros', 'tags', 'cluster_label', 'name'],
-    'hadoopcmd': ['sub_command', 'tags', 'cluster_label', 'name'],
-    'shellcmd': ['script', 'script_location', 'files', 'archives', 'parameters', 'tags',
-                 'cluster_label', 'name'],
-    'pigcmd': ['script', 'script_location', 'parameters', 'tags', 'cluster_label',
-               'name'],
-    'dbtapquerycmd': ['db_tap_id', 'query', 'macros', 'tags', 'name'],
-    'sparkcmd': ['program', 'cmdline', 'sql', 'script_location', 'macros', 'tags',
-                 'cluster_label', 'language', 'app_id', 'name', 'arguments', 'note_id',
-                 'user_program_arguments'],
-    'dbexportcmd': ['mode', 'hive_table', 'partition_spec', 'dbtap_id', 'db_table',
-                    'db_update_mode', 'db_update_keys', 'export_dir',
-                    'fields_terminated_by', 'tags', 'name', 'customer_cluster_label',
-                    'use_customer_cluster', 'additional_options'],
-    'dbimportcmd': ['mode', 'hive_table', 'dbtap_id', 'db_table', 'where_clause',
-                    'parallelism', 'extract_query', 'boundary_query', 'split_column',
-                    'tags', 'name', 'hive_serde', 'customer_cluster_label',
-                    'use_customer_cluster', 'schema', 'additional_options']
+POSITIONAL_ARGS = {
+    'hadoopcmd': ['sub_command'],
+    'shellcmd': ['parameters'],
+    'pigcmd': ['parameters']
 }
 
 
-class QuboleHook(BaseHook, LoggingMixin):
+def flatten_list(list_of_lists):
+    return [element for array in list_of_lists for element in array]
+
+
+def filter_options(options):
+    options_to_remove = ["help", "print-logs-live", "print-logs"]
+    return [option for option in options if option not in options_to_remove]
+
+
+def get_options_list(command_class):
+    options_list = [option.get_opt_string().strip("--") for option in command_class.optparser.option_list]
+    return filter_options(options_list)
+
+
+def build_command_args():
+    command_args, hyphen_args = {}, set()
+    for cmd in COMMAND_CLASSES:
+
+        # get all available options from the class
+        opts_list = get_options_list(COMMAND_CLASSES[cmd])
+
+        # append positional args if any for the command
+        if cmd in POSITIONAL_ARGS:
+            opts_list += POSITIONAL_ARGS[cmd]
+
+        # get args with a hyphen and replace them with underscore
+        for index, opt in enumerate(opts_list):
+            if "-" in opt:
+                opts_list[index] = opt.replace("-", "_")
+                hyphen_args.add(opts_list[index])
+
+        command_args[cmd] = opts_list
+    return command_args, list(hyphen_args)
+
+
+COMMAND_ARGS, HYPHEN_ARGS = build_command_args()
+
+
+class QuboleHook(BaseHook):
     def __init__(self, *args, **kwargs):
         conn = self.get_connection(kwargs['qubole_conn_id'])
         Qubole.configure(api_token=conn.password, api_url=conn.host)
@@ -189,12 +205,13 @@ def create_cmd_args(self, context):
         cmd_type = self.kwargs['command_type']
         inplace_args = None
         tags = set([self.dag_id, self.task_id, context['run_id']])
+        positional_args_list = flatten_list(POSITIONAL_ARGS.values())
 
         for k, v in self.kwargs.items():
             if k in COMMAND_ARGS[cmd_type]:
                 if k in HYPHEN_ARGS:
                     args.append("--{0}={1}".format(k.replace('_', '-'), v))
-                elif k in POSITIONAL_ARGS:
+                elif k in positional_args_list:
                     inplace_args = v
                 elif k == 'tags':
                     if isinstance(v, six.string_types):
diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py
index 82ee293b93..7818d961c4 100755
--- a/airflow/contrib/operators/qubole_operator.py
+++ b/airflow/contrib/operators/qubole_operator.py
@@ -43,6 +43,8 @@ class QuboleOperator(BaseOperator):
             :script_location: s3 location containing query statement
             :sample_size: size of sample in bytes on which to run query
             :macros: macro values which were used in query
+            :sample_size: size of sample in bytes on which to run query
+            :hive-version: Specifies the hive version to be used. eg: 0.13,1.2,etc.
         prestocmd:
             :query: inline query statement
             :script_location: s3 location containing query statement
@@ -77,12 +79,14 @@ class QuboleOperator(BaseOperator):
             :arguments: spark-submit command line arguments
             :user_program_arguments: arguments that the user program takes in
             :macros: macro values which were used in query
+            :note_id: Id of the Notebook to run
         dbtapquerycmd:
             :db_tap_id: data store ID of the target database, in Qubole.
             :query: inline query statement
             :macros: macro values which were used in query
         dbexportcmd:
-            :mode: 1 (simple), 2 (advance)
+            :mode: Can be 1 for Hive export or 2 for HDFS/S3 export
+            :schema: Db schema name assumed accordingly by database if not specified
             :hive_table: Name of the hive table
             :partition_spec: partition specification for Hive table.
             :dbtap_id: data store ID of the target database, in Qubole.
@@ -91,9 +95,15 @@ class QuboleOperator(BaseOperator):
             :db_update_keys: columns used to determine the uniqueness of rows
             :export_dir: HDFS/S3 location from which data will be exported.
             :fields_terminated_by: hex of the char used as column separator in the dataset
+            :use_customer_cluster: To use cluster to run command
+            :customer_cluster_label: the label of the cluster to run the command on
+            :additional_options: Additional Sqoop options, if needed. Enclose the options in
+                double or single quotes, e.g. '--map-column-hive id=int,data=string'
         dbimportcmd:
             :mode: 1 (simple), 2 (advance)
             :hive_table: Name of the hive table
+            :schema: Db schema name assumed accordingly by database if not specified
+            :hive_serde: Output format of the Hive Table
             :dbtap_id: data store ID of the target database, in Qubole.
             :db_table: name of the db table
             :where_clause: where clause, if any
@@ -102,6 +112,10 @@ class QuboleOperator(BaseOperator):
                 of the where clause.
             :boundary_query: Query to be used get range of row IDs to be extracted
             :split_column: Column used as row ID to split data into ranges (mode 2)
+            :use_customer_cluster: To use cluster to run command
+            :customer_cluster_label: the label of the cluster to run the command on
+            :additional_options: Additional Sqoop options, if needed. Enclose the options in
+                double or single quotes
 
     .. note:: Following fields are template-supported : ``query``, ``script_location``,
         ``sub_command``, ``script``, ``files``, ``archives``, ``program``, ``cmdline``,
diff --git a/setup.py b/setup.py
index d093d4971f..65208ecf48 100644
--- a/setup.py
+++ b/setup.py
@@ -216,7 +216,7 @@ def write_version(filename=os.path.join(*['airflow',
 ]
 pinot = ['pinotdb>=0.1.1']
 postgres = ['psycopg2-binary>=2.7.4']
-qds = ['qds-sdk>=1.9.6']
+qds = ['qds-sdk>=1.10.4']
 rabbitmq = ['librabbitmq>=1.6.1']
 redis = ['redis>=2.10.5']
 s3 = ['boto3>=1.7.0, <1.8.0']


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Qubole Hook: Change hook to fetch command args dynamically from qds_sdk 
> ------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3322
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3322
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: Joy Lal Chattaraj
>            Assignee: Joy Lal Chattaraj
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)