You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/09/14 09:20:27 UTC

[GitHub] [iceberg] mazhiyu123 opened a new issue #3114: after set 'write.upsert.enable'='true' in flink sql, using flink sql read iceberg table will get exception: java.lang.IllegalArgumentException: Row arity: 3, but serializer arity: 2

mazhiyu123 opened a new issue #3114:
URL: https://github.com/apache/iceberg/issues/3114


   flink version: 1.12
   iceberg version: master brach(2021-09-13)
   hadoop version: hadoop-2.6.0-cdh5.15.0
   
   create catalog:
   CREATE CATALOG hadoop_catalog WITH (
     'type'='iceberg',
     'catalog-type'='hadoop',
     'warehouse'='hdfs://xxx-hdfs/flink/tmp/iceberg_test',
     'property-version'='1'
   )
   ;
   
   create table:
   CREATE TABLE hadoop_catalog.iceberg_db.sample_test (
       id BIGINT COMMENT 'unique id',
       data STRING,
       PRIMARY KEY(id) NOT ENFORCED
   )
   WITH (
       'format-version'= '2',
       'write.upsert.enable'='true'
   )
   ;
   
   
   insert sql:
   INSERT INTO hadoop_catalog.iceberg_db.sample_test  VALUES (10, 'test10_U'), (11, 'test11'), (12, 'test12');
   INSERT INTO  hadoop_catalog.iceberg_db.sample_test  VALUES (12, 'test12_update');
   
   select sql:
   select * from hadoop_catalog.iceberg_db.sample_test;
   
   after run select sql, flink job will report an error:
   2021-09-14 17:13:14
   org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
   	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
   	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
   	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
   	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
   	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
   	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
   	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
   	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
   	at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
   	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
   	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
   	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
   	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
   	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
   	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
   	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
   	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
   	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
   	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
   	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
   	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
   Caused by: java.lang.IllegalArgumentException: Row arity: 3, but serializer arity: 2
   	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:121)
   	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
   	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
   	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
   	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
   	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
   	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
   	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
   	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
   	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
   	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
   	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
   	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on issue #3114: after set 'write.upsert.enable'='true' in flink sql, using flink sql read iceberg table will get exception: java.lang.IllegalArgumentException: Row arity: 3, but serializer arity: 2

Posted by GitBox <gi...@apache.org>.
openinx commented on issue #3114:
URL: https://github.com/apache/iceberg/issues/3114#issuecomment-918978509


   Hi @mazhiyu123 ,  I think you will need to apply this PR to your own branch: https://github.com/apache/iceberg/pull/2731


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on issue #3114: after set 'write.upsert.enable'='true' in flink sql, using flink sql read iceberg table will get exception: java.lang.IllegalArgumentException: Row arity: 3, but serializer arity: 2

Posted by GitBox <gi...@apache.org>.
openinx commented on issue #3114:
URL: https://github.com/apache/iceberg/issues/3114#issuecomment-919073754


   I've checked the behavior by using iceberg master branch (git commit-id: 838cc652273c1444155bec2e1d6029cfbdbf3ea3) and flink 1.12,  all work as expected: 
   
   ```sql
   CREATE TABLE iceberg_table_v2 (
       id   BIGINT,
       data STRING,
       PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
        'connector'='iceberg',
        'format-version' = '2',
        'catalog-name'='hadoop_prod',
        'catalog-type'='hadoop',
        'write.upsert.enable'='true',
        'warehouse'='file:///Users/openinx/test/iceberg-warehouse'
   );
   
   select * from iceberg_table_v2;
   +----+------+
   | id | data |
   +----+------+
   0 row in set
   
   INSERT INTO iceberg_table_v2 VALUES (14, 'test14'), (14, 'test14_update');
   [INFO] Submitting SQL update statement to the cluster...
   [INFO] Table update statement has been successfully submitted to the cluster:
   Job ID: 083d3008e12bc533aacec0c2bdae9d0a
   
   select * from iceberg_table_v2;
   +----+---------------+
   | id |          data |
   +----+---------------+
   | 14 | test14_update |
   +----+---------------+
   1 row in set
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on issue #3114: after set 'write.upsert.enable'='true' in flink sql, using flink sql read iceberg table will get exception: java.lang.IllegalArgumentException: Row arity: 3, but serializer arity: 2

Posted by GitBox <gi...@apache.org>.
openinx commented on issue #3114:
URL: https://github.com/apache/iceberg/issues/3114#issuecomment-919064052


   I think you will need to rebase to use the latest master branch and enable the `'write.upsert.enable'='true'` from this [PR](https://github.com/apache/iceberg/pull/2863), which has enabled the upsert semantics in flink iceberg integration work.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx edited a comment on issue #3114: after set 'write.upsert.enable'='true' in flink sql, using flink sql read iceberg table will get exception: java.lang.IllegalArgumentException: Row arity: 3, but serializer arity: 2

Posted by GitBox <gi...@apache.org>.
openinx edited a comment on issue #3114:
URL: https://github.com/apache/iceberg/issues/3114#issuecomment-919073754


   I've checked the behavior by using iceberg master branch (git commit-id: 838cc652273c1444155bec2e1d6029cfbdbf3ea3) and flink 1.12,  all work as expected: 
   
   ```sql
   CREATE TABLE iceberg_table_v2 (
       id   BIGINT,
       data STRING,
       PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
        'connector'='iceberg',
        'format-version' = '2',
        'catalog-name'='hadoop_prod',
        'catalog-type'='hadoop',
        'write.upsert.enable'='true',
        'warehouse'='file:///Users/openinx/test/iceberg-warehouse'
   );
   
   select * from iceberg_table_v2;
   +----+------+
   | id | data |
   +----+------+
   0 row in set
   
   INSERT INTO iceberg_table_v2 VALUES (14, 'test14'), (14, 'test14_update');
   [INFO] Submitting SQL update statement to the cluster...
   [INFO] Table update statement has been successfully submitted to the cluster:
   Job ID: 083d3008e12bc533aacec0c2bdae9d0a
   
   select * from iceberg_table_v2;
   +----+---------------+
   | id |          data |
   +----+---------------+
   | 14 | test14_update |
   +----+---------------+
   1 row in set
   
   INSERT INTO iceberg_table_v2 VALUES (14, 'test14_3');
   [INFO] Submitting SQL update statement to the cluster...
   [INFO] Table update statement has been successfully submitted to the cluster:
   Job ID: 9528163e8d06db33c090c16d9a858b45
   
   select * from iceberg_table_v2; 
   +----+----------+
   | id |     data |
   +----+----------+
   | 14 | test14_3 |
   +----+----------+
   1 row in set
   
   INSERT INTO iceberg_table_v2 VALUES (14, 'test14_4');
   [INFO] Submitting SQL update statement to the cluster...
   [INFO] Table update statement has been successfully submitted to the cluster:
   Job ID: 9543c0c29bc730ece8420066119f9abf
   
   select * from iceberg_table_v2;
   +----+----------+
   | id |     data |
   +----+----------+
   | 14 | test14_4 |
   +----+----------+
   1 row in set
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx closed issue #3114: after set 'write.upsert.enable'='true' in flink sql, using flink sql read iceberg table will get exception: java.lang.IllegalArgumentException: Row arity: 3, but serializer arity: 2

Posted by GitBox <gi...@apache.org>.
openinx closed issue #3114:
URL: https://github.com/apache/iceberg/issues/3114


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] mazhiyu123 commented on issue #3114: after set 'write.upsert.enable'='true' in flink sql, using flink sql read iceberg table will get exception: java.lang.IllegalArgumentException: Row arity: 3, but serializer arity: 2

Posted by GitBox <gi...@apache.org>.
mazhiyu123 commented on issue #3114:
URL: https://github.com/apache/iceberg/issues/3114#issuecomment-919006807


   > Hi @mazhiyu123 , I think you will need to apply this PR to your own branch: #2731
   
   thank you for replying soon!!!
   
   I already try this [pr(),](https://github.com/apache/iceberg/pull/2731) , but i will get a new situation:
   when i insert data  with same primary key on one sql, iceberg table will has duplicate data. such as:
    insert sql:
   INSERT INTO  hadoop_catalog.iceberg_db.sample_test  VALUES (14, 'test14'), (14, 'test14_update');
   
   select sql:
   select * from hadoop_catalog.iceberg_db.sample_test;
   
   expect : 
                           id                      data
                           14             test14_update
   
   but it return:
                           id                      data
                           14             test14_update
                           14                    test14
    


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on issue #3114: after set 'write.upsert.enable'='true' in flink sql, using flink sql read iceberg table will get exception: java.lang.IllegalArgumentException: Row arity: 3, but serializer arity: 2

Posted by GitBox <gi...@apache.org>.
openinx commented on issue #3114:
URL: https://github.com/apache/iceberg/issues/3114#issuecomment-919074210


   Close this issue now, feel free to reopen this if you have more questions !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] mazhiyu123 edited a comment on issue #3114: after set 'write.upsert.enable'='true' in flink sql, using flink sql read iceberg table will get exception: java.lang.IllegalArgumentException: Row arity: 3, but serializer arity: 2

Posted by GitBox <gi...@apache.org>.
mazhiyu123 edited a comment on issue #3114:
URL: https://github.com/apache/iceberg/issues/3114#issuecomment-919006807


   > Hi @mazhiyu123 , I think you will need to apply this PR to your own branch: #2731
   
   thank you for replying soon!!!
   
   I already try this [#2731](https://github.com/apache/iceberg/pull/2731) , but i will get a new situation:
   when i insert data  with same primary key on one sql, iceberg table will has duplicate data. such as:
    insert sql:
   INSERT INTO  hadoop_catalog.iceberg_db.sample_test  VALUES (14, 'test14'), (14, 'test14_update');
   
   select sql:
   select * from hadoop_catalog.iceberg_db.sample_test;
   
   expect : 
                           id                      data
                           14             test14_update
   
   but it return:
                           id                      data
                           14             test14_update
                           14                    test14
    


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org