Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/02/11 10:04:50 UTC

[GitHub] [iceberg] gcnote opened a new issue #4092: Not an iceberg table error when using hive catalog-type

gcnote opened a new issue #4092:
URL: https://github.com/apache/iceberg/issues/4092


   Testing Flink 1.13.5 SQL CDC from MySQL to Iceberg with Zeppelin 0.11.0.
   
   JARs in the Flink lib directory:
   flink-sql-connector-hive-3.1.2_2.11-1.13.5.jar
   flink-sql-connector-mysql-cdc-2.1.1.jar
   hadoop-mapreduce-client-core-3.1.3.jar
   iceberg-flink-runtime-1.13-0.13.0.jar
   
   Step 1: configure checkpointing and the state backend
   %flink.conf
   execution.checkpointing.interval 3000
   execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION
   state.backend filesystem
   state.checkpoints.dir hdfs://xxxx/test/flink-cdc-checkpoint
   execution.type streaming
   
   Step 2: create the MySQL CDC source table
   %flink.ssql
    drop table if exists user_source;
    CREATE TABLE user_source (
       database_name STRING METADATA VIRTUAL,
       table_name STRING METADATA VIRTUAL,
       `id` int NOT NULL,
       name STRING,
       address STRING,
       phone_number STRING,
       email STRING,
       PRIMARY KEY (`id`) NOT ENFORCED
     ) WITH (
       'connector' = 'mysql-cdc',
       'hostname' = 'xxxx',
       'port' = '3306',
       'username' = 'xxxx',
       'password' = 'xxxx',
       'database-name' = 'test',
       'table-name' = 'cdc_user_[0-9]+'
     );
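   
   (Optional) Before defining the sink, the CDC source can be checked on its own. A minimal sketch; the 'type=update' paragraph hint is an assumption about the Zeppelin setup, not part of the original steps:
   
   %flink.ssql(type=update)
   -- Sanity-check the CDC source alone, before involving the Iceberg sink
   SELECT database_name, table_name, `id`, name FROM user_source;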
   
   Step 3: create the Iceberg sink table
   %flink.ssql
   drop table if exists all_users_sink;
   CREATE TABLE all_users_sink (
       database_name STRING,
       table_name    STRING,
       `id`          int NOT NULL,
       name          STRING,
       address       STRING,
       phone_number  STRING,
       email         STRING,
       PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
     ) WITH (
       'connector'='iceberg',
       'catalog-name'='hive_prod',
       'catalog-type'='hive',  
       'uri'='thrift://xxxx:9083',
       'warehouse'='hdfs://xxxx/hive/warehouse',
       'format-version'='2',
       'engine.hive.enabled' = 'true',
       'write.metadata.delete-after-commit.enabled'='true',
       'write.metadata.previous-versions-max'='5'
     );
   
   Step 4: insert from the source into the sink
   %flink.ssql
   INSERT INTO all_users_sink select * from user_source;
   
   Step 4 throws the exception below:
   
   Fail to run sql command: INSERT INTO all_users_sink select * from user_source
   java.io.IOException: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.default.all_users_sink'.
   
   Table options are:
   
   'catalog-name'='hive_prod'
   'catalog-type'='hive'
   'connector'='iceberg'
   'engine.hive.enabled'='true'
   'format-version'='2'
   'uri'='thrift://xxxx:9083'
   'warehouse'='hdfs://xxxx/hive/warehouse'
   'write.metadata.delete-after-commit.enabled'='true'
   'write.metadata.previous-versions-max'='5'
   	at org.apache.zeppelin.flink.FlinkSqlInterpreter.callInsertInto(FlinkSqlInterpreter.java:550)
   	at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:94)
   	at org.apache.zeppelin.flink.FlinkSqlInterpreter.callCommand(FlinkSqlInterpreter.java:264)
   	at org.apache.zeppelin.flink.FlinkSqlInterpreter.runSqlList(FlinkSqlInterpreter.java:151)
   	at org.apache.zeppelin.flink.FlinkSqlInterpreter.internalInterpret(FlinkSqlInterpreter.java:109)
   	at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:55)
   	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
   	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:860)
   	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752)
   	at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
   	at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
   	at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.default.all_users_sink'.
   
   Table options are:
   
   'catalog-name'='hive_prod'
   'catalog-type'='hive'
   'connector'='iceberg'
   'engine.hive.enabled'='true'
   'format-version'='2'
   'uri'='thrift://xxxx:9083'
   'warehouse'='hdfs://xxxx/hive/warehouse'
   'write.metadata.delete-after-commit.enabled'='true'
   'write.metadata.previous-versions-max'='5'
   	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
   	at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:367)
   	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201)
   	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162)
   	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162)
   	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
   	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
   	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
   	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
   	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
   	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
   	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
   	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1510)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1460)
   	at org.apache.zeppelin.flink.FlinkSqlInterpreter.callInsertInto(FlinkSqlInterpreter.java:544)
   	... 14 more
   Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.default.all_users_sink'.
   
   Table options are:
   
   'catalog-name'='hive_prod'
   'catalog-type'='hive'
   'connector'='iceberg'
   'engine.hive.enabled'='true'
   'format-version'='2'
   'uri'='thrift://xxxx:9083'
   'warehouse'='hdfs://xxxx/hive/warehouse'
   'write.metadata.delete-after-commit.enabled'='true'
   'write.metadata.previous-versions-max'='5'
   	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
   	at org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSink(HiveDynamicTableFactory.java:81)
   	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168)
   	... 31 more
   Caused by: org.apache.iceberg.exceptions.NoSuchIcebergTableException: Not an iceberg table: hive_prod.default.all_users_sink (type=null)
   	at org.apache.iceberg.exceptions.NoSuchIcebergTableException.check(NoSuchIcebergTableException.java:36)
   	at org.apache.iceberg.hive.HiveTableOperations.validateTableIsIceberg(HiveTableOperations.java:526)
   	at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:189)
   	at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:95)
   	at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:78)
   	at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.create(BaseMetastoreCatalog.java:156)
   	at org.apache.iceberg.CachingCatalog$CachingTableBuilder.lambda$create$0(CachingCatalog.java:249)
   	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2344)
   	at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
   	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2342)
   	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2325)
   	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
   	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
   	at org.apache.iceberg.CachingCatalog$CachingTableBuilder.create(CachingCatalog.java:247)
   	at org.apache.iceberg.catalog.Catalog.createTable(Catalog.java:78)
   	at org.apache.iceberg.flink.FlinkCatalog.createIcebergTable(FlinkCatalog.java:390)
   	at org.apache.iceberg.flink.FlinkDynamicTableFactory.createTableLoader(FlinkDynamicTableFactory.java:187)
   	at org.apache.iceberg.flink.FlinkDynamicTableFactory.createDynamicTableSink(FlinkDynamicTableFactory.java:125)
   	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168)
   	... 33 more
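   
   For reference, the failing check is HiveTableOperations.validateTableIsIceberg, which looks at the table_type property of the Hive metastore entry; "(type=null)" in the message suggests the existing all_users_sink entry in the metastore is not marked as an Iceberg table. The Iceberg Flink documentation registers tables through an explicit Iceberg catalog rather than the 'connector'='iceberg' options used in step 3. A minimal sketch of that form, reusing the URI and warehouse from step 3 (everything else is illustrative and has not been tried here):
   
   %flink.ssql
   -- Sketch only: register an Iceberg Hive catalog, then create the sink inside it.
   -- 'uri' and 'warehouse' are copied from step 3; the rest is assumed.
   CREATE CATALOG hive_prod WITH (
     'type' = 'iceberg',
     'catalog-type' = 'hive',
     'uri' = 'thrift://xxxx:9083',
     'warehouse' = 'hdfs://xxxx/hive/warehouse'
   );
   
   CREATE TABLE hive_prod.`default`.all_users_sink (
     database_name STRING,
     table_name    STRING,
     `id`          int NOT NULL,
     name          STRING,
     address       STRING,
     phone_number  STRING,
     email         STRING,
     PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
   ) WITH (
     'format-version' = '2',
     'write.metadata.delete-after-commit.enabled' = 'true',
     'write.metadata.previous-versions-max' = '5'
   );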
   
   

