Posted to dev@iceberg.apache.org by "Hailu, Andreas" <An...@gs.com> on 2022/04/21 00:50:00 UTC

NullPointerException during scan on V2 Table post row deletion

Hi folks,

I'm trying to test out record deletions on a V2 table, but I seem to be corrupting the Table: I get a NullPointerException when trying to read the Table after the deletion.

I have a main Table 'T1' containing 2 records, and another Table 'T2' that contains 1 record from T1 that I want to delete. I create a DeleteFile out of T2 using the EqualityDeleteWriter, and then apply a RowDelta to T1 with this DeleteFile. After this sequence, the following exception is thrown when reading:

Caused by: java.lang.NullPointerException
        at org.apache.iceberg.DeleteFileIndex.canContainEqDeletesForFile(DeleteFileIndex.java:193)
        at org.apache.iceberg.DeleteFileIndex.canContainDeletesForFile(DeleteFileIndex.java:144)
        at org.apache.iceberg.DeleteFileIndex.lambda$forDataFile$3(DeleteFileIndex.java:134)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
        at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
        at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
        at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
        at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
        at org.apache.iceberg.DeleteFileIndex.forDataFile(DeleteFileIndex.java:135)
        at org.apache.iceberg.DeleteFileIndex.forEntry(DeleteFileIndex.java:115)
        at org.apache.iceberg.ManifestGroup.lambda$planFiles$6(ManifestGroup.java:181)
        at org.apache.iceberg.io.CloseableIterable$4$1.next(CloseableIterable.java:113)
        at org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.next(CloseableIterable.java:206)
        at org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
        at org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
        at org.apache.iceberg.relocated.com.google.common.collect.Iterators$ConcatenatedIterator.hasNext(Iterators.java:1330)
        at org.apache.iceberg.io.CloseableIterator$1.hasNext(CloseableIterator.java:50)
        at org.apache.iceberg.util.BinPacking$PackingIterator.hasNext(BinPacking.java:106)
        at org.apache.iceberg.io.CloseableIterator$1.hasNext(CloseableIterator.java:50)
        at org.apache.iceberg.io.CloseableIterable$4$1.hasNext(CloseableIterable.java:108)
        at org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
        at org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
        at org.apache.iceberg.relocated.com.google.common.collect.Iterators$ConcatenatedIterator.getTopMetaIterator(Iterators.java:1309)
        at org.apache.iceberg.relocated.com.google.common.collect.Iterators$ConcatenatedIterator.hasNext(Iterators.java:1325)
        at org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
        at org.apache.iceberg.io.CloseableIterable.concat(CloseableIterable.java:122)
        at org.apache.iceberg.data.GenericReader.open(GenericReader.java:65)
        at org.apache.iceberg.data.TableScanIterable.iterator(TableScanIterable.java:41)
        at com.gs.ep.da.lake.refinerlib.flink.iceberg.poc.IcebergApplyDeltas.logRecordsInTable(IcebergApplyDeltas.java:111)

I'm reading the table through the IcebergGenerics.read() API, which worked fine prior to this. Is there something wrong with this approach? I've included my code snippet below.

// ...
Table deleteTable = catalog.createTable(deleteDeltaTableIdentifier, icebergSchema, null, properties);
DataStream<GenericRecord> recordsForDelete = readParquet(getStreamExecutionEnvironment(), avroSchema, deltaDeletesLocation);
writeToTable(recordsForDelete, avroSchema, icebergSchema, getTableLoader(deleteTable), deleteTable); // writeToTable() writes GenericRecords to Table through Iceberg Flink API
deleteTable = catalog.loadTable(deleteDeltaTableIdentifier); // This Table contains the single record that we want to delete from dataTable (initialized later)

TableIdentifier dataTableName = TableIdentifier.of(icebergProofOfConceptNamespace, getIcebergTableName());
Table dataTable = catalog.loadTable(dataTableName); // Main Table consisting of 2 records, 1 to be deleted
logger.info("Logging records before update");
logRecordsInTable(dataTable); // Works fine

logger.info("Logging records for deletion");
logRecordsInTable(deleteTable); // Works fine
OutputFileFactory fileFactory = OutputFileFactory.builderFor(dataTable, 1, 1).build();
OutputFile deleteOutputFile = fileFactory.newOutputFile().encryptingOutputFile();
logger.info("Performing deletes");
EqualityDeleteWriter<Record> deleteWriter = Parquet.writeDeletes(deleteOutputFile)
    .forTable(dataTable)
    .createWriterFunc(GenericParquetWriter::buildWriter)
    .overwrite()
    .equalityFieldIds(icebergSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray())
    .buildEqualityWriter();
try(Closeable closeable = deleteWriter; CloseableIterable<Record> deletes = IcebergGenerics.read(deleteTable).build()) {
    deleteWriter.write(deletes);
}
DeleteFile deletes = deleteWriter.toDeleteFile();
Transaction deltaTransaction = dataTable.newTransaction();
deltaTransaction.newRowDelta()
    .addDeletes(deletes)
    .commit();
deltaTransaction.commitTransaction();

logger.info("Transaction complete");
logger.info("Logging records post update");
logRecordsInTable(dataTable); // Results in NPE
// ...

private void logRecordsInTable(Table dataTable) throws IOException {
    try(CloseableIterable<Record> postDeletionRes = IcebergGenerics.read(dataTable).build()) {
        List<String> columnNames = dataTable.schema().columns().stream().map(Types.NestedField::name).collect(Collectors.toList());
        for(Record res : postDeletionRes) {
            StringBuilder sb = new StringBuilder().append("[ ");
            columnNames.forEach(name -> {
                sb.append(name)
                    .append(": ")
                    .append(res.getField(name))
                    .append(", ");
            });

            sb.append("]");
            logger.info(sb.toString());
        }
    }
}


best,
ah



RE: NullPointerException during scan on V2 Table post row deletion

Posted by "Hailu, Andreas" <An...@gs.com>.
You’re on the money: the kind folks in the Slack pointed me in the right direction on this thread, and it was just as you said. Once I swapped out the schema I had used to create the Table for the data Table’s own schema, everything worked as expected.
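
For reference, the change amounted to this one line in the delete writer setup (a rough sketch, still using every column as an equality field like my original snippet did; the important part is that the field IDs now come from dataTable's own schema):

// before: IDs taken from the schema object used to create the tables
//     .equalityFieldIds(icebergSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray())
// after: IDs taken from the data table's schema, as reassigned by Iceberg
    .equalityFieldIds(dataTable.schema().columns().stream().mapToInt(Types.NestedField::fieldId).toArray())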

Cheers.

ah

From: Ryan Blue <bl...@tabular.io>
Sent: Thursday, April 21, 2022 7:21 PM
To: Iceberg Dev List <de...@iceberg.apache.org>
Subject: Re: NullPointerException during scan on V2 Table post row deletion

Andreas,

I think the problem is that you're using the IDs from the schema you used to create the delete table, not the IDs from the data table itself. If I'm right, you should be able to fix this by changing how you are setting `equalityFieldIds` to use `dataTable.schema().select(...)` instead of `icebergSchema`.

When a table is created, Iceberg reassigns all of the IDs in the incoming schema to make sure they're consistent. So if you've created a schema using IDs other than the ones that would be automatically assigned (for example, starting at 0) then the IDs will be different. That would explain the failure at DeleteFileIndex.java:193, which is looking up the equality field IDs in the table schema. I'm not sure how you've created these schemas or what their relationships are, though, so I'm mostly speculating.

The important thing is that you always want to use a table's schema rather than the schema you used to create the table or another table's schema.

Ryan


Re: NullPointerException during scan on V2 Table post row deletion

Posted by Ryan Blue <bl...@tabular.io>.
Andreas,

I think the problem is that you're using the IDs from the schema you used
to create the delete table, not the IDs from the data table itself. If I'm
right, you should be able to fix this by changing how you are setting
`equalityFieldIds` to use `dataTable.schema().select(...)` instead of
`icebergSchema`.
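
For example, deriving the IDs from the data table's schema (just a
sketch; "col1" and "col2" are placeholders for whichever columns
actually identify the rows you're deleting):

int[] equalityFieldIds = dataTable.schema().select("col1", "col2")
    .columns().stream()
    .mapToInt(Types.NestedField::fieldId)
    .toArray();

EqualityDeleteWriter<Record> deleteWriter = Parquet.writeDeletes(deleteOutputFile)
    .forTable(dataTable)
    .createWriterFunc(GenericParquetWriter::buildWriter)
    .overwrite()
    .equalityFieldIds(equalityFieldIds)
    .buildEqualityWriter();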

When a table is created, Iceberg reassigns all of the IDs in the incoming
schema to make sure they're consistent. So if you've created a schema using
IDs other than the ones that would be automatically assigned (for example,
starting at 0) then the IDs will be different. That would explain the
failure at DeleteFileIndex.java:193, which is looking up the equality field
IDs in the table schema. I'm not sure how you've created these schemas or
what their relationships are, though, so I'm mostly speculating.
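
A quick way to confirm (again just a sketch, assuming the top-level
column names match between the two schemas) is to print the IDs from
both and compare:

for (Types.NestedField field : icebergSchema.columns()) {
    Types.NestedField tableField = dataTable.schema().findField(field.name());
    // if these IDs differ, the delete file's equality field IDs won't
    // resolve against the data table's schema
    System.out.printf("%s: create-time id=%d, table id=%d%n",
        field.name(), field.fieldId(), tableField.fieldId());
}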

The important thing is that you always want to use a table's schema rather
than the schema you used to create the table or another table's schema.

Ryan



-- 
Ryan Blue
Tabular