Posted to dev@airflow.apache.org by 李立伟 <hi...@gmail.com> on 2018/10/10 01:32:47 UTC

Airflow BashOperator: using beeline to insert Chinese text into Hive produces garbled data

With a BashOperator, I use beeline to insert data into Hive. The HQL contains
Chinese characters. The DAG run succeeds, but the data in Hive contains
unreadable characters.

Python (DAG file):

# -*- coding: utf-8 -*-
import sys
from datetime import datetime

import pendulum
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

local_tz = pendulum.timezone("Asia/Shanghai")

# Python 2 only: make utf-8 the interpreter's default string encoding
reload(sys)
sys.setdefaultencoding('utf-8')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 9, 19, 22, 20, tzinfo=local_tz),
    'retries': 0,
}

dag = DAG(
    'inserthiveutf8',
    default_args=default_args,
    description='null',
    catchup=False,
    schedule_interval=None,
)

# beeline command whose HQL contains a Chinese character
adf37 = r"""
beeline -u "jdbc:hive2://10.138.***.***:30010/di_zz" -n "*****" -p "*****" -e 'insert into   di_zz.tt_wms_inout_detail_new(fac_id) values ("中")'

  """
abcd8491539084126613 = BashOperator(
    task_id='abcd8491539084126613',
    bash_command=adf37,
    dag=dag)

I have also tried calling a shell script from the operator:

abcd8491539084126613 =BashOperator(
               task_id='abcd8491539084126613',
               bash_command="sh  ~/insert.sh  ",
               dag=dag)

this, with the locale exported first:

export LANG=en_US.UTF-8
beeline -u "jdbc:hive2://10.138.***.***:30010/di_zz" -n "*****" -p "*****" -e 'insert into   di_zz.tt_wms_inout_detail_new(fac_id) values ("中")'

and this, reading the HQL from a file:

beeline -u "jdbc:hive2://10.138.***.***:30010/di_zz" -n "*****" -p "*****" -f ~/hql.sql

Task log:

[2018-10-09 21:00:58,485] {bash_operator.py:110} INFO - INFO  : Compiling command(queryId=hive_20181009210000_89390a92-c4de-413f-9958-4d7da1065ef9): insert into   di_zz.tt_wms_inout_detail_new(fac_id) values ("???")
[2018-10-09 21:00:58,485] {bash_operator.py:110} INFO - INFO  : Semantic Analysis Completed
[2018-10-09 21:00:58,486] {bash_operator.py:110} INFO - INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:_col0, type:timestamp, comment:null), FieldSchema(name:_col1, type:string, comment:null), FieldSchema(name:_col2, type:void, comment:null), FieldSchema(name:_col3, type:void, comment:null), FieldSchema(name:_col4, type:void, comment:null), FieldSchema(name:_col5, type:void, comment:null), FieldSchema(name:_col6, type:void, comment:null), FieldSchema(name:_col7, type:bigint, comment:null), FieldSchema(name:_col8, type:bigint, comment:null), FieldSchema(name:_col9, type:bigint, comment:null), FieldSchema(name:_col10, type:void, comment:null), FieldSchema(name:_col11, type:timestamp, comment:null)], properties:null)
[2018-10-09 21:00:58,486] {bash_operator.py:110} INFO - INFO  : Completed compiling command(queryId=hive_20181009210000_89390a92-c4de-413f-9958-4d7da1065ef9); Time taken: 0.291 seconds
[2018-10-09 21:00:58,486] {bash_operator.py:110} INFO - INFO  : Executing command(queryId=hive_20181009210000_89390a92-c4de-413f-9958-4d7da1065ef9): insert into   di_zz.tt_wms_inout_detail_new(fac_id) values ("???")
[2018-10-09 21:00:58,486] {bash_operator.py:110} INFO - INFO  : Query ID = hive_20181009210000_89390a92-c4de-413f-9958-4d7da1065ef9
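
A note on the ??? in this log: 中 is a single character, but it occupies three bytes in UTF-8, and three ? come back. That pattern is what one would expect if the UTF-8 bytes were decoded somewhere with a single-byte charset such as ASCII. This is only a hypothesis about the mechanism, not something the log confirms. The sketch below (plain Python, independent of Airflow, beeline and Hive) merely reproduces the pattern:

```python
# -*- coding: utf-8 -*-
# Sketch only: reproduces the "one character becomes three" pattern.
# It does not call beeline or Hive; the ASCII decoder is an assumption
# about what might be happening, not a confirmed diagnosis.

original = u"中"
utf8_bytes = original.encode("utf-8")  # the UTF-8 form of the character

# Decode the UTF-8 bytes with a charset that cannot represent them.
# errors="replace" substitutes U+FFFD for every undecodable byte.
misdecoded = utf8_bytes.decode("ascii", errors="replace")

# Re-encoding to ASCII turns each replacement character into "?".
as_question_marks = misdecoded.encode("ascii", errors="replace").decode("ascii")

print(len(utf8_bytes))
print(repr(misdecoded))
print(as_question_marks)
```

If the same three-for-one pattern appears for other Chinese characters, the encoding seen by the client process is a reasonable first thing to check.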

Resulting data in Hive:

+------------------------------------+---------------------------------+-----------------------------------+----------------------------------+------------------------------------+------------------------------------+---------------------------------------+-----------------------------------+----------------------------------+-----------------------------------+-------------------------------------------+--------------------------------------+--+
| tt_wms_inout_detail_new.stat_date  | tt_wms_inout_detail_new.fac_id  | tt_wms_inout_detail_new.fac_name  | tt_wms_inout_detail_new.ware_id  | tt_wms_inout_detail_new.ware_name  | tt_wms_inout_detail_new.ware_type  | tt_wms_inout_detail_new.product_code  | tt_wms_inout_detail_new.ware_cnt  | tt_wms_inout_detail_new.ware_in  | tt_wms_inout_detail_new.ware_out  | tt_wms_inout_detail_new.sap_factory_name  | tt_wms_inout_detail_new.di_etl_date  |
+------------------------------------+---------------------------------+-----------------------------------+----------------------------------+------------------------------------+------------------------------------+---------------------------------------+-----------------------------------+----------------------------------+-----------------------------------+-------------------------------------------+--------------------------------------+--+
| NULL                               | ���                             | NULL                              | NULL                             | NULL                               | NULL                               | NULL                                  | NULL                              | NULL                             | NULL                              | NULL                                      | NULL                                 |
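
Regarding the `export LANG=en_US.UTF-8` attempt: in POSIX locale handling, `LC_ALL` takes precedence over `LC_CTYPE`, which takes precedence over `LANG`. Setting `LANG` alone therefore has no effect on character handling if the task's environment already defines one of the other two. Whether that is the case on this Airflow worker is unknown. The helper below, `effective_ctype` (a hypothetical name, not part of Airflow), sketches that precedence so the worker's environment can be checked:

```python
import os


def effective_ctype(env):
    """Return the locale name that governs character encoding.

    POSIX precedence: LC_ALL, then LC_CTYPE, then LANG.
    Unset or empty variables are skipped; with none set, the
    default "C" locale applies.
    """
    for name in ("LC_ALL", "LC_CTYPE", "LANG"):
        value = env.get(name)
        if value:
            return value
    return "C"


if __name__ == "__main__":
    # Print what the current process (for example, a task) would see.
    print(effective_ctype(os.environ))
```

Running this from inside a task, for instance from a `BashOperator` command that invokes `python`, should show the locale that a beeline process started by the same task would inherit, assuming the operator passes its environment through unchanged.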

I have searched online for a long time without finding a solution. Any help
or ideas on how to insert the Chinese characters correctly would be appreciated.

Thanks in advance.