Posted to commits@hudi.apache.org by "konwu (Jira)" <ji...@apache.org> on 2022/03/08 07:22:00 UTC
[jira] [Updated] (HUDI-3559) NoSuchElementException when use BUCKET index in flink cow table
[ https://issues.apache.org/jira/browse/HUDI-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
konwu updated HUDI-3559:
------------------------
Description:
*Environment:*
```sql
CREATE TABLE test_source (
userid int, ts TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second'='1',
'fields.userid.kind'='random',
'fields.userid.min'='1',
'fields.userid.max'='100'
);
CREATE TABLE hudi_hdfs_test(
userid int,
ts TIMESTAMP(3),
PRIMARY KEY (userid) NOT ENFORCED
)WITH (
'connector' = 'hudi',
'path' = '/tmp/hudi_011_2',
'table.type' = 'COPY_ON_WRITE',
'write.insert.drop.duplicates' = 'true',
'write.precombine.field' = 'ts',
'index.type'='BUCKET',
'hive_sync.enable' = 'true'
);
INSERT INTO hudi_hdfs_test SELECT * FROM test_source;
```
*Exception*
```java
Caused by: java.util.NoSuchElementException: No value present in Option
	at org.apache.hudi.common.util.Option.get(Option.java:88) ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
	at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:116) ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
	at org.apache.hudi.io.FlinkMergeHandle.<init>(FlinkMergeHandle.java:70) ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
	at org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:485) ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
	at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:142) ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
```
*Root cause*
`processElement` in `hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java`:
```java
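if (bucketToFileIDMap.containsKey(partitionBucketId)) {
    // the bucket already has a file ID: tag the record as an update ("U")
    location = new HoodieRecordLocation("U", bucketToFileIDMap.get(partitionBucketId));
} else {
    // first record seen for this bucket: create a new file ID and tag the record as an insert ("I")
    String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
    location = new HoodieRecordLocation("I", newFileId);
    bucketToFileIDMap.put(partitionBucketId, newFileId);
}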
```
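To make the tagging concrete, here is a minimal Java sketch that mirrors the if/else above. It assumes, based on the `HoodieFlinkWriteClient.getOrCreateWriteHandle` and `FlinkMergeHandle` frames in the stack trace, that the handle type for a bucket's batch is decided by the first record's "I"/"U" tag alone. `BucketTaggingSketch`, `Tagged`, `tag`, `chooseHandle`, and the `file-N` ID scheme are hypothetical simplifications, not Hudi APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BucketTaggingSketch {
    // Simplified stand-in for a tagged record: key, bucket, file ID, and "I"/"U" tag.
    record Tagged(String key, String partitionBucketId, String fileId, String tag) {}

    private final Map<String, String> bucketToFileIDMap = new HashMap<>();
    private int nextFileId = 0;

    // Mirrors the if/else in processElement: the first record of a bucket gets "I" and a
    // new file ID; later records of the same bucket reuse that file ID and get "U".
    Tagged tag(String key, String partitionBucketId) {
        if (bucketToFileIDMap.containsKey(partitionBucketId)) {
            return new Tagged(key, partitionBucketId, bucketToFileIDMap.get(partitionBucketId), "U");
        }
        String newFileId = "file-" + (nextFileId++); // hypothetical ID scheme
        bucketToFileIDMap.put(partitionBucketId, newFileId);
        return new Tagged(key, partitionBucketId, newFileId, "I");
    }

    // Assumption: the handle type is decided by the first record's tag alone.
    static String chooseHandle(List<Tagged> bucketRecords) {
        return "I".equals(bucketRecords.get(0).tag()) ? "create" : "merge";
    }

    public static void main(String[] args) {
        BucketTaggingSketch s = new BucketTaggingSketch();
        List<Tagged> batch = new ArrayList<>();
        batch.add(s.tag("userid=7", "par/0"));
        batch.add(s.tag("userid=11", "par/0"));
        System.out.println(chooseHandle(batch)); // prints "create"
        // Same records, "U" record first: a merge path is chosen for a file ID
        // that has no base file yet.
        System.out.println(chooseHandle(List.of(batch.get(1), batch.get(0)))); // prints "merge"
    }
}
```

Both records point at the same new file ID, so in this sketch only their order decides whether the create or the merge path is taken.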
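For the BUCKET index type, the COW table deduplicates records before the merge, but the `deduplicateRecords` method in `FlinkWriteHelper` may return the records out of order, so a record tagged "U" for a newly created file ID can end up ahead of the "I" record and lead to the `MergeHandle` path in the stack trace.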
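One way to see why ordering matters, and one possible mitigation, is an order-preserving deduplication. This is a hypothetical sketch, not the fix adopted in Hudi; `OrderPreservingDedupSketch`, `Rec`, and `dedup` are illustrative names. It groups records by key in first-seen order with a `LinkedHashMap` (plain `Collectors.groupingBy` without a map factory makes no ordering guarantee), keeps the payload with the largest `ts` (the precombine field in the DDL above), and keeps the tag of the first record seen for each key so the "I" record stays at the front.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OrderPreservingDedupSketch {
    // Simplified record: key, precombine value (ts), and "I"/"U" location tag.
    record Rec(String key, long ts, String tag) {}

    // Hypothetical order-preserving dedup: groups by key in first-seen order,
    // keeps the payload with the largest ts (later arrival wins ties),
    // and keeps the tag of the first record seen for that key.
    static List<Rec> dedup(List<Rec> records) {
        Map<String, List<Rec>> byKey = records.stream()
                .collect(Collectors.groupingBy(Rec::key, LinkedHashMap::new, Collectors.toList()));
        List<Rec> out = new ArrayList<>();
        for (List<Rec> group : byKey.values()) {
            Rec first = group.get(0);
            Rec latest = group.stream().reduce((a, b) -> b.ts() >= a.ts() ? b : a).get();
            out.add(new Rec(latest.key(), latest.ts(), first.tag()));
        }
        return out;
    }

    public static void main(String[] args) {
        // Arrival order within one bucket: the "I" record comes first.
        List<Rec> batch = List.of(
                new Rec("b", 1L, "I"),
                new Rec("a", 2L, "U"),
                new Rec("b", 3L, "U"));
        List<Rec> result = dedup(batch);
        System.out.println(result.get(0)); // prints "Rec[key=b, ts=3, tag=I]"
        System.out.println(result.size()); // prints "2"
    }
}
```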
was:
*Environment:*
```sql
CREATE TABLE test_source (
userid int, ts TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second'='1',
'fields.userid.kind'='random',
'fields.userid.min'='1',
'fields.userid.max'='100'
);
CREATE TABLE hudi_hdfs_test(
userid int,
ts TIMESTAMP(3),
PRIMARY KEY (userid) NOT ENFORCED
)WITH (
'connector' = 'hudi',
'path' = '/tmp/hudi_011_2',
'table.type' = 'COPY_ON_WRITE',
'write.insert.drop.duplicates' = 'true',
'write.precombine.field' = 'ts',
'index.type'='BUCKET',
'hive_sync.enable' = 'true'
);
INSERT INTO hudi_hdfs_test SELECT * FROM test_source;
```
*Exception*
```java
Caused by: java.util.NoSuchElementException: No value present in Option
	at org.apache.hudi.common.util.Option.get(Option.java:88) ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
	at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:116) ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
	at org.apache.hudi.io.FlinkMergeHandle.<init>(FlinkMergeHandle.java:70) ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
	at org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:485) ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
	at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:142) ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
```
*Root cause*
`processElement` in `hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java`:
```java
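if (bucketToFileIDMap.containsKey(partitionBucketId)) {
    // the bucket already has a file ID: tag the record as an update ("U")
    location = new HoodieRecordLocation("U", bucketToFileIDMap.get(partitionBucketId));
} else {
    // first record seen for this bucket: create a new file ID and tag the record as an insert ("I")
    String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
    location = new HoodieRecordLocation("I", newFileId);
    bucketToFileIDMap.put(partitionBucketId, newFileId);
}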
```
With this logic, if the `newFileId` in `bucketToFileIDMap` was created just before the first commit, the write uses a `MergeHandle`, which then throws this exception because there is no base file to merge.
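A minimal sketch of this failure, assuming a merge needs the latest committed base file of the file ID and reads it with an unconditional `get()` (the `Option.get` frame in the stack trace). `MissingBaseFileSketch` and its methods are hypothetical, and `java.util.Optional` stands in for Hudi's `Option`; the guarded `openForWrite` variant only illustrates falling back to a create path and is not taken from Hudi.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;

public class MissingBaseFileSketch {
    // Hypothetical view of committed base files: file ID -> base file path.
    private final Map<String, String> committedBaseFiles = new HashMap<>();

    void commit(String fileId, String path) {
        committedBaseFiles.put(fileId, path);
    }

    Optional<String> latestBaseFile(String fileId) {
        return Optional.ofNullable(committedBaseFiles.get(fileId));
    }

    // Stand-in for a merge handle constructor: merging requires an existing base file,
    // so an unconditional get() fails for a file ID created in the current, uncommitted batch.
    String openForMerge(String fileId) {
        return latestBaseFile(fileId).get();
    }

    // Guarded variant: fall back to writing a new base file when none exists yet.
    String openForWrite(String fileId) {
        return latestBaseFile(fileId)
                .map(path -> "merge:" + path)
                .orElse("create:" + fileId);
    }

    public static void main(String[] args) {
        MissingBaseFileSketch fs = new MissingBaseFileSketch();
        try {
            fs.openForMerge("bucket-0-new");
        } catch (NoSuchElementException e) {
            System.out.println("NoSuchElementException: " + e.getMessage());
        }
        System.out.println(fs.openForWrite("bucket-0-new")); // prints "create:bucket-0-new"
        fs.commit("bucket-0-new", "/tmp/hudi_011_2/bucket-0-new.parquet");
        System.out.println(fs.openForWrite("bucket-0-new")); // prints "merge:/tmp/hudi_011_2/bucket-0-new.parquet"
    }
}
```

The exception type matches the report, though `Optional.get` uses the message "No value present" rather than Hudi's "No value present in Option".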
> NoSuchElementException when use BUCKET index in flink cow table
> ---------------------------------------------------------------
>
> Key: HUDI-3559
> URL: https://issues.apache.org/jira/browse/HUDI-3559
> Project: Apache Hudi
> Issue Type: Bug
> Components: flink
> Reporter: konwu
> Priority: Major
> Fix For: 0.11.0
--
This message was sent by Atlassian Jira
(v8.20.1#820001)