You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "mumuhhh (via GitHub)" <gi...@apache.org> on 2023/06/23 22:51:29 UTC

[GitHub] [iceberg] mumuhhh commented on a diff in pull request #7866: Flink: fix TableSink anonymous object

mumuhhh commented on code in PR #7866:
URL: https://github.com/apache/iceberg/pull/7866#discussion_r1240482278


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java:
##########
@@ -105,18 +105,21 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 
   @Override
   public DynamicTableSink createDynamicTableSink(Context context) {
-    ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
+    ObjectIdentifier objectIdentifier = context.getObjectIdentifier();

Review Comment:
   ```
           File warehouseDir = Files.createTempDir();
           EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
   
           TableEnvironment tEnv = TableEnvironment.create(settings);
           Table table =
                   tEnv.from(
                           TableDescriptor.forConnector("datagen")
                                   .schema(
                                           Schema.newBuilder()
                                                   .column("f0", DataTypes.STRING())
                                                   .build())
                                   .option("number-of-rows", "3")
                                   .build());
   
           TableDescriptor descriptor =
                   TableDescriptor.forConnector("iceberg")
                           .schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
                           .option("catalog-name", "hadoop_test")
                           .option("catalog-type", "hadoop")
                           .option("catalog-database", "test_db")
                           .option("catalog-table", "test")
                           .option("warehouse", warehouseDir.getAbsolutePath())
                           .build();
   
           table.insertInto(descriptor).execute();
   ```
   
   ```
   Unable to create a sink for writing table '*anonymous_iceberg$2*'.
   
   Table options are:
   
   'catalog-database'='test_db'
   'catalog-name'='hadoop_test'
   'catalog-table'='test'
   'catalog-type'='hadoop'
   'connector'='iceberg'
   'warehouse'='C:\Users\huawei\AppData\Local\Temp\1687560451312-0'
   org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table '*anonymous_iceberg$2*'.
   
   Table options are:
   
   'catalog-database'='test_db'
   'catalog-name'='hadoop_test'
   'catalog-table'='test'
   'catalog-type'='hadoop'
   'connector'='iceberg'
   'warehouse'='C:\Users\huawei\AppData\Local\Temp\1687560451312-0'
   	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:262)
   	at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:434)
   	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:227)
   	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:185)
   	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
   	at scala.collection.Iterator.foreach(Iterator.scala:943)
   	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
   	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
   	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
   	at scala.collection.TraversableLike.map(TraversableLike.scala:285)
   	at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
   	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
   	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:782)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:861)
   	at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:56)
   	at com.beagledata.flowingx.test.TestFlinkAnonymousTable.testWriteAnonymousTable(TestFlinkAnonymousTable.java:48)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
   	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
   	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
   	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
   	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
   	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
   	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
   	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
   	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
   	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
   	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
   	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
   	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
   	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
   	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
   	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
   	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
   	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
   	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
   	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
   	at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
   	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
   	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
   	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
   	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
   	at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
   	at org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
   	at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
   	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
   	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
   	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
   	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
   	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
   	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
   	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
   Caused by: org.apache.flink.table.api.TableException: This ObjectIdentifier instance refers to an anonymous object, hence it cannot be converted to ObjectPath and cannot be serialized.
   	at org.apache.flink.table.catalog.ObjectIdentifier.toObjectPath(ObjectIdentifier.java:112)
   	at org.apache.iceberg.flink.FlinkDynamicTableFactory.createDynamicTableSink(FlinkDynamicTableFactory.java:108)
   	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
   	... 63 more
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org