Posted to issues@hive.apache.org by "László Bodor (Jira)" <ji...@apache.org> on 2021/10/27 15:17:00 UTC
[jira] [Comment Edited] (HIVE-25637) Hive on Tez: inserting data failing into the non native hive external table managed by kafka storage handler
[ https://issues.apache.org/jira/browse/HIVE-25637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17434895#comment-17434895 ]
László Bodor edited comment on HIVE-25637 at 10/27/21, 3:16 PM:
----------------------------------------------------------------
Finally, it turned out that two changes are needed:
1. Consider all TableDesc objects present in the FileSinkOperators of the MapWork when collecting Kafka credentials. HIVE-23408 already takes care of the TableDescs of the source table, but during an insert the source table is a dummy table, as the explain plan shows:
{code}
| Stage: Stage-2 |
| Tez |
| DagId: hive_20211026130429_4c237196-f3ef-4edb-8cc5-584d63f8b03d:4 |
| DagName: hive_20211026130429_4c237196-f3ef-4edb-8cc5-584d63f8b03d:4 |
| Vertices: |
| Map 1 |
| Map Operator Tree: |
| TableScan |
| alias: _dummy_table |
| Row Limit Per Split: 1 |
| Statistics: Num rows: 1 Data size: 10 Basic stats: COMPLETE Column stats: COMPLETE |
| Select Operator |
| expressions: array(const struct(null,'comment',0,1,2,3,null,null,null,null)) (type: array<struct<col1:void,col2:string,col3:int,col4:int,col5:int,col6:decimal(1,0),col7:void,col8:void,col9:void,col10:void>>) |
| outputColumnNames: _col0 |
| Statistics: Num rows: 1 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE |
| UDTF Operator |
| Statistics: Num rows: 1 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE |
| function name: inline |
| Select Operator |
| expressions: null (type: timestamp), col2 (type: string), UDFToBoolean(col3) (type: boolean), col4 (type: int), UDFToLong(col5) (type: bigint), UDFToDouble(col6) (type: double), null (type: binary), null (type: int), null (type: bigint), null (type: bigint) |
| outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9 |
| Statistics: Num rows: 1 Data size: 120 Basic stats: COMPLETE Column stats: COMPLETE |
| File Output Operator |
| compressed: false |
| Statistics: Num rows: 1 Data size: 120 Basic stats: COMPLETE Column stats: COMPLETE |
| table: |
| input format: org.apache.hadoop.hive.kafka.KafkaInputFormat |
| output format: org.apache.hadoop.hive.kafka.KafkaOutputFormat |
| serde: org.apache.hadoop.hive.kafka.KafkaSerDe |
| name: default.kafka_table |
| |
{code}
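The first change amounts to walking the MapWork operator tree and collecting the table descriptor of every file sink, not only the scanned (dummy) source table. Below is a simplified, self-contained sketch of that traversal; the local Operator/FileSinkOperator/TableDesc types are stand-ins for the real Hive classes in org.apache.hadoop.hive.ql.exec and ql.plan, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class CollectSinkTableDescs {

    // Stand-in for org.apache.hadoop.hive.ql.plan.TableDesc
    static class TableDesc {
        final String tableName;
        TableDesc(String tableName) { this.tableName = tableName; }
    }

    // Stand-in for a generic node in the operator tree
    static class Operator {
        final List<Operator> children = new ArrayList<>();
        void addChild(Operator c) { children.add(c); }
    }

    // Stand-in for FileSinkOperator: the operator carrying the sink's TableDesc
    static class FileSinkOperator extends Operator {
        final TableDesc tableDesc;
        FileSinkOperator(TableDesc d) { this.tableDesc = d; }
    }

    // Walk the operator tree and collect the TableDesc of every file sink,
    // so credentials can be obtained for sink tables too, not only for the
    // scanned source table (which is _dummy_table during an insert).
    static List<TableDesc> collectSinkTableDescs(Operator root) {
        List<TableDesc> result = new ArrayList<>();
        if (root instanceof FileSinkOperator) {
            result.add(((FileSinkOperator) root).tableDesc);
        }
        for (Operator child : root.children) {
            result.addAll(collectSinkTableDescs(child));
        }
        return result;
    }

    public static void main(String[] args) {
        // Mirrors the plan above: _dummy_table scan -> select -> file sink
        // writing default.kafka_table
        Operator scan = new Operator();
        Operator select = new Operator();
        FileSinkOperator sink =
            new FileSinkOperator(new TableDesc("default.kafka_table"));
        scan.addChild(select);
        select.addChild(sink);

        for (TableDesc d : collectSinkTableDescs(scan)) {
            System.out.println(d.tableName);
        }
    }
}
```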
2. Merge credentials in TezProcessor to make them available to the Hive operators. No tokens were present in the codepath below, which caused KafkaUtils to fall back to GSSAPI (Kerberos auth) instead of token-based auth:
{code}
at org.apache.hadoop.hive.kafka.KafkaUtils.addKerberosJaasConf(KafkaUtils.java:362)
at org.apache.hadoop.hive.kafka.KafkaUtils.producerProperties(KafkaUtils.java:230)
at org.apache.hadoop.hive.kafka.KafkaOutputFormat.getHiveRecordWriter(KafkaOutputFormat.java:56)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:294)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:279)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketForFileIdx(FileSinkOperator.java:872)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketFiles(FileSinkOperator.java:823)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:1004)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:937)
at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:937)
at org.apache.hadoop.hive.ql.exec.UDTFOperator.forwardUDTFOutput(UDTFOperator.java:133)
at org.apache.hadoop.hive.ql.udf.generic.UDTFCollector.collect(UDTFCollector.java:45)
at org.apache.hadoop.hive.ql.udf.generic.GenericUDTF.forward(GenericUDTF.java:110)
at org.apache.hadoop.hive.ql.udf.generic.GenericUDTFInline.process(GenericUDTFInline.java:64)
at org.apache.hadoop.hive.ql.exec.UDTFOperator.process(UDTFOperator.java:116)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:937)
at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:937)
at org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:128)
at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:152)
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:552)
at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.processRow(MapRecordSource.java:92)
at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:76)
at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:437)
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:267)
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}
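The second change boils down to merging the DAG-level credentials into the ones the processor (and thus the operators) see, so the Kafka delegation token is already visible when the record writer is created. A minimal, self-contained sketch follows; TaskCredentials and its token map are stand-ins for org.apache.hadoop.security.Credentials, whose mergeAll likewise copies only tokens that are not yet present in the target.

```java
import java.util.HashMap;
import java.util.Map;

public class MergeCredentials {

    // Stand-in for org.apache.hadoop.security.Credentials
    static class TaskCredentials {
        final Map<String, String> tokens = new HashMap<>();

        void addToken(String alias, String token) {
            tokens.put(alias, token);
        }

        // Copy every token from `other` whose alias is not yet present here
        // (mirrors Credentials.mergeAll, which never overwrites existing
        // tokens).
        void mergeAll(TaskCredentials other) {
            for (Map.Entry<String, String> e : other.tokens.entrySet()) {
                tokens.putIfAbsent(e.getKey(), e.getValue());
            }
        }
    }

    public static void main(String[] args) {
        // What the operators see vs. what the DAG carries
        TaskCredentials processorCreds = new TaskCredentials();
        TaskCredentials dagCreds = new TaskCredentials();
        dagCreds.addToken("kafka-delegation-token", "token-bytes");

        // After the merge, KafkaUtils-like code can find the token and pick
        // token-based auth instead of falling back to GSSAPI.
        processorCreds.mergeAll(dagCreds);
        System.out.println(
            processorCreds.tokens.containsKey("kafka-delegation-token"));
    }
}
```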
> Hive on Tez: inserting data failing into the non native hive external table managed by kafka storage handler
> -------------------------------------------------------------------------------------------------------------
>
> Key: HIVE-25637
> URL: https://issues.apache.org/jira/browse/HIVE-25637
> Project: Hive
> Issue Type: Improvement
> Reporter: László Bodor
> Assignee: László Bodor
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> This is the follow-up for HIVE-23408; the repro is below:
> {code}
> CREATE EXTERNAL TABLE `kafka_table`(
> `timestamp` timestamp COMMENT 'from deserializer',
> `page` string COMMENT 'from deserializer',
> `newpage` boolean COMMENT 'from deserializer',
> `added` int COMMENT 'from deserializer',
> `deleted` bigint COMMENT 'from deserializer',
> `delta` double COMMENT 'from deserializer')
> ROW FORMAT SERDE
> 'org.apache.hadoop.hive.kafka.KafkaSerDe'
> STORED BY
> 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
> WITH SERDEPROPERTIES (
> 'serialization.format'='1')
> LOCATION
> 'hdfs://lbodorkafkaunsec-2.lbodorkafkaunsec.root.hwx.site:8020/warehouse/tablespace/external/hive/kafka_table'
> TBLPROPERTIES (
> 'bucketing_version'='2',
> 'hive.kafka.max.retries'='6',
> 'hive.kafka.metadata.poll.timeout.ms'='30000',
> 'hive.kafka.optimistic.commit'='false',
> 'hive.kafka.poll.timeout.ms'='5000',
> 'kafka.bootstrap.servers'='lbodorkafkaunsec-1.lbodorkafkaunsec.root.hwx.site:9092,lbodorkafkaunsec-2.lbodorkafkaunsec.root.hwx.site:9092,lbodorkafkaunsec-3.lbodorkafkaunsec.root.hwx.site:9092',
> 'kafka.serde.class'='org.apache.hadoop.hive.serde2.JsonSerDe',
> 'kafka.topic'='hit-topic-1',
> 'kafka.write.semantic'='AT_LEAST_ONCE');
> SELECT COUNT(*) FROM kafka_table WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES); # works due to HIVE-23408
> insert into kafka_table values(NULL, 'comment', 0, 1, 2, 3.0, NULL, NULL, NULL, NULL); # fails
> {code}
> The exception I get:
> {code}
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.kafkaesque.common.KafkaException: Failed to construct kafka producer
> at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketFiles(FileSinkOperator.java:829)
> at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:1004)
> at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:937)
> at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
> at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:937)
> at org.apache.hadoop.hive.ql.exec.UDTFOperator.forwardUDTFOutput(UDTFOperator.java:133)
> at org.apache.hadoop.hive.ql.udf.generic.UDTFCollector.collect(UDTFCollector.java:45)
> at org.apache.hadoop.hive.ql.udf.generic.GenericUDTF.forward(GenericUDTF.java:110)
> at org.apache.hadoop.hive.ql.udf.generic.GenericUDTFInline.process(GenericUDTFInline.java:64)
> at org.apache.hadoop.hive.ql.exec.UDTFOperator.process(UDTFOperator.java:116)
> at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:937)
> at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
> at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:937)
> at org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:128)
> at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:152)
> at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:552)
> ... 20 more
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.kafkaesque.common.KafkaException: Failed to construct kafka producer
> at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:282)
> at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketForFileIdx(FileSinkOperator.java:872)
> at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketFiles(FileSinkOperator.java:823)
> ... 35 more
> Caused by: org.apache.kafkaesque.common.KafkaException: Failed to construct kafka producer
> at org.apache.kafkaesque.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
> at org.apache.kafkaesque.clients.producer.KafkaProducer.<init>(KafkaProducer.java:313)
> at org.apache.hadoop.hive.kafka.SimpleKafkaWriter.<init>(SimpleKafkaWriter.java:80)
> at org.apache.hadoop.hive.kafka.KafkaOutputFormat.getHiveRecordWriter(KafkaOutputFormat.java:60)
> at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:294)
> at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:279)
> ... 37 more
> Caused by: org.apache.kafkaesque.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
> at org.apache.kafkaesque.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
> at org.apache.kafkaesque.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
> at org.apache.kafkaesque.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
> at org.apache.kafkaesque.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
> at org.apache.kafkaesque.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450)
> at org.apache.kafkaesque.clients.producer.KafkaProducer.<init>(KafkaProducer.java:421)
> ... 42 more
> Caused by: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
> at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:944)
> at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:764)
> at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
> at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
> at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
> at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
> at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
> at org.apache.kafkaesque.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
> at org.apache.kafkaesque.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:103)
> at org.apache.kafkaesque.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
> at org.apache.kafkaesque.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
> at org.apache.kafkaesque.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:147)
> ... 47 more
> {code}
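For context on why the missing token matters: when building the JAAS configuration, a delegation token enables token-based (SASL/SCRAM) auth, while its absence leaves only Kerberos (GSSAPI), which fails in the task container as the trace above shows. The snippet below is purely illustrative of that decision; the login-module class names are real, but the option strings are simplified placeholders, not KafkaUtils' actual output.

```java
public class ChooseJaasConf {

    // Hypothetical helper: pick a JAAS config depending on whether a Kafka
    // delegation token is available.
    static String jaasConfFor(String delegationToken) {
        if (delegationToken != null) {
            // Delegation token present: token-based auth via SCRAM.
            return "org.apache.kafka.common.security.scram.ScramLoginModule "
                 + "required tokenauth=true;";
        }
        // No token: fall back to Kerberos (GSSAPI), which needs a ticket or
        // keytab and throws a LoginException when neither is available.
        return "com.sun.security.auth.module.Krb5LoginModule required;";
    }

    public static void main(String[] args) {
        System.out.println(jaasConfFor("token").contains("ScramLoginModule"));
        System.out.println(jaasConfFor(null).contains("Krb5LoginModule"));
    }
}
```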
--
This message was sent by Atlassian Jira
(v8.3.4#803005)