You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by "LYL41011 (via GitHub)" <gi...@apache.org> on 2023/04/11 10:07:14 UTC

[GitHub] [incubator-seatunnel] LYL41011 opened a new issue, #4549: [Bug] [hive sink] Bug title

LYL41011 opened a new issue, #4549:
URL: https://github.com/apache/incubator-seatunnel/issues/4549

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   I want to synchronize data from kafka to hive. When I start the task, an error occurs.
   
   Caused by: java.lang.NullPointerException
           at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy.lambda$buildSchemaWithRowType$0(AbstractWriteStrategy.java:129)
           at java.util.ArrayList.forEach(ArrayList.java:1259)
           at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy.buildSchemaWithRowType(AbstractWriteStrategy.java:127)
   
   The table creation statement is as follows:
   CREATE TABLE `ods_tmp`.`prd_kafka_rta_toutiao_gl_topic`(`reqid` string COMMENT '使用项目', `reqtime` string COMMENT '上报时间', `media` string COMMENT '媒体', `isping` string COMMENT '是否ping测流量', `flowtype` string COMMENT '流量类型', `reqparams` string COMMENT '媒体下发的参数', `imeimd5` string COMMENT '设备 imei 号的 md5加密值', `oaidmd5` string COMMENT '设备 oaid 号的 md5加密值', `idfamd5` string COMMENT '设备 idfa 号的 md5加密值', `bucket` string COMMENT '', `results` string COMMENT '各账户的参竞、识别情况以及决策上下文', `exts` string COMMENT '扩展的字段', `timeconsume` string COMMENT '', `system` string COMMENT '', `debuginfo` string COMMENT '', `ts` string COMMENT '毓数消费时间戳') 
   PARTITIONED BY (`pday` string, `phour` string) 
   ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ( 'serialization.format' = '1' ) STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' 
   
   ### SeaTunnel Version
   
   2.3.1
   
   ### SeaTunnel Config
   
   ```conf
   env {
     execution.parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 20000
   
   }
   
   source {
   
     Kafka {
       result_table_name = "kafka_toutiao_gl_topic_source"
       schema = {
         fields {
   reqId= "string"
   reqTime= "string"
   media= "string"
   isPing= "string"
   flowType= "string"
   reqParams= "string"
   imeiMd5= "string"
   oaidMd5= "string"
   idfaMd5= "string"
   bucket= "string"
   results= "string"
   exts= "string"
   timeConsume= "string"
   system= "string"
   debugInfo= "string"
         }
       }
       topic = "toutiao_gl_topic"
       bootstrap.servers = "10.220.193.248:39092"
       kafka.max.poll.records = 10000
       kafka.auto.offset.reset = earliest
       consumer.group =  "yushu_dis_seatunnel"
     }
   
   }
   
   transform {
   Sql {
       source_table_name = "kafka_toutiao_gl_topic_source"
       result_table_name = "kafka_toutiao_gl_topic_result"
   query="select IFNULL(reqId,''),IFNULL(reqTime,''),IFNULL(media,''),IFNULL(isPing,''),IFNULL(flowType,''),IFNULL(reqParams,''),IFNULL(imeiMd5,''),IFNULL(oaidMd5,''),IFNULL(idfaMd5,''),IFNULL(bucket,''),IFNULL(results,''),IFNULL(exts,''),IFNULL(timeConsume,''),IFNULL(system,''),IFNULL(debugInfo,''),current_timestamp as ts,SUBSTRING(REPLACE(reqTime,'-',''),1,8) AS pday,SUBSTRING(REPLACE(reqTime,'-',''),10,2)  AS phour from kafka_toutiao_gl_topic_source"
   }
   }
   sink {
     Hive {
       source_table_name = "kafka_toutiao_gl_topic_result"
       table_name = "ods_tmp.prd_kafka_rta_toutiao_gl_topic"
       metastore_uri = "thrift://xxx:9083"
     }
   
   }
   ```
   
   
   ### Running Command
   
   ```shell
   ../bin/start-seatunnel-flink-13-connector-v2.sh --config kafka2hive_toutiao_gl_topic -m yarn-cluster -ynm seatunnel
   ```
   
   
   ### Error Exception
   
   ```log
   [finloan@jds01 config]$ ../bin/start-seatunnel-flink-13-connector-v2.sh --config kafka2hive_toutiao_gl_topic -m yarn-cluster -ynm seatunnel
   Execute SeaTunnel Flink Job: ${FLINK_HOME}/bin/flink run -m yarn-cluster -ynm seatunnel -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink /home/q/dis/apache-seatunnel-incubating-2.3.1/starter/seatunnel-flink-13-starter.jar --config kafka2hive_toutiao_gl_topic --name SeaTunnel
   Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
   Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
   2023-04-11 17:58:43,199 INFO  org.apache.hadoop.security.UserGroupInformation              [] - Login successful for user dis@LYCC.COM using keytab file /home/q/dis/dis.keytab
   
   ------------------------------------------------------------
    The program finished with the following exception:
   
   org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Flink job executed failed
           at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
           at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
           at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
           at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
           at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
           at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
           at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
           at java.security.AccessController.doPrivileged(Native Method)
           at javax.security.auth.Subject.doAs(Subject.java:422)
           at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
           at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
           at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
   Caused by: org.apache.seatunnel.core.starter.exception.CommandExecuteException: Flink job executed failed
           at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:63)
           at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
           at org.apache.seatunnel.core.starter.flink.SeaTunnelFlink.main(SeaTunnelFlink.java:34)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
           ... 11 more
   Caused by: java.lang.NullPointerException
           at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy.lambda$buildSchemaWithRowType$0(AbstractWriteStrategy.java:129)
           at java.util.ArrayList.forEach(ArrayList.java:1259)
           at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy.buildSchemaWithRowType(AbstractWriteStrategy.java:127)
           at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.TextWriteStrategy.setSeaTunnelRowTypeInfo(TextWriteStrategy.java:68)
           at org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink.setTypeInfo(BaseFileSink.java:71)
           at org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:110)
           at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.execute(FlinkExecution.java:109)
           at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:61)
   ```
   
   
   ### Flink or Spark Version
   
   Flink  1.13.6
   
   ### Java or Scala Version
   
   1.8.0
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] LYL41011 closed issue #4549: [Bug] [connector-hive] sink hive AbstractWriteStrategy.buildSchemaWithRowType NullPointerException

Posted by "LYL41011 (via GitHub)" <gi...@apache.org>.
LYL41011 closed issue #4549: [Bug] [connector-hive] sink hive  AbstractWriteStrategy.buildSchemaWithRowType NullPointerException
URL: https://github.com/apache/incubator-seatunnel/issues/4549


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [seatunnel] SinyoWong commented on issue #4549: [Bug] [connector-hive] sink hive AbstractWriteStrategy.buildSchemaWithRowType NullPointerException

Posted by "SinyoWong (via GitHub)" <gi...@apache.org>.
SinyoWong commented on issue #4549:
URL: https://github.com/apache/seatunnel/issues/4549#issuecomment-1736614552

   给我这个PR,这个问题我解决了。
   Give me a PR, this problem I've solved.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [seatunnel] RU-jiayijie commented on issue #4549: [Bug] [connector-hive] sink hive AbstractWriteStrategy.buildSchemaWithRowType NullPointerException

Posted by "RU-jiayijie (via GitHub)" <gi...@apache.org>.
RU-jiayijie commented on issue #4549:
URL: https://github.com/apache/seatunnel/issues/4549#issuecomment-1588980402

   hive 表字段是不区分大小写的,所以你的query=中的字段必须 as 小写hive建表时的字段名 如下:
   `env {
     execution.parallelism = 1
     job.mode = "STREAMING"
     execution.checkpoint.interval = 60000
     format.fail-on-missing-field = false
   }
   
   source {
   
     Kafka {
       result_table_name = "xxxxx"
       format = json
       json.fail-on-missing-field = false
       json.ignore-parse-errors = true
       schema = {
         fields {
           EvnetBody = "string"
           EventDay = "string"
           EventTime = "string"
         }
       }
       start_mode = "group_offsets"
       topic = "xxx"
       consumer.group = "xxxx"
       commit_on_checkpoint = true
       bootstrap.servers = "xxxxxxxxx"
     }
     
   }
   
   transform {
     sql {
       source_table_name = "xxxxxxxx"
       result_table_name = "xxxxxxxx"
       query = "select EvnetBody as eventbody, replace(EventDay,'-','') as dt, EventTime as hour from xxxxxxx",
     }
   }
   
   sink {
     Hive {
       source_table_name = "xxxxx"
       table_name = "test.xxxxx"
       metastore_uri = "xxxxxxx"
       fields = ["eventbody","dt","hour"]
     }
   }`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [I] [Bug] [connector-hive] sink hive AbstractWriteStrategy.buildSchemaWithRowType NullPointerException [seatunnel]

Posted by "zengqinchris (via GitHub)" <gi...@apache.org>.
zengqinchris commented on issue #4549:
URL: https://github.com/apache/seatunnel/issues/4549#issuecomment-1855281676

   大佬们,你们指定 start_mode = "group_offsets" 可以消费数据吗,我的一直不能消费数据,换成新的消费者组也是不行的,只能消费新的数据,之前的历史数据,只能消费新的数据
   env {
   parallelism = 1
   job.mode = "STREAMING"
   checkpoint.interval = 15
   }
   source {
     Kafka {
       format = json
       field_delimiter = ","
   	json.fail-on-missing-field = false
       json.ignore-parse-errors = true
       format_error_handle_way = "fail"
       topic = "kv_info"
       consumer.group = "kv_info_client_12"
   	commit_on_checkpoint = true
   	start_mode = "group_offsets"
       bootstrap.servers = "bd212:9092,bd213:9092,bd214:9092"
       kafka.config = {
         client.id = kv_info_client_12
         max.poll.records = 50000
   	  enable.auto.commit = false
       }
     }
   }
   transform {
   }
   sink {
   Console {
       } 
   }


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] yujh1981 commented on issue #4549: [Bug] [connector-hive] sink hive AbstractWriteStrategy.buildSchemaWithRowType NullPointerException

Posted by "yujh1981 (via GitHub)" <gi...@apache.org>.
yujh1981 commented on issue #4549:
URL: https://github.com/apache/incubator-seatunnel/issues/4549#issuecomment-1518551597

   I find the same error。
   How can i do to resolve it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org