Posted to issues@iceberg.apache.org by "stevenzwu (via GitHub)" <gi...@apache.org> on 2023/04/22 01:16:54 UTC

[GitHub] [iceberg] stevenzwu opened a new pull request, #7403: Flink: sync 1.16 with 1.17 for backports missed or not ported identically

stevenzwu opened a new pull request, #7403:
URL: https://github.com/apache/iceberg/pull/7403

   It seems that we have some divergences from the backport effort. Keeping the versions in sync makes future backports easier when comparing `git diff` results.
   
   Here are the remaining diffs after this sync; they are due to changes in Flink 1.16 or 1.17:
   
   `git diff --no-index flink/v1.16/flink/src/ flink/v1.17/flink/src`
   
   ```
   diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/.DS_Store b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/.DS_Store
   deleted file mode 100644
   index c678298c3..000000000
   Binary files a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/.DS_Store and /dev/null differ
   diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/.DS_Store b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/.DS_Store
   deleted file mode 100644
   index 0f57d6329..000000000
   Binary files a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/.DS_Store and /dev/null differ
   diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
   index 07e5ca051..e59d7dacd 100644
   --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
   +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
   @@ -22,20 +22,15 @@ import static org.apache.iceberg.flink.SimpleDataUtil.RECORD;
    
    import java.io.File;
    import java.io.IOException;
   -import java.util.Collection;
    import java.util.List;
   -import java.util.Set;
    import java.util.UUID;
    import java.util.stream.Collectors;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.configuration.CoreOptions;
    import org.apache.flink.table.api.TableEnvironment;
   -import org.apache.flink.types.Row;
    import org.apache.iceberg.ContentFile;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DataFiles;
   -import org.apache.iceberg.DeleteFile;
   -import org.apache.iceberg.FileContent;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Files;
   @@ -47,18 +42,13 @@ import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.data.GenericAppenderFactory;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.data.Record;
   -import org.apache.iceberg.exceptions.ValidationException;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.flink.FlinkCatalogTestBase;
    import org.apache.iceberg.flink.SimpleDataUtil;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.io.FileAppender;
   -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
   -import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;
    import org.apache.iceberg.types.Types;
   -import org.apache.iceberg.util.Pair;
   -import org.assertj.core.api.Assertions;
    import org.junit.After;
    import org.junit.Assert;
    import org.junit.Before;
   @@ -73,11 +63,9 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
    
      private static final String TABLE_NAME_UNPARTITIONED = "test_table_unpartitioned";
      private static final String TABLE_NAME_PARTITIONED = "test_table_partitioned";
   -  private static final String TABLE_NAME_WITH_PK = "test_table_with_pk";
      private final FileFormat format;
      private Table icebergTableUnPartitioned;
      private Table icebergTablePartitioned;
   -  private Table icebergTableWithPk;
    
      public TestRewriteDataFilesAction(
          String catalogName, Namespace baseNamespace, FileFormat format) {
   @@ -126,12 +114,6 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
            TABLE_NAME_PARTITIONED, format.name());
        icebergTablePartitioned =
            validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_PARTITIONED));
   -
   -    sql(
   -        "CREATE TABLE %s (id int, data varchar, PRIMARY KEY(`id`) NOT ENFORCED) with ('write.format.default'='%s', 'format-version'='2')",
   -        TABLE_NAME_WITH_PK, format.name());
   -    icebergTableWithPk =
   -        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
      }
    
      @Override
   @@ -139,7 +121,6 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
      public void clean() {
        sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_UNPARTITIONED);
        sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_PARTITIONED);
   -    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_WITH_PK);
        sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
        super.clean();
      }
   @@ -405,95 +386,4 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
        expected.add(SimpleDataUtil.createRecord(2, "b"));
        SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
      }
   -
   -  @Test
   -  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
   -    // Add 2 data files
   -    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
   -    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
   -
   -    // Load 2 stale tables to pass to rewrite actions
   -    // Since the first rewrite will refresh stale1, we need another stale2 for the second rewrite
   -    Table stale1 =
   -        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
   -    Table stale2 =
   -        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
   -
   -    // Add 1 data file and 1 equality-delete file
   -    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
   -
   -    icebergTableWithPk.refresh();
   -    Assert.assertEquals(
   -        "The latest sequence number should be greater than that of the stale snapshot",
   -        stale1.currentSnapshot().sequenceNumber() + 1,
   -        icebergTableWithPk.currentSnapshot().sequenceNumber());
   -
   -    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
   -    List<DataFile> dataFiles =
   -        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
   -    Set<DeleteFile> deleteFiles =
   -        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
   -            .flatMap(Collection::stream)
   -            .collect(Collectors.toSet());
   -    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
   -    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
   -    Assert.assertSame(
   -        "The 1 delete file should be an equality-delete file",
   -        Iterables.getOnlyElement(deleteFiles).content(),
   -        FileContent.EQUALITY_DELETES);
   -    shouldHaveDataAndFileSequenceNumbers(
   -        TABLE_NAME_WITH_PK,
   -        ImmutableList.of(Pair.of(1L, 1L), Pair.of(2L, 2L), Pair.of(3L, 3L), Pair.of(3L, 3L)));
   -
   -    Assertions.assertThatThrownBy(
   -            () ->
   -                Actions.forTable(stale1)
   -                    .rewriteDataFiles()
   -                    .useStartingSequenceNumber(false)
   -                    .execute(),
   -            "Rewrite using new sequence number should fail")
   -        .isInstanceOf(ValidationException.class);
   -
   -    // Rewrite using the starting sequence number should succeed
   -    RewriteDataFilesActionResult result =
   -        Actions.forTable(stale2).rewriteDataFiles().useStartingSequenceNumber(true).execute();
   -
   -    // Should not rewrite files from the new commit
   -    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
   -    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
   -    // The 2 older files with file-sequence-number <= 2 should be rewritten into a new file.
   -    // The new file is the one with file-sequence-number == 4.
   -    // The new file should use rewrite's starting-sequence-number 2 as its data-sequence-number.
   -    shouldHaveDataAndFileSequenceNumbers(
   -        TABLE_NAME_WITH_PK, ImmutableList.of(Pair.of(3L, 3L), Pair.of(3L, 3L), Pair.of(2L, 4L)));
   -
   -    // Assert the table records as expected.
   -    SimpleDataUtil.assertTableRecords(
   -        icebergTableWithPk,
   -        Lists.newArrayList(
   -            SimpleDataUtil.createRecord(1, "hi"), SimpleDataUtil.createRecord(2, "world")));
   -  }
   -
   -  /**
   -   * Assert that data files and delete files in the table should have expected data sequence numbers
   -   * and file sequence numbers
   -   *
   -   * @param tableName table name
   -   * @param expectedSequenceNumbers list of {@link Pair}'s. Each {@link Pair} contains
   -   *     (expectedDataSequenceNumber, expectedFileSequenceNumber) of a file.
   -   */
   -  private void shouldHaveDataAndFileSequenceNumbers(
   -      String tableName, List<Pair<Long, Long>> expectedSequenceNumbers) {
   -    // "status < 2" for added or existing entries
   -    List<Row> liveEntries = sql("SELECT * FROM %s$entries WHERE status < 2", tableName);
   -
   -    List<Pair<Long, Long>> actualSequenceNumbers =
   -        liveEntries.stream()
   -            .map(
   -                row ->
   -                    Pair.<Long, Long>of(
   -                        row.getFieldAs("sequence_number"), row.getFieldAs("file_sequence_number")))
   -            .collect(Collectors.toList());
   -    Assertions.assertThat(actualSequenceNumbers).hasSameElementsAs(expectedSequenceNumbers);
   -  }
    }
   diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
   index f240e564a..d72f57dce 100644
   --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
   +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
   @@ -422,8 +422,12 @@ public class TestFlinkTableSource extends FlinkTestBase {
        Assert.assertEquals("Should have 1 record", 1, result.size());
        Assert.assertEquals(
            "Should produce the expected record", Row.of(1, "iceberg", 10.0), result.get(0));
   +
   +    // In SQL, null check can only be done as IS NULL or IS NOT NULL, so it's correct to ignore it
   +    // and push the rest down.
   +    String expectedScan = "ref(name=\"data\") == \"iceberg\"";
        Assert.assertEquals(
   -        "Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
   +        "Should contain the push down filter", expectedScan, lastScanEvent.filter().toString());
      }
    
      @Test
   @@ -445,8 +449,9 @@ public class TestFlinkTableSource extends FlinkTestBase {
        String sqlNotInNull = String.format("SELECT * FROM %s WHERE id NOT IN (1,2,NULL) ", TABLE_NAME);
        List<Row> resultGT = sql(sqlNotInNull);
        Assert.assertEquals("Should have 0 record", 0, resultGT.size());
   -    Assert.assertEquals(
   -        "Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
   +    Assert.assertNull(
   +        "As the predicate pushdown filter out all rows, Flink did not create scan plan, so it doesn't publish any ScanEvent.",
   +        lastScanEvent);
      }
    
      @Test
   @@ -542,6 +547,17 @@ public class TestFlinkTableSource extends FlinkTestBase {
        Assert.assertEquals("Should create only one scan", 1, scanEventCount);
        Assert.assertEquals(
            "Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
   +
   +    // %% won't match the row with null value
   +    sqlLike = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE '%%' ";
   +    resultLike = sql(sqlLike);
   +    Assert.assertEquals("Should have 2 records", 2, resultLike.size());
   +    List<Row> expectedRecords =
   +        Lists.newArrayList(Row.of(1, "iceberg", 10.0), Row.of(2, "b", 20.0));
   +    assertSameElements(expectedRecords, resultLike);
   +    String expectedScan = "not_null(ref(name=\"data\"))";
   +    Assert.assertEquals(
   +        "Should contain the push down filter", expectedScan, lastScanEvent.filter().toString());
      }
    
      @Test
   @@ -549,7 +565,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
        Row expectRecord = Row.of(1, "iceberg", 10.0);
        String sqlNoPushDown = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE '%%i' ";
        List<Row> resultLike = sql(sqlNoPushDown);
   -    Assert.assertEquals("Should have 1 record", 0, resultLike.size());
   +    Assert.assertEquals("Should have 0 record", 0, resultLike.size());
        Assert.assertEquals(
            "Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
    
   @@ -567,15 +583,6 @@ public class TestFlinkTableSource extends FlinkTestBase {
        Assert.assertEquals(
            "Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
    
   -    sqlNoPushDown = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE '%%' ";
   -    resultLike = sql(sqlNoPushDown);
   -    Assert.assertEquals("Should have 3 records", 3, resultLike.size());
   -    List<Row> expectedRecords =
   -        Lists.newArrayList(Row.of(1, "iceberg", 10.0), Row.of(2, "b", 20.0), Row.of(3, null, 30.0));
   -    assertSameElements(expectedRecords, resultLike);
   -    Assert.assertEquals(
   -        "Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
   -
        sqlNoPushDown = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE 'iceber_' ";
        resultLike = sql(sqlNoPushDown);
        Assert.assertEquals("Should have 1 record", 1, resultLike.size());
   diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
   index ac4c92b06..08cccbbc8 100644
   --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
   +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
   @@ -26,6 +26,6 @@ public class TestFlinkPackage {
      /** This unit test would need to be adjusted as new Flink version is supported. */
      @Test
      public void testVersion() {
   -    Assert.assertEquals("1.16.1", FlinkPackage.version());
   +    Assert.assertEquals("1.17.0", FlinkPackage.version());
      }
    }
   
   ```
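   
   For readers skimming the diff: the removed `testRewriteNoConflictWithEqualityDeletes` test exercises the pattern below. This is a minimal sketch, not the test itself; the calls are exactly those shown in the diff, while the import package names are assumed from the Iceberg codebase.
   
   ```java
   import org.apache.iceberg.Table;
   import org.apache.iceberg.actions.RewriteDataFilesActionResult;
   import org.apache.iceberg.flink.actions.Actions;
   
   class RewriteSketch {
     // Compacting from a stale table handle can succeed even after a newer
     // equality-delete commit: useStartingSequenceNumber(true) commits the
     // rewritten file at the rewrite's starting data sequence number, so the
     // newer equality deletes still apply to it. With
     // useStartingSequenceNumber(false) the same rewrite fails with a
     // ValidationException, as the removed test asserts.
     static RewriteDataFilesActionResult compact(Table staleTable) {
       return Actions.forTable(staleTable)
           .rewriteDataFiles()
           .useStartingSequenceNumber(true)
           .execute();
     }
   }
   ```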



[GitHub] [iceberg] stevenzwu commented on pull request #7403: Flink: sync 1.16 with 1.17 for backports missed or not ported identically

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on PR #7403:
URL: https://github.com/apache/iceberg/pull/7403#issuecomment-1520652578

   Thanks @Fokko for the review.



[GitHub] [iceberg] Fokko commented on a diff in pull request #7403: Flink: sync 1.16 with 1.17 for backports missed or not ported identically

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7403:
URL: https://github.com/apache/iceberg/pull/7403#discussion_r1175655737


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java:
##########
@@ -59,14 +58,8 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
   private final Schema projectedSchema;
   private final String nameMapping;
   private final boolean caseSensitive;
-
   private final FlinkSourceFilter rowFilter;
 
-  public RowDataFileScanTaskReader(

Review Comment:
   I missed that, sorry for the extra work.




[GitHub] [iceberg] Fokko commented on a diff in pull request #7403: Flink: sync 1.16 with 1.17 for backports missed or not ported identically

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7403:
URL: https://github.com/apache/iceberg/pull/7403#discussion_r1174453273


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java:
##########
@@ -59,14 +58,8 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
   private final Schema projectedSchema;
   private final String nameMapping;
   private final boolean caseSensitive;
-
   private final FlinkSourceFilter rowFilter;
 
-  public RowDataFileScanTaskReader(

Review Comment:
   This was intentional. I added this constructor because the 1.16 library has already been released; otherwise we would be removing a public method. Since `iceberg-flink-runtime-1.17` hasn't been released, we didn't need it there.
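   
   A minimal sketch of the compatibility pattern described above, with hypothetical names and parameters (the real constructor signatures are not shown in this fragment): the old public constructor stays in the already-released 1.16 line and delegates to the new one.
   
   ```java
   import java.util.List;
   
   // Hypothetical, simplified stand-in for the real class; the names and the
   // filter parameter are illustrative, not the actual Iceberg signatures.
   class TaskReader {
     private final String nameMapping;
     private final boolean caseSensitive;
     private final List<String> filters;
   
     // New constructor introduced by the change.
     TaskReader(String nameMapping, boolean caseSensitive, List<String> filters) {
       this.nameMapping = nameMapping;
       this.caseSensitive = caseSensitive;
       this.filters = filters;
     }
   
     // Old constructor kept so callers compiled against the released jar still
     // link; it simply delegates with no filters.
     @Deprecated
     TaskReader(String nameMapping, boolean caseSensitive) {
       this(nameMapping, caseSensitive, null);
     }
   }
   ```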




[GitHub] [iceberg] stevenzwu merged pull request #7403: Flink: sync 1.16 with 1.17 for backports missed or not ported identically

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu merged PR #7403:
URL: https://github.com/apache/iceberg/pull/7403



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7403: Flink: sync 1.16 with 1.17 for backports missed or not ported identically

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7403:
URL: https://github.com/apache/iceberg/pull/7403#discussion_r1175654693


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java:
##########
@@ -59,14 +58,8 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
   private final Schema projectedSchema;
   private final String nameMapping;
   private final boolean caseSensitive;
-
   private final FlinkSourceFilter rowFilter;
 
-  public RowDataFileScanTaskReader(

Review Comment:
   Thanks for the context. `RowDataFileScanTaskReader` is marked as `@Internal`; users don't construct it directly, and it should only be used internally. Hence I think it is better to keep it the same across versions.
   
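   For reference, a minimal sketch of that marking, using Flink's `org.apache.flink.annotation.Internal` annotation with the class body elided:
   
   ```java
   import org.apache.flink.annotation.Internal;
   
   // @Internal marks a class as outside the public API surface, which is why
   // keeping its constructors identical across the v1.16 and v1.17 source
   // trees is preferable to carrying a deprecated overload.
   @Internal
   public class RowDataFileScanTaskReader {
     // implementation elided
   }
   ```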
   




[GitHub] [iceberg] stevenzwu commented on pull request #7403: Flink: sync 1.16 with 1.17 for backports missed or not ported identically

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on PR #7403:
URL: https://github.com/apache/iceberg/pull/7403#issuecomment-1518456216

   The `TestRewriteDataFilesAction` diff is caused by https://github.com/apache/iceberg/pull/7218, which was just merged today in 1.16. I have asked the author to port it to 1.15 and 1.17.

