Posted to issues@iceberg.apache.org by "rodmeneses (via GitHub)" <gi...@apache.org> on 2024/04/09 20:31:18 UTC

[PR] Flink: Adds support for 1.19 version [iceberg]

rodmeneses opened a new pull request, #10112:
URL: https://github.com/apache/iceberg/pull/10112

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1560518381


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java:
##########
@@ -35,6 +35,7 @@
 import org.apache.iceberg.util.StructLikeSet;
 
 public class TestFlinkInputFormatReaderDeletes extends TestFlinkReaderDeletesBase {
+

Review Comment:
   I think it would be good to remove all of these whitespace changes; that would make the PR much easier to review.





Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1566852990


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -391,17 +391,16 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
               + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
               + "create table without 'connector'='iceberg' related properties in an iceberg table.");
     }
-
-    createIcebergTable(tablePath, table, ignoreIfExists);
+    Preconditions.checkArgument(table instanceof ResolvedCatalogTable, "table should be resolved");

Review Comment:
   It's not clear to me why there's a diff in `FlinkCatalog`.





Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1566850598


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -391,17 +391,16 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
               + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
               + "create table without 'connector'='iceberg' related properties in an iceberg table.");
     }
-
-    createIcebergTable(tablePath, table, ignoreIfExists);
+    Preconditions.checkArgument(table instanceof ResolvedCatalogTable, "table should be resolved");

Review Comment:
   Is this new code that's required for Flink 1.19?





Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "rodmeneses (via GitHub)" <gi...@apache.org>.
rodmeneses commented on PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#issuecomment-2050175110

   > @rodmeneses: Just curious to know what command you have used for the step "Flink: Recover flink/1.18 files from history" ? It is really nice.
   > 
   > I thought the only way was to add back the folder (which shows file was added).
   
   Thanks @ajantha-bhat. I updated the PR description with the steps I took. Please take a look! 😄 




Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1560437038


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java:
##########
@@ -126,4 +126,18 @@ protected void dropCatalog(String catalogName, boolean ifExists) {
     sql("USE CATALOG default_catalog");
     sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
   }
+
+  /**
+   * We cannot drop the currently used database after FLINK-33226, so we have to make sure that we
+   * do not use the current database before dropping it. This method creates a database called
+   * 'temp', uses it, and drops the requested one.
+   *
+   * @param database The database to drop
+   * @param ifExists If we should use the 'IF EXISTS' when dropping the database
+   */
+  protected void dropDatabase(String database, boolean ifExists) {
+    sql("CREATE DATABASE IF NOT EXISTS temp");

Review Comment:
   Do we have a default database to use? I find it strange that we are creating a database to drop a database 😄 
   What happens in Flink if we do not create/use a database before creating a table?





Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "rodmeneses (via GitHub)" <gi...@apache.org>.
rodmeneses commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1564892433


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java:
##########
@@ -98,7 +98,7 @@ public void before() {
   @Override
   public void clean() {
     sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
-    sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
+    dropDatabase(DATABASE_NAME, true);

Review Comment:
   I ended up changing this implementation of `dropDatabase` to:
   ```
     protected void dropDatabase(String database, boolean ifExists) {
       sql("USE CATALOG %s", DEFAULT_CATALOG_NAME);
       sql("USE %s", DEFAULT_DATABASE_NAME);
       sql("DROP DATABASE %s %s", ifExists ? "IF EXISTS" : "", database);
     }
   ```
   which is similar to our current `dropCatalog` implementation. 
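To make the FLINK-33226 restriction and the helper above concrete, here is a minimal, self-contained Java sketch. It does not use Flink at all: `CurrentDatabaseModel` and its methods are hypothetical names for an in-memory model that only assumes the rule stated in this thread (the database currently in use cannot be dropped). The model has a single catalog, so it leaves out the `USE CATALOG` step.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/**
 * Hypothetical in-memory model of a single-catalog session. This is NOT Flink's API: it only
 * mimics the rule discussed in this thread (FLINK-33226), namely that the database currently
 * in use cannot be dropped.
 */
final class CurrentDatabaseModel {
  static final String DEFAULT_DATABASE_NAME = "default_database";

  private final Set<String> databases = new LinkedHashSet<>();
  private String currentDatabase = DEFAULT_DATABASE_NAME;

  CurrentDatabaseModel() {
    databases.add(DEFAULT_DATABASE_NAME);
  }

  void createDatabase(String name) {
    databases.add(name);
  }

  /** Models `USE name`. */
  void use(String name) {
    if (!databases.contains(name)) {
      throw new IllegalArgumentException("Database does not exist: " + name);
    }
    currentDatabase = name;
  }

  /** Models `DROP DATABASE [IF EXISTS] name` with the "not in use" restriction. */
  void dropDatabase(String name, boolean ifExists) {
    if (!databases.contains(name)) {
      if (ifExists) {
        return;
      }
      throw new IllegalArgumentException("Database does not exist: " + name);
    }
    if (name.equals(currentDatabase)) {
      throw new IllegalStateException("Cannot drop the database currently in use: " + name);
    }
    databases.remove(name);
  }

  boolean exists(String name) {
    return databases.contains(name);
  }

  /** Same idea as the test helper: switch to the default database first, then drop. */
  void dropDatabaseSafely(String name, boolean ifExists) {
    use(DEFAULT_DATABASE_NAME);
    dropDatabase(name, ifExists);
  }

  public static void main(String[] args) {
    CurrentDatabaseModel session = new CurrentDatabaseModel();
    session.createDatabase("db");
    session.use("db");
    try {
      // Old clean-up style: fails because "db" is the database in use.
      session.dropDatabase("db", true);
    } catch (IllegalStateException e) {
      System.out.println("plain drop failed: " + e.getMessage());
    }
    // New helper style: switches away from "db" first, so the drop succeeds.
    session.dropDatabaseSafely("db", true);
    System.out.println("db exists after safe drop: " + session.exists("db"));
  }
}
```

Note that in this model `dropDatabaseSafely(DEFAULT_DATABASE_NAME, true)` still fails, because switching to the default database makes it the one in use; that mirrors the `TestIcebergConnector` situation discussed later in the thread.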
   
   





Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#issuecomment-2059391305

   Merged to main.
   Thanks for the PR @rodmeneses and @nastra for the review!




Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "rodmeneses (via GitHub)" <gi...@apache.org>.
rodmeneses commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1561382610


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java:
##########
@@ -98,7 +98,7 @@ public void before() {
   @Override
   public void clean() {
     sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
-    sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
+    dropDatabase(DATABASE_NAME, true);

Review Comment:
   After an offline discussion with @pvary, we have decided to create a new PR with better clean-up logic for the unit tests. Once that is merged, we can continue with this PR.
   Thanks!
   





Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "rodmeneses (via GitHub)" <gi...@apache.org>.
rodmeneses commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1564895113


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java:
##########
@@ -69,7 +69,9 @@ public class FlinkCatalogFactory implements CatalogFactory {
   public static final String HIVE_CONF_DIR = "hive-conf-dir";
   public static final String HADOOP_CONF_DIR = "hadoop-conf-dir";
   public static final String DEFAULT_DATABASE = "default-database";
-  public static final String DEFAULT_DATABASE_NAME = "default";
+  public static final String DEFAULT_DATABASE_NAME = "default_database";

Review Comment:
   This is needed for `TestIcebergConnector.testCatalogDatabaseConflictWithFlinkDatabase`. That unit test drops a database under this condition:
   ```
   if (!isDefaultDatabaseName()) {
     sql("DROP DATABASE `%s`", databaseName());
   }
   ```
   In this unit test context, only one catalog (`default_catalog`) and one database (`default_database`) exist, so, because of https://github.com/apache/flink/pull/23501, that database cannot be dropped while it is in use.
   I also noticed that when you switch to `default_catalog` and list its databases, the database returned is named `default_database`, not `default`.
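A minimal sketch of why the constant matters for that guard. It assumes (not confirmed in this thread) that `isDefaultDatabaseName()` compares the test's database name with `FlinkCatalogFactory.DEFAULT_DATABASE_NAME`, and that the test's database resolves to Flink's `default_database`; `DefaultDatabaseGuardSketch` and `guardIssuesDrop` are hypothetical names.

```java
/**
 * Hypothetical sketch of the clean-up guard in
 * TestIcebergConnector.testCatalogDatabaseConflictWithFlinkDatabase. It assumes that
 * isDefaultDatabaseName() compares the test's database name with
 * FlinkCatalogFactory.DEFAULT_DATABASE_NAME; the real test may differ.
 */
final class DefaultDatabaseGuardSketch {
  /** Name Flink reports for the database in `default_catalog`, per the comment above. */
  static final String FLINK_DEFAULT_DATABASE = "default_database";

  /** Returns true if the guard `if (!isDefaultDatabaseName())` would issue DROP DATABASE. */
  static boolean guardIssuesDrop(String testDatabase, String defaultDatabaseNameConstant) {
    return !defaultDatabaseNameConstant.equals(testDatabase);
  }

  public static void main(String[] args) {
    // Assumption: the only database, and therefore the one in use.
    String testDatabase = FLINK_DEFAULT_DATABASE;
    // Old constant "default": the guard does not recognize Flink's default database and
    // tries to drop it, which fails because that database is in use.
    System.out.println("old constant issues DROP: " + guardIssuesDrop(testDatabase, "default"));
    // New constant "default_database": the guard recognizes it and skips the DROP.
    System.out.println(
        "new constant issues DROP: " + guardIssuesDrop(testDatabase, "default_database"));
  }
}
```

Under these assumptions, only the old constant leads the test into attempting to drop the database it is currently using.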
   





Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#issuecomment-2058425533

   > @nastra: Any comments? I would like to merge this soon, as any merge to the Flink code path will make this PR stale, and @rodmeneses needs to recreate the whole PR.
   > 
   > Thanks,
   
   I think there's still an issue, as there are a bunch of files/diffs that exist only because Flink 1.16 is being removed and git detects it as a move (with some additional changes). This can also be seen when looking at the file path, where a Flink 1.16 file is moved to a Flink 1.19 file, while also adding some diffs where it's not clear why the diff is there in the first place.
   
   My suggestion would be to do the actual removal of the 1.16 directory as a separate PR in an immediate follow-up. This would mean skipping steps 8 + 9 from the PR description, but it's fine to update the Gradle files to no longer build 1.16. Thoughts on the suggestion?




Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "rodmeneses (via GitHub)" <gi...@apache.org>.
rodmeneses commented on PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#issuecomment-2059306340

   > > @nastra: Any comments? I would like to merge this soon, as any merge to the Flink code path will make this PR stale, and @rodmeneses needs to recreate the whole PR.
   > > Thanks,
   > 
   > I think there's still an issue, as there are a bunch of files/diffs that exist only because Flink 1.16 is being removed and git detects it as a move (with some additional changes). This can also be seen when looking at the file path, where a Flink 1.16 file is moved to a Flink 1.19 file, while also adding some diffs where it's not clear why the diff is there in the first place.
   > 
   > My suggestion would be to do the actual removal of the 1.16 directory as a separate PR in an immediate follow-up. This would mean skipping steps 8 + 9 from the PR description, but it's fine to update the Gradle files to no longer build 1.16. Thoughts on the suggestion?
   
   Hi @nastra, thanks for your review and comment.
   
   > I think there's still an issue, as there are a bunch of files/diffs that exist only because Flink 1.16 is being removed and git detects it as a move (with some additional changes).
   
   This is because you are looking at the changes of the PR as a whole. If you look at each of the 4 commits individually, you'll find that everything makes sense.
   
   I tried the approach of not deleting v1.16 and updated this PR accordingly. However, when you look at the changes of the whole PR, it's now even worse than before, because the history isn't shown properly.
   
   Given this, I'd suggest moving forward with deleting v1.16 in this same PR.
   Thoughts? @nastra @pvary 
   




Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary merged PR #10112:
URL: https://github.com/apache/iceberg/pull/10112




Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "rodmeneses (via GitHub)" <gi...@apache.org>.
rodmeneses commented on PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#issuecomment-2048507129

   cc: @pvary @stevenzwu @mas-chen, please take a look at your earliest convenience. Thanks!




Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1560516897


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java:
##########
@@ -248,6 +248,7 @@ public void testInvalidMaxPlanningSnapshotCount() {
             .monitorInterval(Duration.ofMillis(100))
             .maxPlanningSnapshotCount(0)
             .build();
+

Review Comment:
   unnecessary change





Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#issuecomment-2059345426

   > Why do we remove Flink 1.16 in this PR?
   
   @manuzhang: This is how we usually do these changes. We support the last 3 versions of Flink, so when we add a new version, we remove the oldest one.
   We also make the changes this way to keep the history of the main directory (in our case 1.18 -> 1.19).
   
   Old PRs:
   - #9211
   - #7254
   - #6092
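The version policy described above can be sketched as a sliding window over Flink versions. This is a hypothetical illustration (`SupportedFlinkVersions` and `addVersion` are made-up names, not Iceberg build code); it only encodes "support the last 3 versions" and assumes the list is ordered from oldest to newest.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the "support the last 3 Flink versions" policy. */
final class SupportedFlinkVersions {
  static final int SUPPORTED_VERSION_COUNT = 3;

  /**
   * Adds a new version and drops the oldest ones so that at most SUPPORTED_VERSION_COUNT remain.
   * Assumes {@code supported} is ordered from oldest to newest.
   */
  static List<String> addVersion(List<String> supported, String newVersion) {
    List<String> result = new ArrayList<>(supported);
    result.add(newVersion);
    while (result.size() > SUPPORTED_VERSION_COUNT) {
      result.remove(0); // remove the oldest version
    }
    return result;
  }

  public static void main(String[] args) {
    // Before this PR: 1.16, 1.17 and 1.18; this PR adds 1.19.
    List<String> after = addVersion(List.of("1.16", "1.17", "1.18"), "1.19");
    System.out.println(after); // prints [1.17, 1.18, 1.19]
  }
}
```

Adding 1.19 therefore pushes 1.16 out of the window, which is why this PR removes it.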




Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1566855387


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -391,17 +391,16 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
               + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
               + "create table without 'connector'='iceberg' related properties in an iceberg table.");
     }
-
-    createIcebergTable(tablePath, table, ignoreIfExists);
+    Preconditions.checkArgument(table instanceof ResolvedCatalogTable, "table should be resolved");

Review Comment:
   I think the problem is that the diff is still from a previous attempt where the changes were based on Flink 1.16. What about removing the Flink 1.16 build from the Gradle files, but doing the actual removal of the Flink 1.16 folders in a separate PR? I think that would avoid these diffs.





Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "rodmeneses (via GitHub)" <gi...@apache.org>.
rodmeneses commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1564937403


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java:
##########
@@ -257,20 +257,12 @@ public void testCreateTableUnderDefaultDatabase() {
   public void testCatalogDatabaseConflictWithFlinkDatabase() {
     sql("CREATE DATABASE IF NOT EXISTS `%s`", databaseName());
     sql("USE `%s`", databaseName());
-
-    try {
-      testCreateConnectorTable();
-      // Ensure that the table was created under the specific database.
-      Assertions.assertThatThrownBy(
-              () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME))
-          .isInstanceOf(org.apache.flink.table.api.TableException.class)
-          .hasMessageStartingWith("Could not execute CreateTable in path");
-    } finally {
-      sql("DROP TABLE IF EXISTS `%s`.`%s`", databaseName(), TABLE_NAME);
-      if (!isDefaultDatabaseName()) {
-        sql("DROP DATABASE `%s`", databaseName());
-      }
-    }
+    testCreateConnectorTable();

Review Comment:
   Up to this point in the unit test, only one catalog (`default_catalog`) and one database (`default_database`) have been created. Trying to drop that database will fail because we are currently using it: after `FLINK-33226`, it is impossible to drop the database that is in use.





Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1560516528


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java:
##########
@@ -443,9 +446,10 @@ public void testIncrementalFromSnapshotTimestampWithInvalidIds() throws Exceptio
     ContinuousSplitPlannerImpl splitPlanner =
         new ContinuousSplitPlannerImpl(
             tableResource.tableLoader().clone(), scanContextWithInvalidSnapshotId, null);
+

Review Comment:
   seems like unnecessary changes here





Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1560439436


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java:
##########
@@ -98,7 +98,7 @@ public void before() {
   @Override
   public void clean() {
     sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
-    sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
+    dropDatabase(DATABASE_NAME, true);

Review Comment:
   Do we really need this? Does something behind the scenes issue a `USE DATABASE` command?
   
   If so, this could be a breaking change that we might want to highlight, as creating a database and then dropping it immediately afterwards won't work anymore.




Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1560519389


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java:
##########
@@ -294,39 +308,284 @@ public void testAlterTable() throws TableNotExistException {
     assertThat(table("tl").properties()).containsAllEntriesOf(properties);
 
     // remove property
-    CatalogTable catalogTable = catalogTable("tl");
+    sql("ALTER TABLE tl RESET('oldK')");
     properties.remove("oldK");
-    getTableEnv()
-        .getCatalog(getTableEnv().getCurrentCatalog())
-        .get()
-        .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false);
     assertThat(table("tl").properties()).containsAllEntriesOf(properties);
   }
 
   @TestTemplate
-  public void testAlterTableWithPrimaryKey() throws TableNotExistException {
-    sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')");
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("oldK", "oldV");
+  public void testAlterTableAddColumn() {
+    sql("CREATE TABLE tl(id BIGINT)");
+    Schema schemaBefore = table("tl").schema();
+    assertThat(schemaBefore.asStruct())
+        .isEqualTo(
+            new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct());
+    sql("ALTER TABLE tl ADD (dt STRING)");
+    Schema schemaAfter1 = table("tl").schema();
+    assertThat(schemaAfter1.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(2, "dt", Types.StringType.get()))
+                .asStruct());
+    // Add multiple columns
+    sql("ALTER TABLE tl ADD (col1 STRING COMMENT 'comment for col1', col2 BIGINT)");
+    Schema schemaAfter2 = table("tl").schema();
+    assertThat(schemaAfter2.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(2, "dt", Types.StringType.get()),
+                    Types.NestedField.optional(
+                        3, "col1", Types.StringType.get(), "comment for col1"),
+                    Types.NestedField.optional(4, "col2", Types.LongType.get()))
+                .asStruct());
+    // Adding a required field should fail because Iceberg's SchemaUpdate does not allow
+    // incompatible changes.
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)"))

Review Comment:
   can be statically imported, similar to `assertThat()`




Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "rodmeneses (via GitHub)" <gi...@apache.org>.
rodmeneses commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1564892433


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java:
##########
@@ -98,7 +98,7 @@ public void before() {
   @Override
   public void clean() {
     sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
-    sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
+    dropDatabase(DATABASE_NAME, true);

Review Comment:
   I ended up changing this implementation of `dropDatabase` to:
   ```
     protected void dropDatabase(String database, boolean ifExists) {
       String currentCatalog = getTableEnv().getCurrentCatalog();
       sql("USE CATALOG %s", DEFAULT_CATALOG_NAME);
       sql("USE %s", getTableEnv().listDatabases()[0]);
       sql("USE CATALOG %s", currentCatalog);
       sql("DROP DATABASE %s %s", ifExists ? "IF EXISTS" : "", database);
     }
   ```
   which is similar to our current `dropCatalog` implementation. 
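The following is a hypothetical in-memory model, not Flink code, that sketches why a round trip through the default catalog can make the drop succeed. It rests on two labeled assumptions: (1) as pvary's question implies, the engine refuses to drop the database that is currently in use; (2) switching to a *different* catalog resets the current database to that catalog's default, while re-selecting the catalog you are already in changes nothing. `Session`, `useCatalog`, `dropDatabaseHelper`, and the other names are illustrative only.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DropDatabaseModel {

  /** Hypothetical stand-in for a SQL session's catalog state; not a Flink API. */
  public static class Session {
    public final Map<String, Set<String>> databases = new HashMap<>(); // catalog -> databases
    public final Map<String, String> defaultDatabase = new HashMap<>(); // catalog -> default db
    public String currentCatalog;
    public String currentDatabase;

    public void addCatalog(String catalog, String defaultDb) {
      databases.computeIfAbsent(catalog, c -> new HashSet<>()).add(defaultDb);
      defaultDatabase.put(catalog, defaultDb);
    }

    // CREATE DATABASE <db> in the current catalog.
    public void createDatabase(String db) {
      databases.get(currentCatalog).add(db);
    }

    // Assumption (2): only a switch to a different catalog resets the current database.
    public void useCatalog(String catalog) {
      if (!catalog.equals(currentCatalog)) {
        currentCatalog = catalog;
        currentDatabase = defaultDatabase.get(catalog);
      }
    }

    // USE <db>
    public void useDatabase(String db) {
      if (!databases.get(currentCatalog).contains(db)) {
        throw new IllegalArgumentException("Database " + db + " does not exist");
      }
      currentDatabase = db;
    }

    // DROP DATABASE [IF EXISTS] <db>; assumption (1): the database in use cannot be dropped.
    public void dropDatabase(String db, boolean ifExists) {
      Set<String> dbs = databases.get(currentCatalog);
      if (!dbs.contains(db)) {
        if (ifExists) {
          return;
        }
        throw new IllegalArgumentException("Database " + db + " does not exist");
      }
      if (db.equals(currentDatabase)) {
        throw new IllegalStateException("Cannot drop a database which is currently in use");
      }
      dbs.remove(db);
    }
  }

  /** Replays the statement order of the dropDatabase(...) test helper against the model. */
  public static void dropDatabaseHelper(
      Session s, String defaultCatalog, String db, boolean ifExists) {
    String current = s.currentCatalog;
    s.useCatalog(defaultCatalog); // leave the current catalog
    s.useDatabase(s.defaultDatabase.get(defaultCatalog)); // stands in for listDatabases()[0]
    s.useCatalog(current); // come back: under assumption (2) this resets the current database
    s.dropDatabase(db, ifExists);
  }
}
```

In this model a plain `DROP DATABASE` right after `USE db` is rejected, and `USE CATALOG` on the catalog you are already in would not help. Leaving and re-entering the catalog does, which mirrors the shape of the quoted helper and of the `dropCatalog` approach it is compared to.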
   
   




Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1565260333


##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java:
##########
@@ -60,7 +60,7 @@ protected List<Row> runWithFilter(Expression filter, String sqlFilter, boolean c
   protected List<Row> runWithOptions(Map<String, String> options) throws Exception {
     FlinkSource.Builder builder = FlinkSource.forRowData();
     Optional.ofNullable(options.get("case-sensitive"))
-        .ifPresent(value -> builder.caseSensitive(Boolean.parseBoolean(value)));
+        .ifPresent(value -> builder.caseSensitive(Boolean.getBoolean(value)));

Review Comment:
   `parseBoolean` is correct here (this was only recently changed on main)
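The distinction matters because `Boolean.getBoolean(name)` does not parse its argument. It looks up the JVM system property named `name` and returns true only if that property is set to "true". `Boolean.parseBoolean(s)` returns true when `s` equals "true", ignoring case. The sketch below reproduces only the option-parsing part of `runWithOptions`. The class and method names are illustrative, and `FlinkSource.Builder` is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class CaseSensitiveOption {

  // Boolean.parseBoolean(s) returns true iff s is "true", ignoring case (null gives false).
  public static Optional<Boolean> withParseBoolean(Map<String, String> options) {
    return Optional.ofNullable(options.get("case-sensitive")).map(Boolean::parseBoolean);
  }

  // Boolean.getBoolean(name) reads the JVM system property called `name`. Here the option
  // value "true" is treated as a property *name*, so the result is almost always false.
  public static Optional<Boolean> withGetBoolean(Map<String, String> options) {
    return Optional.ofNullable(options.get("case-sensitive")).map(Boolean::getBoolean);
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("case-sensitive", "true");
    System.out.println("parseBoolean: " + withParseBoolean(options));
    // Reports false unless the JVM was started with -Dtrue=true.
    System.out.println("getBoolean:   " + withGetBoolean(options));
  }
}
```

With `getBoolean`, a test that sets `case-sensitive` to "true" would silently run case-insensitive. That is why the `parseBoolean` form is the one to keep.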




Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "manuzhang (via GitHub)" <gi...@apache.org>.
manuzhang commented on PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#issuecomment-2058257988

   Why do we remove Flink 1.16 in this PR?



Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "rodmeneses (via GitHub)" <gi...@apache.org>.
rodmeneses commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1561381139


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java:
##########
@@ -248,6 +248,7 @@ public void testInvalidMaxPlanningSnapshotCount() {
             .monitorInterval(Duration.ofMillis(100))
             .maxPlanningSnapshotCount(0)
             .build();
+

Review Comment:
   please see my comment above



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java:
##########
@@ -35,6 +35,7 @@
 import org.apache.iceberg.util.StructLikeSet;
 
 public class TestFlinkInputFormatReaderDeletes extends TestFlinkReaderDeletesBase {
+

Review Comment:
   please see my comment above



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java:
##########
@@ -294,39 +308,284 @@ public void testAlterTable() throws TableNotExistException {
     assertThat(table("tl").properties()).containsAllEntriesOf(properties);
 
     // remove property
-    CatalogTable catalogTable = catalogTable("tl");
+    sql("ALTER TABLE tl RESET('oldK')");
     properties.remove("oldK");
-    getTableEnv()
-        .getCatalog(getTableEnv().getCurrentCatalog())
-        .get()
-        .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false);
     assertThat(table("tl").properties()).containsAllEntriesOf(properties);
   }
 
   @TestTemplate
-  public void testAlterTableWithPrimaryKey() throws TableNotExistException {
-    sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')");
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("oldK", "oldV");
+  public void testAlterTableAddColumn() {
+    sql("CREATE TABLE tl(id BIGINT)");
+    Schema schemaBefore = table("tl").schema();
+    assertThat(schemaBefore.asStruct())
+        .isEqualTo(
+            new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct());
+    sql("ALTER TABLE tl ADD (dt STRING)");
+    Schema schemaAfter1 = table("tl").schema();
+    assertThat(schemaAfter1.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(2, "dt", Types.StringType.get()))
+                .asStruct());
+    // Add multiple columns
+    sql("ALTER TABLE tl ADD (col1 STRING COMMENT 'comment for col1', col2 BIGINT)");
+    Schema schemaAfter2 = table("tl").schema();
+    assertThat(schemaAfter2.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(2, "dt", Types.StringType.get()),
+                    Types.NestedField.optional(
+                        3, "col1", Types.StringType.get(), "comment for col1"),
+                    Types.NestedField.optional(4, "col2", Types.LongType.get()))
+                .asStruct());
+    // Adding a required field should fail because Iceberg's SchemaUpdate does not allow
+    // incompatible changes.
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)"))

Review Comment:
   please see my comment above




Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "rodmeneses (via GitHub)" <gi...@apache.org>.
rodmeneses commented on code in PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#discussion_r1561380930


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java:
##########
@@ -443,9 +446,10 @@ public void testIncrementalFromSnapshotTimestampWithInvalidIds() throws Exceptio
     ContinuousSplitPlannerImpl splitPlanner =
         new ContinuousSplitPlannerImpl(
             tableResource.tableLoader().clone(), scanContextWithInvalidSnapshotId, null);
+

Review Comment:
   Hi @nastra, and thanks a lot for your reviews.
   Some comments:
   1. The best way to review this PR is by looking at the individual commits. The most important one is the 4th commit, called "Flink: Refactoring code and properties to make Flink 1.19 to work". If you review that particular commit, you will notice that I'm not introducing the extra line changes.
   2. You're seeing those changes because of the way GitHub presents the differences: it shows the complete set of differences for the whole PR, and somehow it's showing the diff between the new 1.19 code and the old 1.16 code.
   3. Now, why do we even have those changes? This is because of a previous backport of changes between versions. There were some inconsistencies when the backport was made, and in those old commits the extra lines (or the use of `Assertions.assertThatThrownBy` instead of the statically imported form) were introduced.
   4. So, again, the changes you're referring to are not part of this PR.
   5. I updated the PR description with the exact steps I took to create the PR.
   
   Thanks a lot!




Re: [PR] Flink: Adds support for Flink 1.19 version [iceberg]

Posted by "ajantha-bhat (via GitHub)" <gi...@apache.org>.
ajantha-bhat commented on PR #10112:
URL: https://github.com/apache/iceberg/pull/10112#issuecomment-2049367693

   @rodmeneses: Just curious to know what command you used for the step "Flink: Recover flink/1.18 files from history"?
   It is really nice.
   
   I thought the only way was to add back the folder (which shows the files as newly added).

