Posted to issues@flink.apache.org by "tartarus (Jira)" <ji...@apache.org> on 2022/07/01 07:05:00 UTC

[jira] [Created] (FLINK-28342) Flink batch support for Hive StorageHandlers

tartarus created FLINK-28342:
--------------------------------

             Summary: Flink batch support for Hive StorageHandlers
                 Key: FLINK-28342
                 URL: https://issues.apache.org/jira/browse/FLINK-28342
             Project: Flink
          Issue Type: Sub-task
          Components: Connectors / Hive
            Reporter: tartarus


Hive introduced StorageHandlers when integrating with HBase; see the documentation:

[https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration]

[https://cwiki.apache.org/confluence/display/Hive/StorageHandlers]
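
For context, a StorageHandler table is declared with STORED BY instead of explicit InputFormat/OutputFormat clauses; the HBaseIntegration page above gives a definition along these lines:
{code:sql}
-- Example adapted from the Hive HBaseIntegration documentation linked above.
-- The HBaseStorageHandler supplies the input/output formats itself, so the
-- metastore records no InputFormat for this table.
CREATE TABLE hbase_table_1 (key int, value string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")
TBLPROPERTIES ("hbase.table.name" = "xyz");
{code}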

A Hive table that uses a StorageHandler usually does not set an InputFormat, but Flink's MRSplitsGetter currently does not account for this case.

As a result, accessing an external HBase table mapped by Hive via the Flink dialect throws a NullPointerException:
{code:java}
2022-07-01 15:03:09,240 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Failed to create Source Enumerator for source Source: dp_workflow_dag_run_hbase[1]
org.apache.flink.util.FlinkRuntimeException: Could not enumerate file splits
	at org.apache.flink.connector.file.src.AbstractFileSource.createEnumerator(AbstractFileSource.java:143) ~[flink-connector-files-1.15.0.jar:1.15.0]
	at org.apache.flink.connectors.hive.HiveSource.createEnumerator(HiveSource.java:124) ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:197) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:318) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:71) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:196) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:165) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.registerAndStartNewCoordinators(DefaultOperatorCoordinatorHandler.java:159) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeOperatorCoordinatorsFor(AdaptiveBatchScheduler.java:295) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.updateTopology(AdaptiveBatchScheduler.java:287) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeVerticesIfPossible(AdaptiveBatchScheduler.java:181) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.startSchedulingInternal(AdaptiveBatchScheduler.java:147) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.SpeculativeScheduler.startSchedulingInternal(SpeculativeScheduler.java:162) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:626) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1092) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:965) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:406) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612) ~[flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611) ~[flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) ~[flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_102]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_102]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_102]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) [?:1.8.0_102]
Caused by: java.io.IOException: Fail to create input splits.
	at org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:84) ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0]
	at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:69) ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0]
	at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.enumerateSplits(HiveSourceFileEnumerator.java:60) ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0]
	at org.apache.flink.connector.file.src.AbstractFileSource.createEnumerator(AbstractFileSource.java:141) ~[flink-connector-files-1.15.0.jar:1.15.0]
	... 40 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate the hadoop input format
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_102]
	at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_102]
	at org.apache.flink.connectors.hive.MRSplitsGetter.getHiveTablePartitionMRSplits(MRSplitsGetter.java:79) ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0]
	at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:69) ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0]
	at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.enumerateSplits(HiveSourceFileEnumerator.java:60) ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0]
	at org.apache.flink.connector.file.src.AbstractFileSource.createEnumerator(AbstractFileSource.java:141) ~[flink-connector-files-1.15.0.jar:1.15.0]
	... 40 more
Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate the hadoop input format
	at org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:126) ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0]
	at org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:96) ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_102]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_102]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_102]
	at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_102]
Caused by: java.lang.NullPointerException
	at java.lang.Class.forName0(Native Method) ~[?:1.8.0_102]
	at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_102]
	at org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:120) ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0]
	at org.apache.flink.connectors.hive.MRSplitsGetter$MRSplitter.call(MRSplitsGetter.java:96) ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_102]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_102]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_102]
	at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_102] {code}
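
The NPE is thrown because MRSplitsGetter$MRSplitter.call (MRSplitsGetter.java:120) passes the partition's InputFormat class name to Class.forName, and that name is null for StorageHandler tables. A minimal sketch of one possible fix direction, assuming Hive's HiveUtils/HiveStorageHandler APIs are on the classpath (this is not the actual Flink code; class and method names below are illustrative):
{code:java}
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;

import java.util.Properties;

/** Sketch only: resolve the InputFormat via the StorageHandler when none is set. */
public class StorageHandlerAwareInputFormatResolver {

    @SuppressWarnings({"unchecked", "rawtypes"})
    static InputFormat createInputFormat(
            JobConf jobConf, StorageDescriptor sd, Properties tableProps) throws Exception {
        String inputFormatName = sd.getInputFormat();
        if (inputFormatName == null) {
            // StorageHandler tables leave InputFormat unset in the metastore;
            // calling Class.forName(null) here is what produced the NPE above.
            // Instead, look up the handler class from the table properties and
            // ask it for the InputFormat.
            String handlerClass =
                    tableProps.getProperty(hive_metastoreConstants.META_TABLE_STORAGE);
            HiveStorageHandler handler = HiveUtils.getStorageHandler(jobConf, handlerClass);
            return handler.getInputFormatClass().newInstance();
        }
        // Non-StorageHandler tables: instantiate the configured InputFormat as before.
        return (InputFormat)
                Class.forName(
                                inputFormatName,
                                true,
                                Thread.currentThread().getContextClassLoader())
                        .newInstance();
    }
}
{code}
Note that HiveUtils.getStorageHandler returns null when no handler class is configured, so a real patch would also need to handle a table that sets neither an InputFormat nor a StorageHandler.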


