Posted to issues@flink.apache.org by "zhanglu153 (Jira)" <ji...@apache.org> on 2022/11/18 07:48:00 UTC

[jira] [Comment Edited] (FLINK-30075) Failed to load data to the cache after the hive lookup join task is restarted

    [ https://issues.apache.org/jira/browse/FLINK-30075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635720#comment-17635720 ] 

zhanglu153 edited comment on FLINK-30075 at 11/18/22 7:47 AM:
--------------------------------------------------------------

1. When is the properties object in the Configuration object of the DFSClient#namenode proxy in the DistributedFileSystem object set to null?

Before the task restart, the first call to org.apache.flink.connectors.hive.read.HiveTableInputFormat#createInputSplits(int, java.util.List<org.apache.flink.connectors.hive.HiveTablePartition>, org.apache.hadoop.mapred.JobConf) sets the properties object in the Configuration object of the DFSClient#namenode proxy in the DistributedFileSystem object to null, as shown in the following figure:

!Set the properties object in the Configuration object of DFSClient#namenode proxy in the DistributedFileSystem object to null.png|width=608,height=301!

You can see that org.apache.hadoop.mapred.FileInputFormat#getSplits is called at the end of org.apache.flink.connectors.hive.read.HiveTableInputFormat#createInputSplits(int, java.util.List<org.apache.flink.connectors.hive.HiveTablePartition>, org.apache.hadoop.mapred.JobConf), and the call chain eventually reaches org.apache.hadoop.mapred.Master#getMasterPrincipal. That method contains an if/else block; in the else branch it calls org.apache.hadoop.yarn.client.util.YarnClientUtils#getRmPrincipal(org.apache.hadoop.conf.Configuration), which in turn reaches org.apache.hadoop.yarn.client.util.YarnClientUtils#getYarnConfWithRmHaId, where a new YarnConfiguration object is created. The org.apache.hadoop.yarn.conf.YarnConfiguration class has a static code block; when that block calls org.apache.hadoop.conf.Configuration#addDefaultResource, the for loop inside addDefaultResource calls org.apache.hadoop.conf.Configuration#reloadConfiguration on every registered Configuration object, setting their properties objects to null.

(Note that the static code block in YarnConfiguration runs only on the first call to org.apache.hadoop.mapred.FileInputFormat#getSplits, because that is when the class loader first loads the YarnConfiguration class. If HiveTableInputFormat#createInputSplits is called again, the static code block does not run again, so the properties object is not set to null a second time.)

!hadoop2.10.1 org.apache.hadoop.mapred.Master#getMasterPrincipal.png|width=685,height=189!

!hadoop2.10.1 org.apache.hadoop.yarn.client.util.YarnClientUtils#getYarnConfWithRmHaId.png|width=423,height=185!

!hadoop2.10.1 org.apache.hadoop.yarn.conf.YarnConfiguration static code block.png|width=472,height=245!

!org.apache.hadoop.conf.Configuration#addDefaultResource.png|width=366,height=218!
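To make the mechanism shown in the figures above easier to follow, here is a rough, paraphrased sketch of the Hadoop 2.10.1 logic (not the verbatim source): the static code block of YarnConfiguration registers the YARN default resources, and Configuration#addDefaultResource then calls reloadConfiguration on every registered Configuration instance, which clears its cached properties.

{code:java}
// Paraphrased sketch of the Hadoop 2.10.1 behaviour described above (not verbatim source).

// org.apache.hadoop.yarn.conf.YarnConfiguration: this static block runs exactly once,
// when the class is first loaded, i.e. on the first FileInputFormat#getSplits call.
public class YarnConfiguration extends Configuration {
  static {
    addDefaultResource("yarn-default.xml");
    addDefaultResource("yarn-site.xml");
  }
}

// org.apache.hadoop.conf.Configuration: registering a default resource forces every
// live Configuration instance (including the one held by the DFSClient#namenode proxy)
// to drop its cached properties.
public static synchronized void addDefaultResource(String name) {
  if (!defaultResources.contains(name)) {
    defaultResources.add(name);
    for (Configuration conf : REGISTRY.keySet()) {
      if (conf.loadDefaults) {
        conf.reloadConfiguration();
      }
    }
  }
}

public synchronized void reloadConfiguration() {
  properties = null;       // cached key/value pairs are discarded ...
  finalParameters.clear(); // ... and will be re-read lazily on the next get()
}
{code}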



> Failed to load data to the cache after the hive lookup join task is restarted
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-30075
>                 URL: https://issues.apache.org/jira/browse/FLINK-30075
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.12.2
>            Reporter: zhanglu153
>            Priority: Major
>         Attachments: Set the properties object in the Configuration object of DFSClient#namenode proxy in the DistributedFileSystem object to null.png, hadoop2.10.1 org.apache.hadoop.mapred.Master#getMasterPrincipal.png, hadoop2.10.1 org.apache.hadoop.security.SaslRpcClient#conf.png, hadoop2.10.1 org.apache.hadoop.yarn.client.util.YarnClientUtils#getYarnConfWithRmHaId.png, hadoop2.10.1 org.apache.hadoop.yarn.conf.YarnConfiguration static code block.png, hadoop2.8.5 org.apache.hadoop.security.SaslRpcClient#conf.png, org.apache.flink.connectors.hive.read.HiveTableInputFormat#createInputSplits.png, org.apache.hadoop.conf.Configuration#addDefaultResource.png, org.apache.hadoop.security.SaslRpcClient.png
>
>
> When I test a Kafka and Hive lookup join, only the connection between the TaskManager (TM) and ZooKeeper is disconnected after the data has been successfully loaded into the cache. In this case, the task is restarted because a Flink task restart policy is configured (note that the task process does not change at this point; the task restarts in the same container).
> The task SQL is as follows:
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
> tableEnv.executeSql("CREATE TABLE kafka_table_1 (\n" +
> "  a string,\n" +
> "  b int,\n" +
> "  c string,\n" +
> "  log_ts TIMESTAMP(0),\n" +
> ") WITH (\n" +
> "    'connector' = 'kafka',\n" +
> "    'topic' = 'test',\n" +
> "    'scan.startup.mode' = 'latest-offset',\n" +
> "    'properties.bootstrap.servers' = '172.16.144.208:9092',\n" +
> "    'format' = 'csv'\n" +
> ")");
> String name = "hive_catalog";
> String hiveConfDir = "/cloud/service/flink/conf";
> HiveCatalog hive = new HiveCatalog(name, null, hiveConfDir);
> hive.open();
> tableEnv.registerCatalog(name, hive);
> tableEnv.useCatalog(name);
> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> tableEnv.executeSql("create table if not exists ttt(a string,b int,c string)");
> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> tableEnv.executeSql("select * from default_catalog.default_database.kafka_table_1 as u join ttt for system_time as of u.proctime as o on u.a = o.a").print();{code}
> After the task restarts successfully, Kafka produces data. When some data arrives, an error is reported indicating that loading data into the cache failed. The error message is as follows:
> {code:java}
> 2022-11-17 14:09:16,041 INFO  org.apache.flink.table.filesystem.FileSystemLookupFunction   [] - Populating lookup join cache
> 2022-11-17 14:09:16,051 INFO  org.apache.hadoop.conf.Configuration                         [] - getProps 1230612033
> 2022-11-17 14:09:16,053 INFO  org.apache.hadoop.io.retry.RetryInvocationHandler            [] - java.io.IOException: org.apache.flink.shaded.hadoop2.com.google.protobuf.ServiceException: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'., while invoking ClientNamenodeProtocolTranslatorPB.getFileInfo over hdp-hadoop-hdp-namenode-1.hdp-hadoop-hdp-namenode.hdp-dev-env-3.svc.cluster.local/192.168.6.85:9000. Trying to failover immediately.
> 2022-11-17 14:09:16,053 DEBUG org.apache.hadoop.ipc.Client                                 [] - The ping interval is 60000 ms.
> 2022-11-17 14:09:16,053 DEBUG org.apache.hadoop.ipc.Client                                 [] - Connecting to hdp-hadoop-hdp-namenode-0.hdp-hadoop-hdp-namenode.hdp-dev-env-3.svc.cluster.local/192.168.0.8:9000
> 2022-11-17 14:09:16,055 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - PrivilegedAction as:hadoop/hdp-flinkhistory-hdp-flink-history-6754db4f9c-8b8nd.hdp-flinkhistory-hdp-flink-history.hdp-dev-env-3.svc.cluster.local@DAHUA.COM (auth:KERBEROS) from:org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:825)
> 2022-11-17 14:09:16,056 DEBUG org.apache.hadoop.security.SaslRpcClient                     [] - Sending sasl message state: NEGOTIATE
> 2022-11-17 14:09:16,057 DEBUG org.apache.hadoop.security.SaslRpcClient                     [] - Get token info proto:interface org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB info:@org.apache.hadoop.security.token.TokenInfo(value=class org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector)
> 2022-11-17 14:09:16,057 INFO  org.apache.hadoop.security.SaslRpcClient                     [] - getServerPrincipal 1230612033
> 2022-11-17 14:09:16,057 DEBUG org.apache.hadoop.security.SaslRpcClient                     [] - Get kerberos info proto:interface org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB info:@org.apache.hadoop.security.KerberosInfo(clientPrincipal=, serverPrincipal=dfs.namenode.kerberos.principal)
> 2022-11-17 14:09:16,059 DEBUG org.apache.hadoop.security.SaslRpcClient                     [] - getting serverKey: dfs.namenode.kerberos.principal conf value: null principal: null
> 2022-11-17 14:09:16,059 DEBUG org.apache.hadoop.ipc.Client                                 [] - closing ipc connection to hdp-hadoop-hdp-namenode-0.hdp-hadoop-hdp-namenode.hdp-dev-env-3.svc.cluster.local/192.168.0.8:9000: Couldn't set up IO streams: java.lang.IllegalArgumentException: Failed to specify server's Kerberos principal name
> java.io.IOException: Couldn't set up IO streams: java.lang.IllegalArgumentException: Failed to specify server's Kerberos principal name
> 	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:891) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.ipc.Client$Connection.access$3700(Client.java:423) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1615) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1446) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1399) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at com.sun.proxy.$Proxy27.getFileInfo(Unknown Source) ~[?:?]
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:800) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_342]
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_342]
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_342]
> 	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342]
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at com.sun.proxy.$Proxy28.getFileInfo(Unknown Source) ~[?:?]
> 	at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1673) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1524) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1521) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1536) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1632) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:334) ~[flink-connector-hive_2.11-1.12.2-HDP-22.10.28.jar:1.12.2-HDP-22.10.28]
> 	at org.apache.flink.connectors.hive.read.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:318) ~[flink-connector-hive_2.11-1.12.2-HDP-22.10.28.jar:1.12.2-HDP-22.10.28]
> 	at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:83) ~[flink-connector-hive_2.11-1.12.2-HDP-22.10.28.jar:1.12.2-HDP-22.10.28]
> 	at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.table.filesystem.security.HdpFileSystemLookupFunction.lambda$eval$1(HdpFileSystemLookupFunction.java:59) ~[flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.runtime.security.SecurityUtils.runAtOneContext(SecurityUtils.java:100) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.table.filesystem.security.HdpFileSystemLookupFunction.eval(HdpFileSystemLookupFunction.java:58) [flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at LookupFunction$10.flatMap(Unknown Source) [flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:?]
> 	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) [flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) [flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at StreamExecCalc$7.processElement(Unknown Source) [flink-table-blink_2.11-1.12.2-HDP-22.10.09.jar:?]
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) [flink-sql-connector-kafka_2.11-1.12.2-HDP-22.09.20.jar:1.12.2-HDP-22.09.20]
> 	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183) [flink-sql-connector-kafka_2.11-1.12.2-HDP-22.09.20.jar:1.12.2-HDP-22.09.20]
> 	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) [flink-sql-connector-kafka_2.11-1.12.2-HDP-22.09.20.jar:1.12.2-HDP-22.09.20]
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) [flink-sql-connector-kafka_2.11-1.12.2-HDP-22.09.20.jar:1.12.2-HDP-22.09.20]
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263) [flink-dist_2.11-1.12.2-HDP-22.10.09.jar:1.12.2-HDP-22.10.09]
> Caused by: java.lang.IllegalArgumentException: Failed to specify server's Kerberos principal name
> 	at org.apache.hadoop.security.SaslRpcClient.getServerPrincipal(SaslRpcClient.java:326) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.security.SaslRpcClient.createSaslClient(SaslRpcClient.java:231) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.security.SaslRpcClient.selectSaslClient(SaslRpcClient.java:159) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:391) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:617) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.ipc.Client$Connection.access$2200(Client.java:423) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:829) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:825) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_342]
> 	at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_342]
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2012) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:825) ~[flink-shaded-hadoop-2-uber-2.10.1-HDP-22.09.1-release-10.0.jar:2.10.1-HDP-22.09.1-release-10.0]
> 	... 56 more{code}
> The task did not fail, but Hive data could not be loaded into the cache.
> The Hadoop version used in the test is 2.10.1. During the test, it was found that with Hadoop 2.8.5 this exception is not thrown. The restarted task fails to load Hive data into the cache only when the restart happens after the first time the Hive data was loaded into the cache; if the task is restarted after the nth (n >= 2) load of Hive data into the cache, the restarted task can load the Hive data into the cache normally.
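> For reference, the "Failed to specify server's Kerberos principal name" error above is thrown by a check in org.apache.hadoop.security.SaslRpcClient#getServerPrincipal that reads the server principal from the client-side Configuration. A rough, paraphrased sketch (not the verbatim Hadoop source) of that check:
> {code:java}
> // Paraphrased sketch of the check in org.apache.hadoop.security.SaslRpcClient#getServerPrincipal.
> // For the NameNode protocol, krbInfo.serverPrincipal() is the key
> // "dfs.namenode.kerberos.principal" (see the "getting serverKey" debug line above).
> String confPrincipal = conf.get(krbInfo.serverPrincipal());
> if (confPrincipal == null || confPrincipal.isEmpty()) {
>   // conf.get(...) returns null here, as the "conf value: null" debug line shows,
>   // so this exception is thrown and the connection setup fails.
>   throw new IllegalArgumentException(
>       "Failed to specify server's Kerberos principal name");
> }
> {code}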



--
This message was sent by Atlassian Jira
(v8.20.10#820010)