You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/02/11 10:04:50 UTC
[GitHub] [iceberg] gcnote opened a new issue #4092: Not an iceberg table Error use hive catalog-type
gcnote opened a new issue #4092:
URL: https://github.com/apache/iceberg/issues/4092
Testing Flink 1.13.5 SQL CDC from MySQL to Iceberg, run through Zeppelin 0.11.0.
jars in flink lib:
flink-sql-connector-hive-3.1.2_2.11-1.13.5.jar
flink-sql-connector-mysql-cdc-2.1.1.jar
hadoop-mapreduce-client-core-3.1.3.jar
iceberg-flink-runtime-1.13-0.13.0.jar
step1:
%flink.conf
execution.checkpointing.interval 3000
execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION
state.backend filesystem
state.checkpoints.dir hdfs://xxxx/test/flink-cdc-checkpoint
execution.type streaming
step2:
%flink.ssql
-- Source table: streams change data out of MySQL through the 'mysql-cdc' connector.
-- The table is dropped first so this paragraph can be re-run.
drop table if exists user_source;
CREATE TABLE user_source (
-- Metadata columns: filled in by the connector (originating database / table),
-- not read from the MySQL row itself. VIRTUAL marks them as read-only.
database_name STRING METADATA VIRTUAL,
table_name STRING METADATA VIRTUAL,
-- Physical columns of the captured MySQL tables.
`id` int NOT NULL,
name STRING,
address STRING,
phone_number STRING,
email STRING,
-- NOT ENFORCED: the key is declared for the planner; Flink does not validate uniqueness.
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
-- Connection settings (host and credentials are redacted as 'xxxx' in this report).
'hostname' = 'xxxx',
'port' = '3306',
'username' = 'xxxx',
'password' = 'xxxx',
'database-name' = 'test',
-- Pattern rather than a single name: matches tables named cdc_user_<digits>.
'table-name' = 'cdc_user_[0-9]+'
);
step3:
%flink.ssql
-- Sink table: an Iceberg table (connector = 'iceberg') backed by a Hive metastore catalog.
-- The table is dropped first so this paragraph can be re-run.
-- NOTE(review): the error output in this report names the Flink table as
-- 'hive.default.all_users_sink' and the rejected Iceberg identifier as
-- 'hive_prod.default.all_users_sink (type=null)'. No 'catalog-database' /
-- 'catalog-table' option is set below, so presumably the Iceberg table defaults
-- to the same database/table name as this Flink definition and collides with the
-- non-Iceberg entry the Flink Hive catalog stored for it. TODO confirm against
-- the Iceberg Flink connector documentation.
drop table if exists all_users_sink;
CREATE TABLE all_users_sink (
-- Same columns, in the same order, as user_source; here the two source
-- metadata columns are ordinary physical columns.
database_name STRING,
table_name STRING,
`id` int NOT NULL,
name STRING,
address STRING,
phone_number STRING,
email STRING,
-- Composite key: presumably because rows from several cdc_user_<digits> tables
-- are merged into this one table, so `id` alone may not be unique.
-- NOT ENFORCED: Flink does not validate uniqueness.
PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
) WITH (
'connector'='iceberg',
-- Iceberg catalog to load: a Hive-metastore-backed catalog named 'hive_prod'.
'catalog-name'='hive_prod',
'catalog-type'='hive',
-- Metastore thrift endpoint and warehouse root (hosts redacted as 'xxxx').
'uri'='thrift://xxxx:9083',
'warehouse'='hdfs://xxxx/hive/warehouse',
-- Iceberg table properties requested for the table.
'format-version'='2',
'engine.hive.enabled' = 'true',
-- Metadata file retention: delete old metadata files after commit, keeping at most 5 previous versions.
'write.metadata.delete-after-commit.enabled'='true',
'write.metadata.previous-versions-max'='5'
);
step4:
%flink.ssql
-- Copies every row of user_source into all_users_sink. `select *` maps columns by
-- position, so it relies on both tables declaring the same seven columns in the same order.
INSERT INTO all_users_sink select * from user_source;
step4 throws the exception below:
Fail to run sql command: INSERT INTO all_users_sink select * from user_source
java.io.IOException: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.default.all_users_sink'.
Table options are:
'catalog-name'='hive_prod'
'catalog-type'='hive'
'connector'='iceberg'
'engine.hive.enabled'='true'
'format-version'='2'
'uri'='thrift://xxxx:9083'
'warehouse'='hdfs://xxxx/hive/warehouse'
'write.metadata.delete-after-commit.enabled'='true'
'write.metadata.previous-versions-max'='5'
at org.apache.zeppelin.flink.FlinkSqlInterpreter.callInsertInto(FlinkSqlInterpreter.java:550)
at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:94)
at org.apache.zeppelin.flink.FlinkSqlInterpreter.callCommand(FlinkSqlInterpreter.java:264)
at org.apache.zeppelin.flink.FlinkSqlInterpreter.runSqlList(FlinkSqlInterpreter.java:151)
at org.apache.zeppelin.flink.FlinkSqlInterpreter.internalInterpret(FlinkSqlInterpreter.java:109)
at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:55)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:860)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752)
at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.default.all_users_sink'.
Table options are:
'catalog-name'='hive_prod'
'catalog-type'='hive'
'connector'='iceberg'
'engine.hive.enabled'='true'
'format-version'='2'
'uri'='thrift://xxxx:9083'
'warehouse'='hdfs://xxxx/hive/warehouse'
'write.metadata.delete-after-commit.enabled'='true'
'write.metadata.previous-versions-max'='5'
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:367)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1510)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1460)
at org.apache.zeppelin.flink.FlinkSqlInterpreter.callInsertInto(FlinkSqlInterpreter.java:544)
... 14 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.default.all_users_sink'.
Table options are:
'catalog-name'='hive_prod'
'catalog-type'='hive'
'connector'='iceberg'
'engine.hive.enabled'='true'
'format-version'='2'
'uri'='thrift://xxxx:9083'
'warehouse'='hdfs://xxxx/hive/warehouse'
'write.metadata.delete-after-commit.enabled'='true'
'write.metadata.previous-versions-max'='5'
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
at org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSink(HiveDynamicTableFactory.java:81)
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168)
... 31 more
Caused by: org.apache.iceberg.exceptions.NoSuchIcebergTableException: Not an iceberg table: hive_prod.default.all_users_sink (type=null)
at org.apache.iceberg.exceptions.NoSuchIcebergTableException.check(NoSuchIcebergTableException.java:36)
at org.apache.iceberg.hive.HiveTableOperations.validateTableIsIceberg(HiveTableOperations.java:526)
at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:189)
at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:95)
at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:78)
at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.create(BaseMetastoreCatalog.java:156)
at org.apache.iceberg.CachingCatalog$CachingTableBuilder.lambda$create$0(CachingCatalog.java:249)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2344)
at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2342)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2325)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
at org.apache.iceberg.CachingCatalog$CachingTableBuilder.create(CachingCatalog.java:247)
at org.apache.iceberg.catalog.Catalog.createTable(Catalog.java:78)
at org.apache.iceberg.flink.FlinkCatalog.createIcebergTable(FlinkCatalog.java:390)
at org.apache.iceberg.flink.FlinkDynamicTableFactory.createTableLoader(FlinkDynamicTableFactory.java:187)
at org.apache.iceberg.flink.FlinkDynamicTableFactory.createDynamicTableSink(FlinkDynamicTableFactory.java:125)
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168)
... 33 more
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org