Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/12/30 14:09:47 UTC

[GitHub] [airflow] forestlzj opened a new issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

forestlzj opened a new issue #13391:
URL: https://github.com/apache/airflow/issues/13391


   
   **Apache Airflow version**: 2.0
   
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**:
   - **OS** (e.g. from /etc/os-release):
   - **Kernel** (e.g. `uname -a`):
   - **Install tools**:
   - **Others**:
   
   **What happened**:
   I am trying to get data from MySQL into Hive with the following code:
   ```python
   t2 = MySqlToHiveOperator(
       task_id='mysql_to_hive',
       sql='select caseNo as case_num from t_ca_detected_case',
       hive_table='dsj.casenum_temp',
       create=True,
       recreate=True,
       delimiter=',',
       mysql_conn_id='mysql_dsj',
       hive_cli_conn_id='hive_cli_default',
       start_date=days_ago(2),
       owner='airflow',
       dag=dag
   )
   ```
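   
   For context, this task sits inside a DAG file (`DAGS_FOLDER/test/hello2.py` per the log below). A rough sketch of the surrounding boilerplate, with the imports assumed for Airflow 2.0 and the hive provider:
   
   ```python
   from airflow import DAG
   from airflow.providers.apache.hive.transfers.mysql_to_hive import MySqlToHiveOperator
   from airflow.utils.dates import days_ago
   
   with DAG(dag_id='hello2', start_date=days_ago(2), schedule_interval=None) as dag:
       # the t2 = MySqlToHiveOperator(...) task shown above goes here
       ...
   ```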
   
   The task fails with the following error log:
   [2020-12-30 21:53:21,631] {taskinstance.py:1038} INFO - Executing <Task(MySqlToHiveOperator): mysql_to_hive> on 2020-12-30T13:53:20.354276+00:00
   [2020-12-30 21:53:21,638] {standard_task_runner.py:51} INFO - Started process 54185 to run task
   [2020-12-30 21:53:21,643] {standard_task_runner.py:75} INFO - Running: ['airflow', 'tasks', 'run', 'hello2', 'mysql_to_hive', '2020-12-30T13:53:20.354276+00:00', '--job-id', '44', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/test/hello2.py', '--cfg-path', '/tmp/tmp1qiqqk_2']
   [2020-12-30 21:53:21,647] {standard_task_runner.py:76} INFO - Job 44: Subtask mysql_to_hive
   [2020-12-30 21:53:21,696] {logging_mixin.py:103} INFO - Running <TaskInstance: hello2.mysql_to_hive 2020-12-30T13:53:20.354276+00:00 [running]> on host serv98.
   [2020-12-30 21:53:21,744] {taskinstance.py:1232} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=hello2
   AIRFLOW_CTX_TASK_ID=mysql_to_hive
   AIRFLOW_CTX_EXECUTION_DATE=2020-12-30T13:53:20.354276+00:00
   AIRFLOW_CTX_DAG_RUN_ID=manual__2020-12-30T13:53:20.354276+00:00
   [2020-12-30 21:53:21,763] {base.py:74} INFO - Using connection to: id: hive_cli_default. Host: 10.2.20.11, Port: 21066, Schema: default, Login: airflow, Password: XXXXXXXX, extra: XXXXXXXX
   [2020-12-30 21:53:21,765] {mysql_to_hive.py:141} INFO - Dumping MySQL query results to local file
   [2020-12-30 21:53:21,776] {base.py:74} INFO - Using connection to: id: mysql_dsj. Host: 192.168.2.178, Port: 3306, Schema: djzs_db, Login: root, Password: XXXXXXXX, extra: None
   [2020-12-30 21:53:21,798] {mysql_to_hive.py:161} INFO - Loading file into Hive
   [2020-12-30 21:53:21,798] {hive.py:445} INFO - DROP TABLE IF EXISTS dsj.casenum_temp;
   CREATE TABLE IF NOT EXISTS dsj.casenum_temp (
   `case_num` STRING)
   ROW FORMAT DELIMITED
   FIELDS TERMINATED BY ','
   STORED AS textfile
   ;
   [2020-12-30 21:53:21,799] {hive.py:248} INFO - beeline -u "jdbc:hive2://10.2.20.11:21066/default" -n airflow -p Ygnet123# -hiveconf airflow.ctx.dag_id=hello2 -hiveconf airflow.ctx.task_id=mysql_to_hive -hiveconf airflow.ctx.execution_date=2020-12-30T13:53:20.354276+00:00 -hiveconf airflow.ctx.dag_run_id=manual__2020-12-30T13:53:20.354276+00:00 -hiveconf airflow.ctx.dag_owner=airflow -hiveconf airflow.ctx.dag_email= -f /tmp/airflow_hiveop_ka2jjicc/tmpnyz766bk
   [2020-12-30 21:53:24,679] {hive.py:260} INFO - SLF4J: Class path contains multiple SLF4J bindings.
   [2020-12-30 21:53:24,679] {hive.py:260} INFO - SLF4J: Found binding in [jar:file:/app/fi65clients/Hive/Beeline/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   [2020-12-30 21:53:24,679] {hive.py:260} INFO - SLF4J: Found binding in [jar:file:/app/fi65clients/Hive/Beeline/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   [2020-12-30 21:53:24,679] {hive.py:260} INFO - SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
   [2020-12-30 21:53:24,692] {hive.py:260} INFO - SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
   [2020-12-30 21:53:26,776] {hive.py:260} INFO - Connecting to jdbc:hive2://10.2.20.11:21066/default
   [2020-12-30 21:53:27,079] {hive.py:260} INFO - Connected to: Apache Hive (version 3.1.0)
   [2020-12-30 21:53:27,080] {hive.py:260} INFO - Driver: Hive JDBC (version 3.1.0)
   [2020-12-30 21:53:27,080] {hive.py:260} INFO - Transaction isolation: TRANSACTION_REPEATABLE_READ
   [2020-12-30 21:53:27,188] {hive.py:260} INFO - 0: jdbc:hive2://10.2.20.11:21066/default> INFO  : Compiling command(queryId=omm_20201230213941_8e02d3e0-56e1-4ad6-aae4-35c2b7baf598): USE default--0; Current sessionId=701dd932-ae51-4edc-84bf-c1575a5536a6
   [2020-12-30 21:53:27,188] {hive.py:260} INFO - INFO  : Concurrency mode is disabled, not creating a lock manager
   [2020-12-30 21:53:27,188] {hive.py:260} INFO - INFO  : Semantic Analysis Completed (retrial = false)
   [2020-12-30 21:53:27,188] {hive.py:260} INFO - INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
   [2020-12-30 21:53:27,189] {hive.py:260} INFO - INFO  : Completed compiling command(queryId=omm_20201230213941_8e02d3e0-56e1-4ad6-aae4-35c2b7baf598); Time taken: 0.003 seconds
   [2020-12-30 21:53:27,189] {hive.py:260} INFO - INFO  : Concurrency mode is disabled, not creating a lock manager
   [2020-12-30 21:53:27,189] {hive.py:260} INFO - INFO  : Executing command(queryId=omm_20201230213941_8e02d3e0-56e1-4ad6-aae4-35c2b7baf598): USE default--0; Current sessionId=701dd932-ae51-4edc-84bf-c1575a5536a6
   [2020-12-30 21:53:27,189] {hive.py:260} INFO - INFO  : Starting task [Stage-0:DDL] in serial mode
   [2020-12-30 21:53:27,189] {hive.py:260} INFO - INFO  : Completed executing command(queryId=omm_20201230213941_8e02d3e0-56e1-4ad6-aae4-35c2b7baf598); Time taken: 0.004 seconds
   [2020-12-30 21:53:27,189] {hive.py:260} INFO - INFO  : OK
   [2020-12-30 21:53:27,189] {hive.py:260} INFO - INFO  : Concurrency mode is disabled, not creating a lock manager
   [2020-12-30 21:53:27,190] {hive.py:260} INFO - No rows affected (0.079 seconds)
   [2020-12-30 21:53:27,364] {hive.py:260} INFO - 0: jdbc:hive2://10.2.20.11:21066/default> INFO  : Compiling command(queryId=omm_20201230213941_88bf0601-a224-4a6f-acd7-96bed9dc977d): DROP TABLE IF EXISTS dsj.casenum_temp--0; Current sessionId=701dd932-ae51-4edc-84bf-c1575a5536a6
   [2020-12-30 21:53:27,364] {hive.py:260} INFO - INFO  : Concurrency mode is disabled, not creating a lock manager
   [2020-12-30 21:53:27,364] {hive.py:260} INFO - INFO  : Semantic Analysis Completed (retrial = false)
   [2020-12-30 21:53:27,364] {hive.py:260} INFO - INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
   [2020-12-30 21:53:27,364] {hive.py:260} INFO - INFO  : Completed compiling command(queryId=omm_20201230213941_88bf0601-a224-4a6f-acd7-96bed9dc977d); Time taken: 0.01 seconds
   [2020-12-30 21:53:27,364] {hive.py:260} INFO - INFO  : Concurrency mode is disabled, not creating a lock manager
   [2020-12-30 21:53:27,365] {hive.py:260} INFO - INFO  : Executing command(queryId=omm_20201230213941_88bf0601-a224-4a6f-acd7-96bed9dc977d): DROP TABLE IF EXISTS dsj.casenum_temp--0; Current sessionId=701dd932-ae51-4edc-84bf-c1575a5536a6
   [2020-12-30 21:53:27,365] {hive.py:260} INFO - INFO  : Starting task [Stage-0:DDL] in serial mode
   [2020-12-30 21:53:27,365] {hive.py:260} INFO - INFO  : Completed executing command(queryId=omm_20201230213941_88bf0601-a224-4a6f-acd7-96bed9dc977d); Time taken: 0.149 seconds
   [2020-12-30 21:53:27,365] {hive.py:260} INFO - INFO  : OK
   [2020-12-30 21:53:27,365] {hive.py:260} INFO - INFO  : Concurrency mode is disabled, not creating a lock manager
   [2020-12-30 21:53:27,365] {hive.py:260} INFO - No rows affected (0.165 seconds)
   [2020-12-30 21:53:27,408] {hive.py:260} INFO - 0: jdbc:hive2://10.2.20.11:21066/default> . . . . . . . . . . . . . . . . . . . . > . . . . . . . . . . . . . . . . . . . . > . . . . . . . . . . . . . . . . . . . . > . . . . . . . . . . . . . . . . . . . . > . . . . . . . . . . . . . . . . . . . . > INFO  : Compiling command(queryId=omm_20201230213941_9258986c-dbe2-4f5f-8a45-41e9db9aea4c): CREATE TABLE IF NOT EXISTS dsj.casenum_temp (
   [2020-12-30 21:53:27,408] {hive.py:260} INFO - `case_num` STRING)
   [2020-12-30 21:53:27,408] {hive.py:260} INFO - ROW FORMAT DELIMITED
   [2020-12-30 21:53:27,408] {hive.py:260} INFO - FIELDS TERMINATED BY ','
   [2020-12-30 21:53:27,408] {hive.py:260} INFO - STORED AS textfile--0; Current sessionId=701dd932-ae51-4edc-84bf-c1575a5536a6
   [2020-12-30 21:53:27,409] {hive.py:260} INFO - INFO  : Concurrency mode is disabled, not creating a lock manager
   [2020-12-30 21:53:27,409] {hive.py:260} INFO - INFO  : Semantic Analysis Completed (retrial = false)
   [2020-12-30 21:53:27,409] {hive.py:260} INFO - INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
   [2020-12-30 21:53:27,409] {hive.py:260} INFO - INFO  : Completed compiling command(queryId=omm_20201230213941_9258986c-dbe2-4f5f-8a45-41e9db9aea4c); Time taken: 0.004 seconds
   [2020-12-30 21:53:27,409] {hive.py:260} INFO - INFO  : Concurrency mode is disabled, not creating a lock manager
   [2020-12-30 21:53:27,409] {hive.py:260} INFO - INFO  : Executing command(queryId=omm_20201230213941_9258986c-dbe2-4f5f-8a45-41e9db9aea4c): CREATE TABLE IF NOT EXISTS dsj.casenum_temp (
   [2020-12-30 21:53:27,409] {hive.py:260} INFO - `case_num` STRING)
   [2020-12-30 21:53:27,409] {hive.py:260} INFO - ROW FORMAT DELIMITED
   [2020-12-30 21:53:27,409] {hive.py:260} INFO - FIELDS TERMINATED BY ','
   [2020-12-30 21:53:27,409] {hive.py:260} INFO - STORED AS textfile--0; Current sessionId=701dd932-ae51-4edc-84bf-c1575a5536a6
   [2020-12-30 21:53:27,409] {hive.py:260} INFO - INFO  : Starting task [Stage-0:DDL] in serial mode
   [2020-12-30 21:53:27,409] {hive.py:260} INFO - INFO  : Completed executing command(queryId=omm_20201230213941_9258986c-dbe2-4f5f-8a45-41e9db9aea4c); Time taken: 0.028 seconds
   [2020-12-30 21:53:27,410] {hive.py:260} INFO - INFO  : OK
   [2020-12-30 21:53:27,410] {hive.py:260} INFO - INFO  : Concurrency mode is disabled, not creating a lock manager
   [2020-12-30 21:53:27,410] {hive.py:260} INFO - No rows affected (0.041 seconds)
   [2020-12-30 21:53:27,418] {hive.py:260} INFO - 0: jdbc:hive2://10.2.20.11:21066/default> 0: jdbc:hive2://10.2.20.11:21066/default> Closing: 0: jdbc:hive2://10.2.20.11:21066/default
   [2020-12-30 21:53:27,445] {hive.py:459} INFO - LOAD DATA LOCAL INPATH '/tmp/tmpscnxvwz8' OVERWRITE INTO TABLE dsj.casenum_temp ;
   
   [2020-12-30 21:53:27,447] {hive.py:248} INFO - beeline -u "jdbc:hive2://10.2.20.11:21066/default" -n airflow -p Ygnet123# -hiveconf airflow.ctx.dag_id=hello2 -hiveconf airflow.ctx.task_id=mysql_to_hive -hiveconf airflow.ctx.execution_date=2020-12-30T13:53:20.354276+00:00 -hiveconf airflow.ctx.dag_run_id=manual__2020-12-30T13:53:20.354276+00:00 -hiveconf airflow.ctx.dag_owner=airflow -hiveconf airflow.ctx.dag_email= -f /tmp/airflow_hiveop_hpm6bvo4/tmpuszfib5a
   [2020-12-30 21:53:30,344] {hive.py:260} INFO - SLF4J: Class path contains multiple SLF4J bindings.
   [2020-12-30 21:53:30,345] {hive.py:260} INFO - SLF4J: Found binding in [jar:file:/app/fi65clients/Hive/Beeline/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   [2020-12-30 21:53:30,345] {hive.py:260} INFO - SLF4J: Found binding in [jar:file:/app/fi65clients/Hive/Beeline/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   [2020-12-30 21:53:30,345] {hive.py:260} INFO - SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
   [2020-12-30 21:53:30,349] {hive.py:260} INFO - SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
   [2020-12-30 21:53:32,333] {hive.py:260} INFO - Connecting to jdbc:hive2://10.2.20.11:21066/default
   [2020-12-30 21:53:32,673] {hive.py:260} INFO - Connected to: Apache Hive (version 3.1.0)
   [2020-12-30 21:53:32,674] {hive.py:260} INFO - Driver: Hive JDBC (version 3.1.0)
   [2020-12-30 21:53:32,674] {hive.py:260} INFO - Transaction isolation: TRANSACTION_REPEATABLE_READ
   [2020-12-30 21:53:32,785] {hive.py:260} INFO - 0: jdbc:hive2://10.2.20.11:21066/default> INFO  : Compiling command(queryId=omm_20201230213946_f6efcb01-0914-41c2-a8a6-88e8420c4332): USE default--0; Current sessionId=c4df2d8c-553b-41a4-9998-c1b3572fa0c8
   [2020-12-30 21:53:32,785] {hive.py:260} INFO - INFO  : Concurrency mode is disabled, not creating a lock manager
   [2020-12-30 21:53:32,785] {hive.py:260} INFO - INFO  : Semantic Analysis Completed (retrial = false)
   [2020-12-30 21:53:32,785] {hive.py:260} INFO - INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
   [2020-12-30 21:53:32,785] {hive.py:260} INFO - INFO  : Completed compiling command(queryId=omm_20201230213946_f6efcb01-0914-41c2-a8a6-88e8420c4332); Time taken: 0.003 seconds
   [2020-12-30 21:53:32,785] {hive.py:260} INFO - INFO  : Concurrency mode is disabled, not creating a lock manager
   [2020-12-30 21:53:32,786] {hive.py:260} INFO - INFO  : Executing command(queryId=omm_20201230213946_f6efcb01-0914-41c2-a8a6-88e8420c4332): USE default--0; Current sessionId=c4df2d8c-553b-41a4-9998-c1b3572fa0c8
   [2020-12-30 21:53:32,786] {hive.py:260} INFO - INFO  : Starting task [Stage-0:DDL] in serial mode
   [2020-12-30 21:53:32,786] {hive.py:260} INFO - INFO  : Completed executing command(queryId=omm_20201230213946_f6efcb01-0914-41c2-a8a6-88e8420c4332); Time taken: 0.004 seconds
   [2020-12-30 21:53:32,786] {hive.py:260} INFO - INFO  : OK
   [2020-12-30 21:53:32,786] {hive.py:260} INFO - INFO  : Concurrency mode is disabled, not creating a lock manager
   [2020-12-30 21:53:32,787] {hive.py:260} INFO - No rows affected (0.082 seconds)
   [2020-12-30 21:53:32,859] {hive.py:260} INFO - 0: jdbc:hive2://10.2.20.11:21066/default> Error: Error while compiling statement: FAILED: SemanticException Line 1:23 Invalid path ''/tmp/tmpscnxvwz8'': No files matching path file:/tmp/tmpscnxvwz8 (state=42000,code=40000)
   [2020-12-30 21:53:32,862] {hive.py:260} INFO - Closing: 0: jdbc:hive2://10.2.20.11:21066/default
   [2020-12-30 21:53:32,905] {taskinstance.py:1396} ERROR - SLF4J: Class path contains multiple SLF4J bindings.
   SLF4J: Found binding in [jar:file:/app/fi65clients/Hive/Beeline/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in [jar:file:/app/fi65clients/Hive/Beeline/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
   SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
   Connecting to jdbc:hive2://10.2.20.11:21066/default
   Connected to: Apache Hive (version 3.1.0)
   Driver: Hive JDBC (version 3.1.0)
   Transaction isolation: TRANSACTION_REPEATABLE_READ
   0: jdbc:hive2://10.2.20.11:21066/default> INFO  : Compiling command(queryId=omm_20201230213946_f6efcb01-0914-41c2-a8a6-88e8420c4332): USE default--0; Current sessionId=c4df2d8c-553b-41a4-9998-c1b3572fa0c8
   INFO  : Concurrency mode is disabled, not creating a lock manager
   INFO  : Semantic Analysis Completed (retrial = false)
   INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
   INFO  : Completed compiling command(queryId=omm_20201230213946_f6efcb01-0914-41c2-a8a6-88e8420c4332); Time taken: 0.003 seconds
   INFO  : Concurrency mode is disabled, not creating a lock manager
   INFO  : Executing command(queryId=omm_20201230213946_f6efcb01-0914-41c2-a8a6-88e8420c4332): USE default--0; Current sessionId=c4df2d8c-553b-41a4-9998-c1b3572fa0c8
   INFO  : Starting task [Stage-0:DDL] in serial mode
   INFO  : Completed executing command(queryId=omm_20201230213946_f6efcb01-0914-41c2-a8a6-88e8420c4332); Time taken: 0.004 seconds
   INFO  : OK
   INFO  : Concurrency mode is disabled, not creating a lock manager
   No rows affected (0.082 seconds)
   0: jdbc:hive2://10.2.20.11:21066/default> Error: Error while compiling statement: FAILED: SemanticException Line 1:23 Invalid path ''/tmp/tmpscnxvwz8'': No files matching path file:/tmp/tmpscnxvwz8 (state=42000,code=40000)
   Closing: 0: jdbc:hive2://10.2.20.11:21066/default
   
   **What you expected to happen**:
   The MySQL table should be loaded into Hive.
   
   **What do you think went wrong**:
   The file should have been extracted, but it is not found at the expected path when Hive tries to load it.
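   
   One quick sanity check on the worker is to confirm, right before the Hive load, that the dump file actually exists, has a non-zero size, and is readable; a minimal sketch (the path is just the one from the log above):
   
   ```python
   import os
   
   path = "/tmp/tmpscnxvwz8"  # example: the temp file the operator dumped the MySQL results into
   
   if os.path.exists(path):
       st = os.stat(path)
       print(f"{path}: {st.st_size} bytes, mode {oct(st.st_mode)}")
   else:
       print(f"{path} does not exist on this host")
   ```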
   
   **How to reproduce it**:
   
   
   **Anything else we need to know**:
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] easontm edited a comment on issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

Posted by GitBox <gi...@apache.org>.
easontm edited a comment on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-948333552


   Sure, here they are. This particular log run is missing the `.tell()` calls, but I confirmed early on that there actually was data in the MySQL query and that the csv writer was writing. The `sleep(90)` at the end is just to keep the Kubernetes pod alive so I could ssh into it and verify the file name and contents.
   
   <details><summary>operator</summary>
   code
   
   
   ```python
   from collections import OrderedDict
   from tempfile import NamedTemporaryFile
   from time import sleep
   from typing import Dict, Optional
   
   import MySQLdb
   import unicodecsv as csv
   
   from airflow.models import BaseOperator
   from airflow.providers.apache.hive.hooks.hive import HiveCliHook
   from airflow.providers.mysql.hooks.mysql import MySqlHook
   
   
   class CustomMySqlToHiveOperator(BaseOperator):
       
       template_fields = ('sql', 'partition', 'hive_table')
       template_ext = ('.sql',)
       ui_color = '#a0e08c'
   
       def __init__(
           self,
           *,
           sql: str,
           hive_table: str,
           create: bool = True,
           recreate: bool = False,
           partition: Optional[Dict] = None,
           delimiter: str = chr(1),
           quoting: Optional[str] = None,
           quotechar: str = '"',
           escapechar: Optional[str] = None,
           mysql_conn_id: str = 'mysql_default',
           hive_cli_conn_id: str = 'hive_cli_default',
           tblproperties: Optional[Dict] = None,
           **kwargs,
       ) -> None:
           super().__init__(**kwargs)
           self.sql = sql
           self.hive_table = hive_table
           self.partition = partition
           self.create = create
           self.recreate = recreate
           self.delimiter = str(delimiter)
           self.quoting = quoting or csv.QUOTE_MINIMAL
           self.quotechar = quotechar
           self.escapechar = escapechar
           self.mysql_conn_id = mysql_conn_id
           self.hive_cli_conn_id = hive_cli_conn_id
           self.partition = partition or {}
           self.tblproperties = tblproperties
   
       @classmethod
       def type_map(cls, mysql_type: int) -> str:
           """Maps MySQL type to Hive type."""
           types = MySQLdb.constants.FIELD_TYPE
           type_map = {
               types.BIT: 'INT',
               types.DECIMAL: 'DOUBLE',
               types.NEWDECIMAL: 'DOUBLE',
               types.DOUBLE: 'DOUBLE',
               types.FLOAT: 'DOUBLE',
               types.INT24: 'INT',
               types.LONG: 'BIGINT',
               types.LONGLONG: 'DECIMAL(38,0)',
               types.SHORT: 'INT',
               types.TINY: 'SMALLINT',
               types.YEAR: 'INT',
               types.TIMESTAMP: 'TIMESTAMP',
           }
           return type_map.get(mysql_type, 'STRING')
   
       def execute(self, context: Dict[str, str]):
           hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
           mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
           
           import os
           failed = False
           fname = None
           field_dict = OrderedDict()
           def UmaskNamedTemporaryFile(*args, **kargs):
               # This code shamelessly lifted from S/O for debugging
               fdesc = NamedTemporaryFile(*args, **kargs)
               umask = os.umask(0o666)
               os.umask(umask)
               os.chmod(fdesc.name, 0o666 & ~umask)
               return fdesc
           
           self.log.info("Dumping MySQL query results to local file")
           conn = mysql.get_conn()
           cursor = conn.cursor()
           cursor.execute(self.sql)
           with UmaskNamedTemporaryFile("wb", delete=False) as f:
               csv_writer = csv.writer(
                   f,
                   delimiter=self.delimiter,
                   quoting=self.quoting,
                   quotechar=self.quotechar,
                   escapechar=self.escapechar,
                   encoding="utf-8",
               )
               # field_dict = OrderedDict()
               for field in cursor.description:
                   field_dict[field[0]] = self.type_map(field[1])
               csv_writer.writerows(cursor)
               f.flush()
               print(f.name)
               print(f.tell()) # writer offset
               fname = f.name
               cursor.close()
               conn.close()
               self.log.info("Loading file into Hive")
               sleep(10) # want to make sure everything is finished writing
               try:
                   hive.load_file(
                       f.name,
                       self.hive_table,
                       field_dict=field_dict,
                       create=self.create,
                       partition=self.partition,
                       delimiter=self.delimiter,
                       recreate=self.recreate,
                       tblproperties=self.tblproperties,
                   )
               except Exception:
                   print("hive failed")
                   failed = True
   
           try:
               hive.load_file(
                   f.name,
                   self.hive_table,
                   field_dict=field_dict,
                   create=self.create,
                   partition=self.partition,
                   delimiter=self.delimiter,
                   recreate=self.recreate,
                   tblproperties=self.tblproperties,
               )
           except Exception:
               print("hive failed AGAIN")
               failed = True
   
           sleep(90)
           if failed:
               raise Exception("still broken")
   
   
   ```
   </details>
   <details><summary>logs</summary>
   
   
   ```
   [2021-10-21 04:55:41,821] {taskinstance.py:1115} INFO - Executing <Task(CustomMySqlToHiveOperator): REDACTED> on 2021-10-17T15:00:00+00:00
   [2021-10-21 04:55:41,825] {standard_task_runner.py:52} INFO - Started process 16 to run task
   [2021-10-21 04:55:41,828] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'REDACTED', 'REDACTED', '2021-10-17T15:00:00+00:00', '--job-id', '772796', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/REDACTED.py', '--cfg-path', '/tmp/tmpefopql15', '--error-file', '/tmp/tmpd9rg6cle']
   [2021-10-21 04:55:41,829] {standard_task_runner.py:77} INFO - Job 772796: Subtask REDACTED
   [2021-10-21 04:55:41,969] {logging_mixin.py:109} INFO - Running <TaskInstance: REDACTED.REDACTED 2021-10-17T15:00:00+00:00 [running]> on host REDACTED
   [2021-10-21 04:55:42,136] {taskinstance.py:1254} INFO - Exporting the following env vars:
   REDACTED
   [2021-10-21 04:55:42,163] {base.py:79} INFO - Using connection to: id: hive_cli_default. Host: REDACTED, Port: 10000, Schema: default, Login: REDACTED, Password: REDACTED, extra: {'use_beeline': True, 'hive_cli_params': '--silent=true --hiveconf hive.exec.stagingdir=s3://REDACTED'}
   [2021-10-21 04:55:42,163] {custom_mysql_to_hive.py:155} INFO - Dumping MySQL query results to local file
   [2021-10-21 04:55:42,189] {base.py:79} INFO - Using connection to: id: mysql_rds. Host: REDACTED, Port: None, Schema: REDACTED Login: REDACTED, Password: REDACTED, extra: {}
   [2021-10-21 04:55:42,238] {logging_mixin.py:109} INFO - /tmp/tmphdv9t5yh
   [2021-10-21 04:55:42,239] {custom_mysql_to_hive.py:176} INFO - Loading file into Hive
   [2021-10-21 04:55:42,239] {hive.py:458} INFO - LOAD DATA LOCAL INPATH '/tmp/tmphdv9t5yh' OVERWRITE INTO TABLE REDACTED.REDACTED PARTITION (dt='2021-10-17', hh='15');
   
   [2021-10-21 04:55:42,239] {hive.py:247} INFO - beeline -u "jdbc:hive2://REDACTED:10000/default;auth=noSasl" -n hive --silent=true --hiveconf hive.exec.stagingdir=s3://REDACTED/ -hiveconf airflow.ctx.dag_id=REDACTED -hiveconf airflow.ctx.task_id=REDACTED -hiveconf airflow.ctx.execution_date=2021-10-17T15:00:00+00:00 -hiveconf airflow.ctx.dag_run_id=scheduled__2021-10-17T15:00:00+00:00 -hiveconf airflow.ctx.dag_owner=REDACTED -hiveconf airflow.ctx.dag_email=REDACTED -f /tmp/airflow_hiveop_wly5owxt/tmprxsx_v66
   [2021-10-21 04:55:42,254] {hive.py:259} INFO - /bin/bash: warning: setlocale: LC_ALL: cannot change locale (en_US.UTF-8)
   [2021-10-21 04:55:42,648] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - airflow.ctx.dag_id=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - airflow.ctx.task_id=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - airflow.ctx.execution_date=2021-10-17T15:00:00+00:00 (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - airflow.ctx.dag_run_id=scheduled__2021-10-17T15:00:00+00:00 (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - airflow.ctx.dag_owner=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,651] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,651] {hive.py:259} INFO - airflow.ctx.dag_email=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,759] {hive.py:259} INFO - Error: Error while compiling statement: FAILED: SemanticException Line 1:23 Invalid path ''/tmp/tmphdv9t5yh'': No files matching path file:/tmp/tmphdv9t5yh (state=42000,code=40000)
   [2021-10-21 04:55:42,783] {logging_mixin.py:109} INFO - hive failed
   [2021-10-21 04:56:42,841] {taskinstance.py:1463} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1165, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1313, in _execute_task
       result = task_copy.execute(context=context)
     File "/usr/local/airflow/dags/operators/custom_mysql_to_hive.py", line 193, in execute
       raise Exception("still broken")
   Exception: still broken
   [2021-10-21 04:56:42,851] {taskinstance.py:1513} INFO - Marking task as FAILED. dag_id=REDACTED, task_id=REDACTED, execution_date=20211017T150000, start_date=20211021T045541, end_date=20211021T045642
   [2021-10-21 04:56:42,927] {local_task_job.py:151} INFO - Task exited with return code 1
   ```
   </details>





[GitHub] [airflow] easontm edited a comment on issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

Posted by GitBox <gi...@apache.org>.
easontm edited a comment on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-948333552


   Sure, here they are. This particular log run is missing the `.tell()` calls, but I confirmed early on that there actually was data in the MySQL query and that the csv writer was writing. The `sleep(90)` at the end is just to keep the Kubernetes pod alive so I could ssh into it and verify the file name and contents.
   
   <details><summary>operator</summary>
   code
   
   
   ```python
   from collections import OrderedDict
   from tempfile import NamedTemporaryFile
   from time import sleep
   from typing import Dict, Optional
   
   import MySQLdb
   import unicodecsv as csv
   
   from airflow.models import BaseOperator
   from airflow.providers.apache.hive.hooks.hive import HiveCliHook
   from airflow.providers.mysql.hooks.mysql import MySqlHook
   
   
   class CustomMySqlToHiveOperator(BaseOperator):
       
       template_fields = ('sql', 'partition', 'hive_table')
       template_ext = ('.sql',)
       ui_color = '#a0e08c'
   
       def __init__(
           self,
           *,
           sql: str,
           hive_table: str,
           create: bool = True,
           recreate: bool = False,
           partition: Optional[Dict] = None,
           delimiter: str = chr(1),
           quoting: Optional[str] = None,
           quotechar: str = '"',
           escapechar: Optional[str] = None,
           mysql_conn_id: str = 'mysql_default',
           hive_cli_conn_id: str = 'hive_cli_default',
           tblproperties: Optional[Dict] = None,
           **kwargs,
       ) -> None:
           super().__init__(**kwargs)
           self.sql = sql
           self.hive_table = hive_table
           self.partition = partition
           self.create = create
           self.recreate = recreate
           self.delimiter = str(delimiter)
           self.quoting = quoting or csv.QUOTE_MINIMAL
           self.quotechar = quotechar
           self.escapechar = escapechar
           self.mysql_conn_id = mysql_conn_id
           self.hive_cli_conn_id = hive_cli_conn_id
           self.partition = partition or {}
           self.tblproperties = tblproperties
   
       @classmethod
       def type_map(cls, mysql_type: int) -> str:
           """Maps MySQL type to Hive type."""
           types = MySQLdb.constants.FIELD_TYPE
           type_map = {
               types.BIT: 'INT',
               types.DECIMAL: 'DOUBLE',
               types.NEWDECIMAL: 'DOUBLE',
               types.DOUBLE: 'DOUBLE',
               types.FLOAT: 'DOUBLE',
               types.INT24: 'INT',
               types.LONG: 'BIGINT',
               types.LONGLONG: 'DECIMAL(38,0)',
               types.SHORT: 'INT',
               types.TINY: 'SMALLINT',
               types.YEAR: 'INT',
               types.TIMESTAMP: 'TIMESTAMP',
           }
           return type_map.get(mysql_type, 'STRING')
   
       def execute(self, context: Dict[str, str]):
           hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
           mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
           
           import os
           failed = False
           fname = None
           field_dict = OrderedDict()
           def UmaskNamedTemporaryFile(*args, **kargs):
               # This code shamelessly lifted from S/O for debugging
               fdesc = NamedTemporaryFile(*args, **kargs)
               umask = os.umask(0o666)
               os.umask(umask)
               os.chmod(fdesc.name, 0o666 & ~umask)
               return fdesc
           
           self.log.info("Dumping MySQL query results to local file")
           conn = mysql.get_conn()
           cursor = conn.cursor()
           cursor.execute(self.sql)
           with UmaskNamedTemporaryFile("wb", delete=False) as f:
               csv_writer = csv.writer(
                   f,
                   delimiter=self.delimiter,
                   quoting=self.quoting,
                   quotechar=self.quotechar,
                   escapechar=self.escapechar,
                   encoding="utf-8",
               )
               # field_dict = OrderedDict()
               for field in cursor.description:
                   field_dict[field[0]] = self.type_map(field[1])
               csv_writer.writerows(cursor)
               f.flush()
               print(f.name)
               print(f.tell()) # writer offset
               fname = f.name
               cursor.close()
               conn.close()
               self.log.info("Loading file into Hive")
               sleep(10) # want to make sure everything is finished writing
               try:
                   hive.load_file(
                       f.name,
                       self.hive_table,
                       field_dict=field_dict,
                       create=self.create,
                       partition=self.partition,
                       delimiter=self.delimiter,
                       recreate=self.recreate,
                       tblproperties=self.tblproperties,
                   )
               except Exception:
                   print("hive failed")
                   failed = True
   
           try:
               hive.load_file(
                   fname,
                   self.hive_table,
                   field_dict=field_dict,
                   create=self.create,
                   partition=self.partition,
                   delimiter=self.delimiter,
                   recreate=self.recreate,
                   tblproperties=self.tblproperties,
               )
           except Exception:
               print("hive failed AGAIN")
               failed = True
   
           sleep(90)
           if failed:
               raise Exception("still broken")
   
   
   ```
   </details>
   <details><summary>logs</summary>
   
   
   ```
   [2021-10-21 04:55:41,821] {taskinstance.py:1115} INFO - Executing <Task(CustomMySqlToHiveOperator): REDACTED> on 2021-10-17T15:00:00+00:00
   [2021-10-21 04:55:41,825] {standard_task_runner.py:52} INFO - Started process 16 to run task
   [2021-10-21 04:55:41,828] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'REDACTED', 'REDACTED', '2021-10-17T15:00:00+00:00', '--job-id', '772796', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/REDACTED.py', '--cfg-path', '/tmp/tmpefopql15', '--error-file', '/tmp/tmpd9rg6cle']
   [2021-10-21 04:55:41,829] {standard_task_runner.py:77} INFO - Job 772796: Subtask REDACTED
   [2021-10-21 04:55:41,969] {logging_mixin.py:109} INFO - Running <TaskInstance: REDACTED.REDACTED 2021-10-17T15:00:00+00:00 [running]> on host REDACTED
   [2021-10-21 04:55:42,136] {taskinstance.py:1254} INFO - Exporting the following env vars:
   REDACTED
   [2021-10-21 04:55:42,163] {base.py:79} INFO - Using connection to: id: hive_cli_default. Host: REDACTED, Port: 10000, Schema: default, Login: REDACTED, Password: REDACTED, extra: {'use_beeline': True, 'hive_cli_params': '--silent=true --hiveconf hive.exec.stagingdir=s3://REDACTED'}
   [2021-10-21 04:55:42,163] {custom_mysql_to_hive.py:155} INFO - Dumping MySQL query results to local file
   [2021-10-21 04:55:42,189] {base.py:79} INFO - Using connection to: id: mysql_rds. Host: REDACTED, Port: None, Schema: REDACTED Login: REDACTED, Password: REDACTED, extra: {}
   [2021-10-21 04:55:42,238] {logging_mixin.py:109} INFO - /tmp/tmphdv9t5yh
   [2021-10-21 04:55:42,239] {custom_mysql_to_hive.py:176} INFO - Loading file into Hive
   [2021-10-21 04:55:42,239] {hive.py:458} INFO - LOAD DATA LOCAL INPATH '/tmp/tmphdv9t5yh' OVERWRITE INTO TABLE REDACTED.REDACTED PARTITION (dt='2021-10-17', hh='15');
   
   [2021-10-21 04:55:42,239] {hive.py:247} INFO - beeline -u "jdbc:hive2://REDACTED:10000/default;auth=noSasl" -n hive --silent=true --hiveconf hive.exec.stagingdir=s3://REDACTED/ -hiveconf airflow.ctx.dag_id=REDACTED -hiveconf airflow.ctx.task_id=REDACTED -hiveconf airflow.ctx.execution_date=2021-10-17T15:00:00+00:00 -hiveconf airflow.ctx.dag_run_id=scheduled__2021-10-17T15:00:00+00:00 -hiveconf airflow.ctx.dag_owner=REDACTED -hiveconf airflow.ctx.dag_email=REDACTED -f /tmp/airflow_hiveop_wly5owxt/tmprxsx_v66
   [2021-10-21 04:55:42,254] {hive.py:259} INFO - /bin/bash: warning: setlocale: LC_ALL: cannot change locale (en_US.UTF-8)
   [2021-10-21 04:55:42,648] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - airflow.ctx.dag_id=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - airflow.ctx.task_id=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - airflow.ctx.execution_date=2021-10-17T15:00:00+00:00 (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - airflow.ctx.dag_run_id=scheduled__2021-10-17T15:00:00+00:00 (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - airflow.ctx.dag_owner=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,651] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,651] {hive.py:259} INFO - airflow.ctx.dag_email=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,759] {hive.py:259} INFO - Error: Error while compiling statement: FAILED: SemanticException Line 1:23 Invalid path ''/tmp/tmphdv9t5yh'': No files matching path file:/tmp/tmphdv9t5yh (state=42000,code=40000)
   [2021-10-21 04:55:42,783] {logging_mixin.py:109} INFO - hive failed
   [2021-10-21 04:56:42,841] {taskinstance.py:1463} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1165, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1313, in _execute_task
       result = task_copy.execute(context=context)
     File "/usr/local/airflow/dags/operators/custom_mysql_to_hive.py", line 193, in execute
       raise Exception("still broken")
   Exception: still broken
   [2021-10-21 04:56:42,851] {taskinstance.py:1513} INFO - Marking task as FAILED. dag_id=REDACTED, task_id=REDACTED, execution_date=20211017T150000, start_date=20211021T045541, end_date=20211021T045642
   [2021-10-21 04:56:42,927] {local_task_job.py:151} INFO - Task exited with return code 1
   ```
   </details>





[GitHub] [airflow] pateash edited a comment on issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

Posted by GitBox <gi...@apache.org>.
pateash edited a comment on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-948274910


   @easontm could you please share your custom debug operator and the logs you got?





[GitHub] [airflow] pateash commented on issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

Posted by GitBox <gi...@apache.org>.
pateash commented on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-828980255


   @vikramkoka could you please assign this to me? I would like to work on it.





[GitHub] [airflow] easontm commented on issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

Posted by GitBox <gi...@apache.org>.
easontm commented on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-948273319


   I'm getting this error on 2.1.4 with the Kubernetes executor. I wrote a custom version of `MySqlToHiveOperator` with some modifications to debug. Things I have checked (a standalone sketch of these checks follows below):
   1. The file exists.
   2. Still broken even if you give everyone `r` on the file.
   3. Still broken if you set `delete=False` so that `NamedTemporaryFile` leaves the file intact and then reference it after the `NamedTemporaryFile` context.
   4. If you call `.tell()` and `sleep` on the tempfile before the Hive load, you can see that the writing is complete before attempting to load.
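   
   A minimal, self-contained sketch of checks 1-4 (hypothetical snippet; the actual debug operator code is quoted elsewhere in this thread):
   
   ```python
   import os
   import time
   from tempfile import NamedTemporaryFile
   
   with NamedTemporaryFile("wb", delete=False) as f:  # check 3: keep the file after the context exits
       f.write(b"42,foo\n")
       f.flush()
       print(f.name, os.path.exists(f.name))  # check 1: the file exists
       os.chmod(f.name, 0o644)                # check 2: readable by everyone
       print(f.tell())                        # check 4: bytes written so far
       time.sleep(10)                         # give any buffering time to settle
   
   print(os.path.exists(f.name))              # check 3: still present outside the context
   ```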





[GitHub] [airflow] pateash edited a comment on issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

Posted by GitBox <gi...@apache.org>.
pateash edited a comment on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-948274910


   @easontm could you please share your custom debug operator?





[GitHub] [airflow] fjmacagno commented on issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

Posted by GitBox <gi...@apache.org>.
fjmacagno commented on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-949999148


   I am seeing the same issue using `HiveCliHook.load_df()`:
   ```
       for design in designs:
           for platformSegment in design["platformSegments"]:
               design_rows.append(
                   [design["designId"], design["fullName"], design["taxonomyId"], design["taxonomySegmentId"],
                    platformSegment["platformId"], platformSegment["platformSegmentId"]])
   
       hive = HiveCliHook(hive_cli_conn_id=constants.HIVE_CONN)
       hive.load_df(pandas.DataFrame(design_rows), table=segment_definition_table, delimiter="\t", create=False)
   ```
   Error: `Error: Error while compiling statement: FAILED: SemanticException Line 1:23 Invalid path ''/tmp/airflow_hiveop_twe9yc4g/tmpervj2lgz'': No files matching path file:/tmp/airflow_hiveop_twe9yc4g/tmpervj2lgz (state=42000,code=40000)`
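   
   For reference, `load_df` appears to write the DataFrame to a local temp file and then delegate to `load_file`, so it ends up on the same `LOAD DATA LOCAL INPATH` code path as the operator above. A rough equivalent of that two-step call (simplified sketch with an assumed connection id and table name, not the provider source):
   
   ```python
   import pandas as pd
   from tempfile import NamedTemporaryFile
   
   from airflow.providers.apache.hive.hooks.hive import HiveCliHook
   
   hive = HiveCliHook(hive_cli_conn_id="hive_cli_default")  # assumed connection id
   df = pd.DataFrame([[1, "a"], [2, "b"]], columns=["id", "name"])
   
   with NamedTemporaryFile("w", suffix=".csv") as tmp:
       df.to_csv(tmp.name, sep="\t", header=False, index=False)
       # load_file builds "LOAD DATA LOCAL INPATH '<tmp.name>' ..." and runs it through
       # beeline, which is where the "Invalid path" error above is raised.
       hive.load_file(tmp.name, table="some_db.some_table", create=False, delimiter="\t")
   ```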





[GitHub] [airflow] pateash edited a comment on issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

Posted by GitBox <gi...@apache.org>.
pateash edited a comment on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-828980255


   @vikramkoka @eladkal could you please assign this to me? I would like to work on it.





[GitHub] [airflow] easontm edited a comment on issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

Posted by GitBox <gi...@apache.org>.
easontm edited a comment on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-948273319


   I'm getting this error on 2.1.4 with the Kubernetes executor. I wrote a custom version of `MySqlToHiveOperator` with some modifications to debug. Things I have checked:
   1. The file exists.
   2. Still broken even if you give everyone `r` on the file (unmasked in operator code).
   3. Still broken if you set `delete=False` so that `NamedTemporaryFile` leaves the file intact and then reference it after the `NamedTemporaryFile` context.
   4. If you call `.tell()` and `sleep` on the tempfile before the Hive load, you can see that the writing is complete before attempting to load.





[GitHub] [airflow] github-actions[bot] commented on issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-901510398


   This issue has been closed because it has not received a response from the issue author.





[GitHub] [airflow] pateash commented on issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

Posted by GitBox <gi...@apache.org>.
pateash commented on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-948274910


   @easontm could you please provide your custom debug operator?





[GitHub] [airflow] pateash commented on issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

Posted by GitBox <gi...@apache.org>.
pateash commented on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-869369961


   @forestlzj, I was not able to reproduce this; could you please provide more information on how to reproduce it?





[GitHub] [airflow] github-actions[bot] commented on issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-897241641


   This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.





[GitHub] [airflow] github-actions[bot] closed issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed issue #13391:
URL: https://github.com/apache/airflow/issues/13391


   





[GitHub] [airflow] easontm commented on issue #13391: MySqlToHiveOperator, "invalid path" while loading the extracted CSV to Hive

Posted by GitBox <gi...@apache.org>.
easontm commented on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-948333552


   Sure, here they are. This particular log run is missing the `.tell()` calls, but I confirmed early on that there actually was data in the MySQL query and that the writer was writing. The `sleep(90)` at the end is just to keep the Kubernetes pod alive so I could ssh into it and verify the file name and contents.
   
   <details><summary>operator</summary>
   code
   
   
   ```python
   from collections import OrderedDict
   from tempfile import NamedTemporaryFile
   from time import sleep
   from typing import Dict, Optional
   
   import MySQLdb
   import unicodecsv as csv
   
   from airflow.models import BaseOperator
   from airflow.providers.apache.hive.hooks.hive import HiveCliHook
   from airflow.providers.mysql.hooks.mysql import MySqlHook
   
   
   class CustomMySqlToHiveOperator(BaseOperator):
       
       template_fields = ('sql', 'partition', 'hive_table')
       template_ext = ('.sql',)
       ui_color = '#a0e08c'
   
       def __init__(
           self,
           *,
           sql: str,
           hive_table: str,
           create: bool = True,
           recreate: bool = False,
           partition: Optional[Dict] = None,
           delimiter: str = chr(1),
           quoting: Optional[str] = None,
           quotechar: str = '"',
           escapechar: Optional[str] = None,
           mysql_conn_id: str = 'mysql_default',
           hive_cli_conn_id: str = 'hive_cli_default',
           tblproperties: Optional[Dict] = None,
           **kwargs,
       ) -> None:
           super().__init__(**kwargs)
           self.sql = sql
           self.hive_table = hive_table
           self.partition = partition
           self.create = create
           self.recreate = recreate
           self.delimiter = str(delimiter)
           self.quoting = quoting or csv.QUOTE_MINIMAL
           self.quotechar = quotechar
           self.escapechar = escapechar
           self.mysql_conn_id = mysql_conn_id
           self.hive_cli_conn_id = hive_cli_conn_id
           self.partition = partition or {}
           self.tblproperties = tblproperties
   
       @classmethod
       def type_map(cls, mysql_type: int) -> str:
           """Maps MySQL type to Hive type."""
           types = MySQLdb.constants.FIELD_TYPE
           type_map = {
               types.BIT: 'INT',
               types.DECIMAL: 'DOUBLE',
               types.NEWDECIMAL: 'DOUBLE',
               types.DOUBLE: 'DOUBLE',
               types.FLOAT: 'DOUBLE',
               types.INT24: 'INT',
               types.LONG: 'BIGINT',
               types.LONGLONG: 'DECIMAL(38,0)',
               types.SHORT: 'INT',
               types.TINY: 'SMALLINT',
               types.YEAR: 'INT',
               types.TIMESTAMP: 'TIMESTAMP',
           }
           return type_map.get(mysql_type, 'STRING')
   
       def execute(self, context: Dict[str, str]):
           hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
           mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
           
           import os
           failed = False
           fname = None
           field_dict = OrderedDict()
           def UmaskNamedTemporaryFile(*args, **kargs):
               # This code shamelessly lifted from S/O for debugging
               fdesc = NamedTemporaryFile(*args, **kargs)
               umask = os.umask(0o666)
               os.umask(umask)
               os.chmod(fdesc.name, 0o666 & ~umask)
               return fdesc
           
           self.log.info("Dumping MySQL query results to local file")
           conn = mysql.get_conn()
           cursor = conn.cursor()
           cursor.execute(self.sql)
           with UmaskNamedTemporaryFile("wb", delete=False) as f:
               csv_writer = csv.writer(
                   f,
                   delimiter=self.delimiter,
                   quoting=self.quoting,
                   quotechar=self.quotechar,
                   escapechar=self.escapechar,
                   encoding="utf-8",
               )
               # field_dict = OrderedDict()
               for field in cursor.description:
                   field_dict[field[0]] = self.type_map(field[1])
               csv_writer.writerows(cursor)
               f.flush()
               print(f.name)
               print(f.tell()) # writer offset
               fname = f.name
               cursor.close()
               conn.close()
               self.log.info("Loading file into Hive")
               sleep(10) # want to make sure everything is finished writing
               try:
                   hive.load_file(
                       f.name,
                       self.hive_table,
                       field_dict=field_dict,
                       create=self.create,
                       partition=self.partition,
                       delimiter=self.delimiter,
                       recreate=self.recreate,
                       tblproperties=self.tblproperties,
                   )
               except Exception:
                   print("hive failed")
                   failed = True
   
           try:
               hive.load_file(
                   f.name,
                   self.hive_table,
                   field_dict=field_dict,
                   create=self.create,
                   partition=self.partition,
                   delimiter=self.delimiter,
                   recreate=self.recreate,
                   tblproperties=self.tblproperties,
               )
           except Exception:
               print("hive failed AGAIN")
               failed = True
   
           sleep(90)
           if failed:
               raise Exception("still broken")
   
   
   ```
   </details>
   <details><summary>logs</summary>
   
   
   ```
   [2021-10-21 04:55:41,821] {taskinstance.py:1115} INFO - Executing <Task(CustomMySqlToHiveOperator): REDACTED> on 2021-10-17T15:00:00+00:00
   [2021-10-21 04:55:41,825] {standard_task_runner.py:52} INFO - Started process 16 to run task
   [2021-10-21 04:55:41,828] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'REDACTED', 'REDACTED', '2021-10-17T15:00:00+00:00', '--job-id', '772796', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/REDACTED.py', '--cfg-path', '/tmp/tmpefopql15', '--error-file', '/tmp/tmpd9rg6cle']
   [2021-10-21 04:55:41,829] {standard_task_runner.py:77} INFO - Job 772796: Subtask REDACTED
   [2021-10-21 04:55:41,969] {logging_mixin.py:109} INFO - Running <TaskInstance: REDACTED.REDACTED 2021-10-17T15:00:00+00:00 [running]> on host REDACTED
   [2021-10-21 04:55:42,136] {taskinstance.py:1254} INFO - Exporting the following env vars:
   REDACTED
   [2021-10-21 04:55:42,163] {base.py:79} INFO - Using connection to: id: hive_cli_default. Host: REDACTED, Port: 10000, Schema: default, Login: REDACTED, Password: REDACTED, extra: {'use_beeline': True, 'hive_cli_params': '--silent=true --hiveconf hive.exec.stagingdir=s3://REDACTED'}
   [2021-10-21 04:55:42,163] {custom_mysql_to_hive.py:155} INFO - Dumping MySQL query results to local file
   [2021-10-21 04:55:42,189] {base.py:79} INFO - Using connection to: id: mysql_rds. Host: REDACTED, Port: None, Schema: REDACTED Login: REDACTED, Password: REDACTED, extra: {}
   [2021-10-21 04:55:42,238] {logging_mixin.py:109} INFO - /tmp/tmphdv9t5yh
   [2021-10-21 04:55:42,239] {custom_mysql_to_hive.py:176} INFO - Loading file into Hive
   [2021-10-21 04:55:42,239] {hive.py:458} INFO - LOAD DATA LOCAL INPATH '/tmp/tmphdv9t5yh' OVERWRITE INTO TABLE REDACTED.REDACTED PARTITION (dt='2021-10-17', hh='15');
   
   [2021-10-21 04:55:42,239] {hive.py:247} INFO - beeline -u "jdbc:hive2://REDACTED:10000/default;auth=noSasl" -n hive --silent=true --hiveconf hive.exec.stagingdir=s3://REDACTED/ -hiveconf airflow.ctx.dag_id=REDACTED -hiveconf airflow.ctx.task_id=REDACTED -hiveconf airflow.ctx.execution_date=2021-10-17T15:00:00+00:00 -hiveconf airflow.ctx.dag_run_id=scheduled__2021-10-17T15:00:00+00:00 -hiveconf airflow.ctx.dag_owner=REDACTED -hiveconf airflow.ctx.dag_email=REDACTED -f /tmp/airflow_hiveop_wly5owxt/tmprxsx_v66
   [2021-10-21 04:55:42,254] {hive.py:259} INFO - /bin/bash: warning: setlocale: LC_ALL: cannot change locale (en_US.UTF-8)
   [2021-10-21 04:55:42,648] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - airflow.ctx.dag_id=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - airflow.ctx.task_id=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - airflow.ctx.execution_date=2021-10-17T15:00:00+00:00 (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - airflow.ctx.dag_run_id=scheduled__2021-10-17T15:00:00+00:00 (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - airflow.ctx.dag_owner=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,651] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,651] {hive.py:259} INFO - airflow.ctx.dag_email=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,759] {hive.py:259} INFO - Error: Error while compiling statement: FAILED: SemanticException Line 1:23 Invalid path ''/tmp/tmphdv9t5yh'': No files matching path file:/tmp/tmphdv9t5yh (state=42000,code=40000)
   [2021-10-21 04:55:42,783] {logging_mixin.py:109} INFO - hive failed
   [2021-10-21 04:56:42,841] {taskinstance.py:1463} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1165, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1313, in _execute_task
       result = task_copy.execute(context=context)
     File "/usr/local/airflow/dags/operators/custom_mysql_to_hive.py", line 193, in execute
       raise Exception("still broken")
   Exception: still broken
   [2021-10-21 04:56:42,851] {taskinstance.py:1513} INFO - Marking task as FAILED. dag_id=REDACTED, task_id=REDACTED, execution_date=20211017T150000, start_date=20211021T045541, end_date=20211021T045642
   [2021-10-21 04:56:42,927] {local_task_job.py:151} INFO - Task exited with return code 1
   ```
   </details>

