Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/01/08 00:07:03 UTC

[GitHub] [iceberg] stevenzwu opened a new pull request #2047: (1) refactor Flink source tests so that future FLIP-27 source test ca…

stevenzwu opened a new pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047


   …n extend from (2) switch DeleteReadTests from HiveCatalog to HadoopCatalog, because HiveCatalog relies on the expensive TestHiveMetastore and is prone to connection leaks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on pull request #2047: (1) refactor Flink source tests so that future FLIP-27 source test ca…

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#issuecomment-756472364


   @openinx I haven't switched `FlinkTestBase` to use HadoopCatalog yet. We should probably do that too, and perhaps leave only `TestFlinkHiveCatalog` using HiveCatalog and TestHiveMetastore.


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2047: (1) refactor Flink source tests so that future FLIP-27 source test ca…

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r553720705



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
##########
@@ -83,8 +74,13 @@
           .bucket("id", 1)

Review comment:
       The FLIP-27 source will have a test extending this base class. The logic specific to the current source has been refactored into the `TestFlinkSource` class.
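The base-class arrangement described in this comment (shared test logic in an abstract class, each source supplying only how a scan is run) can be illustrated in plain Java. This is a hypothetical, self-contained sketch, not the actual Iceberg test code: `ScanTestBase`, the two subclasses, and the in-memory `SNAPSHOTS` fixture are invented stand-ins, `runWithOptions` only echoes the name used in the test, and JUnit wiring is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ScanTestBaseSketch {

  // Hypothetical fixture shared by all subclasses: snapshot id -> rows visible at that snapshot.
  static final Map<Long, List<String>> SNAPSHOTS = Map.of(
      1L, List.of("a"),
      2L, List.of("a", "b"));

  abstract static class ScanTestBase {
    /** Each source implementation decides how a scan with these options is executed. */
    protected abstract List<String> runWithOptions(Map<String, String> options);

    /** Shared test logic, inherited unchanged by every source-specific subclass. */
    final void testSnapshotReads() {
      List<String> actual = runWithOptions(Map.of("snapshot-id", "1"));
      if (!actual.equals(SNAPSHOTS.get(1L))) {
        throw new AssertionError(getClass().getSimpleName() + " returned " + actual);
      }
    }
  }

  /** Stand-in for the current source: returns the snapshot's rows directly. */
  static final class CurrentSourceTest extends ScanTestBase {
    @Override
    protected List<String> runWithOptions(Map<String, String> options) {
      return SNAPSHOTS.get(Long.parseLong(options.get("snapshot-id")));
    }
  }

  /** Stand-in for a future FLIP-27 source: collects the same rows one at a time. */
  static final class Flip27SourceTest extends ScanTestBase {
    @Override
    protected List<String> runWithOptions(Map<String, String> options) {
      List<String> rows = new ArrayList<>();
      for (String row : SNAPSHOTS.get(Long.parseLong(options.get("snapshot-id")))) {
        rows.add(row); // pretend each row arrives from a separate split reader
      }
      return rows;
    }
  }

  public static void main(String[] args) {
    for (ScanTestBase test : List.of(new CurrentSourceTest(), new Flip27SourceTest())) {
      test.testSnapshotReads();
      System.out.println(test.getClass().getSimpleName() + " passed");
    }
  }
}
```

With this shape, adding a test for another source means writing one subclass; the assertions in `testSnapshotReads` are inherited as-is.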




[GitHub] [iceberg] stevenzwu edited a comment on pull request #2047: refactor Flink source tests so that future FLIP-27 source test ca…

Posted by GitBox <gi...@apache.org>.
stevenzwu edited a comment on pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#issuecomment-771779923


   @openinx yes, please help review this unit test refactoring next. The main reason is to avoid repeatedly resolving merge conflicts after periodic rebasing. The other feature code and tests are fully isolated and unlikely to hit merge conflicts during what will probably be a long review process.


[GitHub] [iceberg] openinx commented on pull request #2047: refactor Flink source tests so that future FLIP-27 source test ca…

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#issuecomment-771448892


   @stevenzwu, do you mean this PR, which was split out of the big one (https://github.com/apache/iceberg/pull/2105), should be the next one to review? As I understand it, this is just a unit test refactor; we could open the next feature PR for review and simply improve the unit tests as we iterate on the unified Flink source work.


[GitHub] [iceberg] openinx commented on a change in pull request #2047: refactor Flink source tests so that future FLIP-27 source test ca…

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r585556257



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
##########
@@ -245,74 +204,80 @@ private void validateIdentityPartitionProjections(
 
   @Test
   public void testSnapshotReads() throws Exception {
-    Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA);
+    Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
 
     GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
 
-    List<Record> expectedRecords = RandomGenericData.generate(SCHEMA, 1, 0L);
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
     helper.appendToTable(expectedRecords);
     long snapshotId = table.currentSnapshot().snapshotId();
 
     long timestampMillis = table.currentSnapshot().timestampMillis();
 
     // produce another timestamp
     waitUntilAfter(timestampMillis);
-    helper.appendToTable(RandomGenericData.generate(SCHEMA, 1, 0L));
-
-    assertRecords(
-        runWithOptions(ImmutableMap.<String, String>builder().put("snapshot-id", Long.toString(snapshotId)).build()),
-        expectedRecords, SCHEMA);
-    assertRecords(
-        runWithOptions(
-            ImmutableMap.<String, String>builder().put("as-of-timestamp", Long.toString(timestampMillis)).build()),
-        expectedRecords, SCHEMA);
+    helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L));
+
+    TestHelpers.assertRecords(
+        runWithOptions(ImmutableMap.<String, String>builder()
+            .put("snapshot-id", Long.toString(snapshotId))
+            .build()),
+        expectedRecords, TestFixtures.SCHEMA);
+    TestHelpers.assertRecords(
+        runWithOptions(ImmutableMap.<String, String>builder()

Review comment:
       nit: ditto.
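`testSnapshotReads` calls `waitUntilAfter(timestampMillis)` before the second append so that the two commits get distinct timestamps; otherwise an `as-of-timestamp` read could resolve to the second snapshot instead of the first. A hypothetical, self-contained sketch of that reasoning, assuming `as-of-timestamp` resolves to the latest snapshot committed at or before the given time. `Entry`, `snapshotIdAsOf`, and this `waitUntilAfter` body are illustrations, not Iceberg code.

```java
import java.util.List;

public class AsOfTimestampSketch {

  /** Hypothetical snapshot-log entry: (snapshot id, commit time in millis). */
  static final class Entry {
    final long snapshotId;
    final long timestampMillis;

    Entry(long snapshotId, long timestampMillis) {
      this.snapshotId = snapshotId;
      this.timestampMillis = timestampMillis;
    }
  }

  /** Assumed semantics: the latest snapshot committed at or before asOfMillis (history oldest first). */
  static long snapshotIdAsOf(List<Entry> history, long asOfMillis) {
    Long result = null;
    for (Entry e : history) {
      if (e.timestampMillis <= asOfMillis) {
        result = e.snapshotId;
      }
    }
    if (result == null) {
      throw new IllegalArgumentException("No snapshot at or before " + asOfMillis);
    }
    return result;
  }

  /** Spins until the wall clock is strictly past timestampMillis, then returns the new time. */
  static long waitUntilAfter(long timestampMillis) {
    long current = System.currentTimeMillis();
    while (current <= timestampMillis) {
      current = System.currentTimeMillis();
    }
    return current;
  }

  public static void main(String[] args) {
    long t1 = System.currentTimeMillis();
    long t2 = waitUntilAfter(t1); // the second "commit" lands on a strictly later millisecond
    List<Entry> history = List.of(new Entry(100L, t1), new Entry(200L, t2));
    System.out.println(snapshotIdAsOf(history, t1)); // 100: the first snapshot, as the test expects

    // Without the wait, both commits could share t1 and the lookup would pick the second one.
    List<Entry> sameMillis = List.of(new Entry(100L, t1), new Entry(200L, t1));
    System.out.println(snapshotIdAsOf(sameMillis, t1)); // 200
  }
}
```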




[GitHub] [iceberg] openinx commented on a change in pull request #2047: refactor Flink source tests so that future FLIP-27 source test ca…

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r585556072



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
##########
@@ -245,74 +204,80 @@ private void validateIdentityPartitionProjections(
 
   @Test
   public void testSnapshotReads() throws Exception {
-    Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA);
+    Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
 
     GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
 
-    List<Record> expectedRecords = RandomGenericData.generate(SCHEMA, 1, 0L);
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
     helper.appendToTable(expectedRecords);
     long snapshotId = table.currentSnapshot().snapshotId();
 
     long timestampMillis = table.currentSnapshot().timestampMillis();
 
     // produce another timestamp
     waitUntilAfter(timestampMillis);
-    helper.appendToTable(RandomGenericData.generate(SCHEMA, 1, 0L));
-
-    assertRecords(
-        runWithOptions(ImmutableMap.<String, String>builder().put("snapshot-id", Long.toString(snapshotId)).build()),
-        expectedRecords, SCHEMA);
-    assertRecords(
-        runWithOptions(
-            ImmutableMap.<String, String>builder().put("as-of-timestamp", Long.toString(timestampMillis)).build()),
-        expectedRecords, SCHEMA);
+    helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L));
+
+    TestHelpers.assertRecords(
+        runWithOptions(ImmutableMap.<String, String>builder()

Review comment:
       nit: here we could just use: 
   ```java
   ImmutableMap.of("snapshot-id", Long.toString(snapshotId))
   ```
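The suggestion works because each `runWithOptions` call here sets a single option, so a one-entry map can be built directly instead of through a builder. A self-contained sketch of how such a map can drive a scan builder, mirroring the `Optional.ofNullable(...).ifPresent(...)` chain the pre-refactor `runWithOptions` used. `ScanBuilder` and `fromOptions` are hypothetical names, and JDK 9+ `Map.of` stands in for Guava's `ImmutableMap.of` so the example needs no dependencies.

```java
import java.util.Map;
import java.util.Optional;

public class OptionsToBuilderSketch {

  /** Hypothetical scan builder whose setters mirror the option keys used in the test. */
  static final class ScanBuilder {
    Long snapshotId;
    Long startSnapshotId;
    Long endSnapshotId;
    Long asOfTimestamp;

    ScanBuilder snapshotId(long value) { this.snapshotId = value; return this; }
    ScanBuilder startSnapshotId(long value) { this.startSnapshotId = value; return this; }
    ScanBuilder endSnapshotId(long value) { this.endSnapshotId = value; return this; }
    ScanBuilder asOfTimestamp(long value) { this.asOfTimestamp = value; return this; }
  }

  /** Applies each option only when it is present; absent keys leave the builder untouched. */
  static ScanBuilder fromOptions(Map<String, String> options) {
    ScanBuilder builder = new ScanBuilder();
    Optional.ofNullable(options.get("snapshot-id"))
        .ifPresent(v -> builder.snapshotId(Long.parseLong(v)));
    Optional.ofNullable(options.get("start-snapshot-id"))
        .ifPresent(v -> builder.startSnapshotId(Long.parseLong(v)));
    Optional.ofNullable(options.get("end-snapshot-id"))
        .ifPresent(v -> builder.endSnapshotId(Long.parseLong(v)));
    Optional.ofNullable(options.get("as-of-timestamp"))
        .ifPresent(v -> builder.asOfTimestamp(Long.parseLong(v)));
    return builder;
  }

  public static void main(String[] args) {
    long snapshotId = 42L;
    // A single entry needs no builder; Map.of plays the role of ImmutableMap.of here.
    ScanBuilder b = fromOptions(Map.of("snapshot-id", Long.toString(snapshotId)));
    System.out.println(b.snapshotId + " " + b.asOfTimestamp); // 42 null
  }
}
```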




[GitHub] [iceberg] openinx merged pull request #2047: Flink: Refactor flink source tests for FLIP-27 unified source.

Posted by GitBox <gi...@apache.org>.
openinx merged pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047


   


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2047: (1) refactor Flink source tests so that future FLIP-27 source test ca…

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r553720135



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.DeleteReadTests;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestFlinkReaderDeletesBase extends DeleteReadTests {

Review comment:
       FLIP-27 source will add a test extending from this base class. 




[GitHub] [iceberg] openinx commented on a change in pull request #2047: Flink: Refactor flink source tests for FLIP-27 unified source.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r588003346



##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
##########
@@ -69,4 +74,30 @@ public static Object convertConstant(Type type, Object value) {
     }
     return value;
   }
+
+  /**
+   * Similar to the private {@link RowDataSerializer#copyRowData(RowData, RowData)} method.
+   * This skips the arity check between rowType and from,
+   * because the from RowData may contain an additional column for position deletes.
+   * Using {@link RowDataSerializer#copy(RowData, RowData)} will fail the arity check.
+   */
+  public static RowData clone(RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {

Review comment:
       I think we could do this refactor when we review [RowDataIteratorBulkFormat](https://github.com/stevenzwu/iceberg/blob/flip27IcebergSource/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataIteratorBulkFormat.java#L116).




[GitHub] [iceberg] openinx commented on a change in pull request #2047: Flink: Refactor flink source tests for FLIP-27 unified source.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r588000745



##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
##########
@@ -69,4 +74,30 @@ public static Object convertConstant(Type type, Object value) {
     }
     return value;
   }
+
+  /**
+   * Similar to the private {@link RowDataSerializer#copyRowData(RowData, RowData)} method.
+   * This skips the arity check between rowType and from,
+   * because the from RowData may contain an additional column for position deletes.
+   * Using {@link RowDataSerializer#copy(RowData, RowData)} will fail the arity check.
+   */
+  public static RowData clone(RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {

Review comment:
       OK, it seems we're cloning the RowData iteratively, and I see that a `FieldGetter` is created for each row [here](https://github.com/apache/iceberg/pull/2047/files#diff-81028bf7f8758383a5976b7c1461931a41485de740642252f5fd0c5c9cec51dfR94), which is not the expected behavior either. We may need to introduce a new `RowDataCloner` that initializes all of its `TypeSerializer`s and `FieldGetter`s once in its constructor; then, when iterating over the `RowData`, we just clone row by row without creating any extra instances.

   Users wouldn't have to interact with the internal `TypeSerializer`; they could just use the `RowDataCloner`.
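The proposal is to move all per-field setup into a constructor so that cloning each row allocates nothing beyond the copied values. A minimal, self-contained sketch of that shape; it deliberately avoids Flink classes, so `Row`, `FieldGetter`, and `FieldCopier` are hypothetical stand-ins for `RowData`, `RowData.FieldGetter`, and `TypeSerializer#copy`, and `RowDataCloner` is only the name proposed in this comment, not an existing class. Like the `RowDataUtil#clone` javadoc describes, it skips the arity check, so an input row with an extra trailing column (for example a position-delete column) is accepted.

```java
import java.util.Arrays;

public class RowDataClonerSketch {

  /** Hypothetical stand-in for RowData: positional field access only. */
  static final class Row {
    final Object[] fields;

    Row(Object... fields) {
      this.fields = fields;
    }

    int arity() {
      return fields.length;
    }
  }

  /** Hypothetical stand-in for RowData.FieldGetter. */
  interface FieldGetter {
    Object get(Row row);
  }

  /** Hypothetical stand-in for a per-field TypeSerializer#copy. */
  interface FieldCopier {
    Object copy(Object value);
  }

  /** Builds getters and copiers once; clone() then only reads and copies field values. */
  static final class RowDataCloner {
    private final FieldGetter[] getters;
    private final FieldCopier[] copiers;

    RowDataCloner(FieldCopier... copiers) {
      this.copiers = copiers.clone();
      this.getters = new FieldGetter[copiers.length];
      for (int i = 0; i < copiers.length; i++) {
        final int pos = i;
        getters[i] = row -> row.fields[pos]; // created once per field, not once per row
      }
    }

    /** Copies the schema's fields of `from` into `reuse`, allocating only if reuse is unusable. */
    Row clone(Row from, Row reuse) {
      Row target = (reuse != null && reuse.arity() == copiers.length)
          ? reuse
          : new Row(new Object[copiers.length]);
      for (int i = 0; i < copiers.length; i++) {
        Object value = getters[i].get(from); // trailing input fields beyond the schema are ignored
        target.fields[i] = value == null ? null : copiers[i].copy(value);
      }
      return target;
    }
  }

  public static void main(String[] args) {
    // Two-field schema: an immutable String (shared) and a mutable int[] (deep-copied).
    RowDataCloner cloner = new RowDataCloner(v -> v, v -> ((int[]) v).clone());
    Row[] input = {
        new Row("a", new int[] {1}, 0L), // third field mimics an extra position column
        new Row("b", new int[] {2}, 1L)
    };
    Row reuse = null;
    for (Row row : input) {
      reuse = cloner.clone(row, reuse); // the same target Row is reused after the first call
      System.out.println(reuse.fields[0] + " " + Arrays.toString((int[]) reuse.fields[1]));
    }
  }
}
```

In a real implementation the per-field copiers would presumably be derived from the `RowType` inside the constructor, so callers would never handle them directly; that derivation is left out of this sketch.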




[GitHub] [iceberg] openinx commented on a change in pull request #2047: refactor Flink source tests so that future FLIP-27 source test ca…

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r585563087



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
##########
@@ -113,68 +93,47 @@ public void before() throws IOException {
     File warehouseFile = TEMPORARY_FOLDER.newFolder();
     Assert.assertTrue(warehouseFile.delete());
     // before variables
-    Configuration conf = new Configuration();
     warehouse = "file:" + warehouseFile;
+    Configuration conf = new Configuration();
     catalog = new HadoopCatalog(conf, warehouse);
+    location = String.format("%s/%s/%s", warehouse, TestFixtures.DATABASE, TestFixtures.TABLE);
   }
 
-  private List<Row> runWithProjection(String... projected) throws IOException {
-    TableSchema.Builder builder = TableSchema.builder();
-    TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(
-        catalog.loadTable(TableIdentifier.of("default", "t")).schema()));
-    for (String field : projected) {
-      TableColumn column = schema.getTableColumn(field).get();
-      builder.field(column.getName(), column.getType());
-    }
-    return run(FlinkSource.forRowData().project(builder.build()), Maps.newHashMap(), "", projected);
-  }
-
-  protected List<Row> runWithFilter(Expression filter, String sqlFilter) throws IOException {
-    FlinkSource.Builder builder = FlinkSource.forRowData().filters(Collections.singletonList(filter));
-    return run(builder, Maps.newHashMap(), sqlFilter, "*");
-  }
-
-  private List<Row> runWithOptions(Map<String, String> options) throws IOException {
-    FlinkSource.Builder builder = FlinkSource.forRowData();
-    Optional.ofNullable(options.get("snapshot-id")).ifPresent(value -> builder.snapshotId(Long.parseLong(value)));
-    Optional.ofNullable(options.get("start-snapshot-id"))
-        .ifPresent(value -> builder.startSnapshotId(Long.parseLong(value)));
-    Optional.ofNullable(options.get("end-snapshot-id"))
-        .ifPresent(value -> builder.endSnapshotId(Long.parseLong(value)));
-    Optional.ofNullable(options.get("as-of-timestamp"))
-        .ifPresent(value -> builder.asOfTimestamp(Long.parseLong(value)));
-    return run(builder, options, "", "*");
+  @After
+  public void after() throws IOException {
   }
 
-  private List<Row> run() throws IOException {
-    return run(FlinkSource.forRowData(), Maps.newHashMap(), "", "*");
+  protected TableLoader tableLoader() {
+    return TableLoader.fromHadoopTable(location);
   }
 
-  protected abstract List<Row> run(FlinkSource.Builder formatBuilder, Map<String, String> sqlOptions, String sqlFilter,
-                                   String... sqlSelectedFields) throws IOException;
+  protected abstract List<Row> runWithProjection(String... projected) throws Exception;

Review comment:
       What would the implementation of these four methods look like for FLIP-27? It looks like we are just filling in options in `TestFlinkSource`; will FLIP-27 have different implementations of them?
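The new `before()` quoted in this hunk derives `location` by hand so that `tableLoader()` can call `TableLoader.fromHadoopTable(location)`; this works because a HadoopCatalog keeps each table under `<warehouse>/<namespace>/<table>`. It also creates a temporary folder and immediately deletes it, which reserves a unique path that does not yet exist. A self-contained sketch of those two path steps; `tableLocation` and `freshWarehouse` are hypothetical helper names, and the `"default"`/`"t"` values are assumed from the pre-refactor `TableIdentifier.of("default", "t")` rather than read from `TestFixtures`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class HadoopWarehouseSketch {

  // Assumed fixture values, standing in for TestFixtures.DATABASE / TestFixtures.TABLE.
  static final String DATABASE = "default";
  static final String TABLE = "t";

  /** Table location inside a HadoopCatalog warehouse: <warehouse>/<database>/<table>. */
  static String tableLocation(String warehouse, String database, String table) {
    return String.format("%s/%s/%s", warehouse, database, table);
  }

  /** Reserves a unique path, then deletes the directory so the catalog can create it itself. */
  static String freshWarehouse() throws IOException {
    Path dir = Files.createTempDirectory("warehouse");
    Files.delete(dir);
    return "file:" + dir.toAbsolutePath();
  }

  public static void main(String[] args) throws IOException {
    String warehouse = freshWarehouse();
    System.out.println(tableLocation(warehouse, DATABASE, TABLE));
  }
}
```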




[GitHub] [iceberg] stevenzwu edited a comment on pull request #2047: refactor Flink source tests so that future FLIP-27 source test ca…

Posted by GitBox <gi...@apache.org>.
stevenzwu edited a comment on pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#issuecomment-771779923


   @openinx yes, please help review this unit test refactoring next. The reason is to avoid repeatedly resolving merge conflicts after periodic rebasing. The other feature code and tests are fully isolated and unlikely to hit merge conflicts during what will probably be a long review process.


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2047: Flink: Refactor flink source tests for FLIP-27 unified source.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r587708737



##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
##########
@@ -69,4 +74,30 @@ public static Object convertConstant(Type type, Object value) {
     }
     return value;
   }
+
+  /**
+   * Similar to the private {@link RowDataSerializer#copyRowData(RowData, RowData)} method.
+   * This skips the arity check between rowType and from,
+   * because the from RowData may contain an additional column for position deletes.
+   * Using {@link RowDataSerializer#copy(RowData, RowData)} will fail the arity check.
+   */
+  public static RowData clone(RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {

Review comment:
       It is used here: https://github.com/stevenzwu/iceberg/blob/flip27IcebergSource/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataIteratorBulkFormat.java#L116
   
   The main reason for adding `TypeSerializer` to the `RowDataUtil#clone()` method is to avoid constructing it for each clone call. In the constructor of `RowDataIteratorBulkFormat`, we construct `TypeSerializer` once from `RowType`.




[GitHub] [iceberg] openinx commented on a change in pull request #2047: refactor Flink source tests so that future FLIP-27 source test ca…

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r585560383



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.DeleteReadTests;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestFlinkReaderDeletesBase extends DeleteReadTests {

Review comment:
       OK, making it a separate base unit test class sounds good to me.




+
+@RunWith(Parameterized.class)
+public abstract class TestFlinkReaderDeletesBase extends DeleteReadTests {

Review comment:
       This new base class is extracted from `TestFlinkInputFormatReaderDeletes`. The FLIP-27 source will add a test that extends this base class.
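Here is a minimal sketch of the pattern described above, in plain Java without JUnit so it runs standalone. The shared check lives once in an abstract base, and each reader implementation only overrides a hook that reads rows. The names `ReaderDeletesBase`, `readRows`, `deletedRowsAreFiltered` and `InputFormatReaderDeletes` are hypothetical and do not reflect the actual classes or methods in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReaderTestSketch {

  // Hypothetical stand-in for the shared base class: the test logic is written once here.
  public abstract static class ReaderDeletesBase {
    // Each concrete reader (e.g. the InputFormat-based source, or later a FLIP-27 source)
    // only supplies how rows are read for a given table name.
    protected abstract List<String> readRows(String tableName);

    // Shared check reused by every subclass: rows marked as deleted must not be returned.
    public boolean deletedRowsAreFiltered(String tableName, List<String> deleted) {
      for (String row : readRows(tableName)) {
        if (deleted.contains(row)) {
          return false;
        }
      }
      return true;
    }
  }

  // Hypothetical subclass: only the read hook differs between reader implementations.
  public static class InputFormatReaderDeletes extends ReaderDeletesBase {
    @Override
    protected List<String> readRows(String tableName) {
      List<String> rows = new ArrayList<>(Arrays.asList("a", "b", "c"));
      rows.remove("b"); // pretend "b" was removed by a delete file
      return rows;
    }
  }

  public static void main(String[] args) {
    ReaderDeletesBase test = new InputFormatReaderDeletes();
    System.out.println(test.deletedRowsAreFiltered("t", Arrays.asList("b"))); // prints "true"
  }
}
```

A FLIP-27 test would then be one more subclass overriding `readRows`, with no copy of the shared checks.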






[GitHub] [iceberg] openinx commented on pull request #2047: Flink: Refactor flink source tests for FLIP-27 unified source.

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#issuecomment-788910177


   @stevenzwu The PR looks good to me overall; I just left a few comments. Sorry for the delay (I've been a bit busy with internal work). Thanks for the great work!




[GitHub] [iceberg] stevenzwu commented on pull request #2047: refactor Flink source tests so that future FLIP-27 source test ca…

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#issuecomment-771779923


   @openinx Yes, please help review this unit test refactoring next. The reason is to avoid constantly having to resolve merge conflicts. The other feature code and tests are fully isolated and unlikely to hit merge conflicts during what will probably be a long review process.




[GitHub] [iceberg] openinx commented on a change in pull request #2047: Flink: Refactor flink source tests for FLIP-27 unified source.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r588003346



##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
##########
@@ -69,4 +74,30 @@ public static Object convertConstant(Type type, Object value) {
     }
     return value;
   }
+
+  /**
+   * Similar to the private {@link RowDataSerializer#copyRowData(RowData, RowData)} method.
+   * This skips the arity check between rowType and from,
+   * because the from RowData may contain an additional column for position deletes.
+   * Using {@link RowDataSerializer#copy(RowData, RowData)} will fail the arity check.
+   */
+  public static RowData clone(RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {

Review comment:
       I think we can do this refactoring when we review [RowDataIteratorBulkFormat](https://github.com/stevenzwu/iceberg/blob/flip27IcebergSource/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataIteratorBulkFormat.java#L116).






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2047: Flink: Refactor flink source tests for FLIP-27 unified source.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r587710801



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
##########
@@ -245,74 +204,80 @@ private void validateIdentityPartitionProjections(
 
   @Test
   public void testSnapshotReads() throws Exception {
-    Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA);
+    Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
 
     GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
 
-    List<Record> expectedRecords = RandomGenericData.generate(SCHEMA, 1, 0L);
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
     helper.appendToTable(expectedRecords);
     long snapshotId = table.currentSnapshot().snapshotId();
 
     long timestampMillis = table.currentSnapshot().timestampMillis();
 
     // produce another timestamp
     waitUntilAfter(timestampMillis);
-    helper.appendToTable(RandomGenericData.generate(SCHEMA, 1, 0L));
-
-    assertRecords(
-        runWithOptions(ImmutableMap.<String, String>builder().put("snapshot-id", Long.toString(snapshotId)).build()),
-        expectedRecords, SCHEMA);
-    assertRecords(
-        runWithOptions(
-            ImmutableMap.<String, String>builder().put("as-of-timestamp", Long.toString(timestampMillis)).build()),
-        expectedRecords, SCHEMA);
+    helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L));
+
+    TestHelpers.assertRecords(
+        runWithOptions(ImmutableMap.<String, String>builder()

Review comment:
       thx. updated

##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
##########
@@ -245,74 +204,80 @@ private void validateIdentityPartitionProjections(
 
   @Test
   public void testSnapshotReads() throws Exception {
-    Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA);
+    Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
 
     GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
 
-    List<Record> expectedRecords = RandomGenericData.generate(SCHEMA, 1, 0L);
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
     helper.appendToTable(expectedRecords);
     long snapshotId = table.currentSnapshot().snapshotId();
 
     long timestampMillis = table.currentSnapshot().timestampMillis();
 
     // produce another timestamp
     waitUntilAfter(timestampMillis);
-    helper.appendToTable(RandomGenericData.generate(SCHEMA, 1, 0L));
-
-    assertRecords(
-        runWithOptions(ImmutableMap.<String, String>builder().put("snapshot-id", Long.toString(snapshotId)).build()),
-        expectedRecords, SCHEMA);
-    assertRecords(
-        runWithOptions(
-            ImmutableMap.<String, String>builder().put("as-of-timestamp", Long.toString(timestampMillis)).build()),
-        expectedRecords, SCHEMA);
+    helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L));
+
+    TestHelpers.assertRecords(
+        runWithOptions(ImmutableMap.<String, String>builder()
+            .put("snapshot-id", Long.toString(snapshotId))
+            .build()),
+        expectedRecords, TestFixtures.SCHEMA);
+    TestHelpers.assertRecords(
+        runWithOptions(ImmutableMap.<String, String>builder()

Review comment:
       thx. updated






[GitHub] [iceberg] stevenzwu edited a comment on pull request #2047: refactor Flink source tests so that future FLIP-27 source test ca…

Posted by GitBox <gi...@apache.org>.
stevenzwu edited a comment on pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#issuecomment-771779923


   @openinx Yes, please help review this unit test refactoring next. The main reason is to avoid constantly resolving merge conflicts after periodic rebasing. The other FLIP-27 source code and tests are more isolated and less likely to hit merge conflicts during what will probably be a long review process.




[GitHub] [iceberg] openinx commented on a change in pull request #2047: refactor Flink source tests so that future FLIP-27 source test ca…

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r585543021



##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
##########
@@ -69,4 +74,30 @@ public static Object convertConstant(Type type, Object value) {
     }
     return value;
   }
+
+  /**
+   * Similar to the private {@link RowDataSerializer#copyRowData(RowData, RowData)} method.
+   * This skips the arity check between rowType and from,
+   * because the from RowData may contain an additional column for position deletes.
+   * Using {@link RowDataSerializer#copy(RowData, RowData)} will fail the arity check.
+   */
+  public static RowData clone(RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {

Review comment:
       In my understanding, this `clone` method is not very friendly for developers to use. If we really need to introduce a copy that skips the arity check, how about making this method private and exposing an easier public method:
   
   ```java
   public static RowData clone(RowData from, RowType rowType)
   ```
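A rough sketch of the suggested shape, using `Object[]` as a stand-in for Flink's `RowData` so it runs without Flink on the classpath. A private detailed method copies only the first `arity` fields (tolerating an extra trailing column such as a position-delete marker), and a simpler public overload hides the extra parameters. This is an illustration under stated assumptions, not the PR's code: in particular it makes a shallow copy, whereas the diffed method takes per-field `TypeSerializer`s, presumably so field values can be deep-copied.

```java
import java.util.Arrays;

public class RowCloneSketch {

  // Detailed variant, kept private as suggested. Copies only the first `arity` fields,
  // so a source row carrying an extra trailing column does not fail an exact-arity check.
  // `reuse` is filled in place when it already has the right size.
  private static Object[] clone(Object[] from, Object[] reuse, int arity) {
    if (from.length < arity) {
      throw new IllegalArgumentException(
          "Row has " + from.length + " fields, expected at least " + arity);
    }
    Object[] to = (reuse != null && reuse.length == arity) ? reuse : new Object[arity];
    // Shallow copy; a real RowData version would copy each field via its serializer.
    System.arraycopy(from, 0, to, 0, arity);
    return to;
  }

  // Simple public entry point: callers pass only the row and the expected field count
  // (standing in for the RowType in the proposed clone(RowData from, RowType rowType)).
  public static Object[] clone(Object[] from, int arity) {
    return clone(from, null, arity);
  }

  public static void main(String[] args) {
    Object[] withDeleteColumn = {1L, "a", true}; // last field stands in for an extra delete column
    System.out.println(Arrays.toString(clone(withDeleteColumn, 2))); // prints "[1, a]"
  }
}
```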






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2047: Flink: Refactor flink source tests for FLIP-27 unified source.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r587705686



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
##########
@@ -113,68 +93,47 @@ public void before() throws IOException {
     File warehouseFile = TEMPORARY_FOLDER.newFolder();
     Assert.assertTrue(warehouseFile.delete());
     // before variables
-    Configuration conf = new Configuration();
     warehouse = "file:" + warehouseFile;
+    Configuration conf = new Configuration();
     catalog = new HadoopCatalog(conf, warehouse);
+    location = String.format("%s/%s/%s", warehouse, TestFixtures.DATABASE, TestFixtures.TABLE);
   }
 
-  private List<Row> runWithProjection(String... projected) throws IOException {
-    TableSchema.Builder builder = TableSchema.builder();
-    TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(
-        catalog.loadTable(TableIdentifier.of("default", "t")).schema()));
-    for (String field : projected) {
-      TableColumn column = schema.getTableColumn(field).get();
-      builder.field(column.getName(), column.getType());
-    }
-    return run(FlinkSource.forRowData().project(builder.build()), Maps.newHashMap(), "", projected);
-  }
-
-  protected List<Row> runWithFilter(Expression filter, String sqlFilter) throws IOException {
-    FlinkSource.Builder builder = FlinkSource.forRowData().filters(Collections.singletonList(filter));
-    return run(builder, Maps.newHashMap(), sqlFilter, "*");
-  }
-
-  private List<Row> runWithOptions(Map<String, String> options) throws IOException {
-    FlinkSource.Builder builder = FlinkSource.forRowData();
-    Optional.ofNullable(options.get("snapshot-id")).ifPresent(value -> builder.snapshotId(Long.parseLong(value)));
-    Optional.ofNullable(options.get("start-snapshot-id"))
-        .ifPresent(value -> builder.startSnapshotId(Long.parseLong(value)));
-    Optional.ofNullable(options.get("end-snapshot-id"))
-        .ifPresent(value -> builder.endSnapshotId(Long.parseLong(value)));
-    Optional.ofNullable(options.get("as-of-timestamp"))
-        .ifPresent(value -> builder.asOfTimestamp(Long.parseLong(value)));
-    return run(builder, options, "", "*");
+  @After
+  public void after() throws IOException {
   }
 
-  private List<Row> run() throws IOException {
-    return run(FlinkSource.forRowData(), Maps.newHashMap(), "", "*");
+  protected TableLoader tableLoader() {
+    return TableLoader.fromHadoopTable(location);
   }
 
-  protected abstract List<Row> run(FlinkSource.Builder formatBuilder, Map<String, String> sqlOptions, String sqlFilter,
-                                   String... sqlSelectedFields) throws IOException;
+  protected abstract List<Row> runWithProjection(String... projected) throws Exception;

Review comment:
       The main difference is how they call the private/protected run method.
   
   Current source: uses `FlinkSource#Builder` for everything.
   https://github.com/stevenzwu/iceberg/blob/flip27IcebergSource/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java#L49
   
   FLIP-27 source: just constructs and passes along the `ScanContext`.
   https://github.com/stevenzwu/iceberg/blob/flip27IcebergSource/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java#L69
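A simplified sketch of that difference, assuming a shared base that exposes a single `runWithOptions(Map<String, String>)` hook. One subclass maps each option onto a builder call; the other constructs one context object and passes it along. `ScanTestBase`, `SourceBuilder` and `ScanConfig` are hypothetical stand-ins, not Iceberg's `FlinkSource.Builder` or `ScanContext` APIs, and only the `snapshot-id` option is handled.

```java
import java.util.HashMap;
import java.util.Map;

public class ScanOptionsSketch {

  // Hypothetical shared base: tests build an option map once; subclasses decide how to run it.
  public abstract static class ScanTestBase {
    public abstract String runWithOptions(Map<String, String> options);
  }

  // Hypothetical builder standing in for a FlinkSource.Builder-style API.
  static final class SourceBuilder {
    private Long snapshotId;

    SourceBuilder snapshotId(long id) {
      this.snapshotId = id;
      return this;
    }

    String build() {
      return "builder(snapshotId=" + snapshotId + ")";
    }
  }

  // Hypothetical immutable context standing in for a ScanContext-style object.
  static final class ScanConfig {
    final Long snapshotId;

    ScanConfig(Long snapshotId) {
      this.snapshotId = snapshotId;
    }

    @Override
    public String toString() {
      return "context(snapshotId=" + snapshotId + ")";
    }
  }

  // Current-source style: translate each option into a builder call.
  public static class BuilderScanTest extends ScanTestBase {
    @Override
    public String runWithOptions(Map<String, String> options) {
      SourceBuilder builder = new SourceBuilder();
      String id = options.get("snapshot-id");
      if (id != null) {
        builder.snapshotId(Long.parseLong(id));
      }
      return builder.build();
    }
  }

  // FLIP-27 style: construct one context object and pass it along.
  public static class ContextScanTest extends ScanTestBase {
    @Override
    public String runWithOptions(Map<String, String> options) {
      String id = options.get("snapshot-id");
      ScanConfig config = new ScanConfig(id == null ? null : Long.valueOf(id));
      return config.toString();
    }
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("snapshot-id", "42");
    System.out.println(new BuilderScanTest().runWithOptions(options)); // builder(snapshotId=42)
    System.out.println(new ContextScanTest().runWithOptions(options)); // context(snapshotId=42)
  }
}
```

Because the test methods in the base only see `runWithOptions`, the same snapshot-read assertions can run against either source.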






[GitHub] [iceberg] openinx commented on a change in pull request #2047: refactor Flink source tests so that future FLIP-27 source test ca…

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r585514126



##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
##########
@@ -69,4 +74,30 @@ public static Object convertConstant(Type type, Object value) {
     }
     return value;
   }
+
+  /**
+   * Similar to the private {@link RowDataSerializer#copyRowData(RowData, RowData)} method.
+   * This skips the arity check between rowType and from,
+   * because the from RowData may contain an additional column for position deletes.
+   * Using {@link RowDataSerializer#copy(RowData, RowData)} will fail the arity check.
+   */
+  public static RowData clone(RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {

Review comment:
       How does the FLIP-27 source use this method? How does it construct the `TypeSerializer`s?





