You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "stevenzwu (via GitHub)" <gi...@apache.org> on 2023/06/24 00:55:16 UTC

[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7836: Flink: Key projection should be based on the requested Flink table schema

stevenzwu commented on code in PR #7836:
URL: https://github.com/apache/iceberg/pull/7836#discussion_r1240545609


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java:
##########
@@ -349,4 +397,42 @@ private TaskWriterFactory<RowData> createTaskWriterFactory(List<Integer> equalit
         equalityFieldIds,
         false);
   }
+
+  private TaskWriterFactory<RowData> createTaskWriterFactory(
+      RowType flinkType, List<Integer> equalityFieldIds) {
+    return new RowDataTaskWriterFactory(
+        SerializableTable.copyOf(table),
+        flinkType,
+        128 * 1024 * 1024,
+        format,
+        table.properties(),
+        equalityFieldIds,
+        true);
+  }
+
+  private void initTable(boolean partitioned) {
+    if (partitioned) {
+      this.table = create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("data").build());
+    } else {
+      this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    }
+
+    initTable(table);
+  }
+
+  private void initTable(TestTables.TestTable testTable) {
+    this.table = testTable;
+
+    table
+        .updateProperties()
+        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 * 1024))
+        .defaultFormat(format)
+        .commit();
+  }
+
+  private RowData createBinaryRowData(

Review Comment:
   curious why this needs to be `BinaryRowData`? what's the problem with `GenericrowData`?



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java:
##########
@@ -349,4 +397,42 @@ private TaskWriterFactory<RowData> createTaskWriterFactory(List<Integer> equalit
         equalityFieldIds,
         false);
   }
+
+  private TaskWriterFactory<RowData> createTaskWriterFactory(
+      RowType flinkType, List<Integer> equalityFieldIds) {
+    return new RowDataTaskWriterFactory(
+        SerializableTable.copyOf(table),
+        flinkType,
+        128 * 1024 * 1024,
+        format,
+        table.properties(),
+        equalityFieldIds,
+        true);
+  }
+
+  private void initTable(boolean partitioned) {
+    if (partitioned) {
+      this.table = create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("data").build());
+    } else {
+      this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    }
+
+    initTable(table);
+  }
+
+  private void initTable(TestTables.TestTable testTable) {
+    this.table = testTable;

Review Comment:
   this is redundant to line 420 above. that signals the structure may not be ideal.
   
   maybe method in line 413 should be called `createTable`. and this method can be kept as `initTable`. but usage at line 338 can be broken into two lines (first - create table, second - call this method to init table).



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java:
##########
@@ -321,6 +322,53 @@ public void testPartitionedTableWithDataAndIdAsKey() throws IOException {
         "Should have expected records", expectedRowSet(createRecord(1, "aaa")), actualRowSet("*"));
   }
 
+  @Test
+  public void testEqualityColumnOnCustomPrecisionTSColumn() throws IOException {
+    Schema tableSchema =
+        new Schema(
+            required(3, "id", Types.IntegerType.get()),
+            required(4, "ts", Types.TimestampType.withZone()));
+    RowType flinkType =
+        new RowType(
+            false,
+            ImmutableList.of(
+                new RowType.RowField("id", new IntType()),
+                new RowType.RowField("ts", new LocalZonedTimestampType(3))));
+
+    initTable(create(tableSchema, PartitionSpec.unpartitioned()));
+
+    List<Integer> equalityIds = ImmutableList.of(table.schema().findField("ts").fieldId());
+    TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(flinkType, equalityIds);
+    taskWriterFactory.initialize(1, 1);
+
+    TaskWriter<RowData> writer = taskWriterFactory.create();
+    RowDataSerializer serializer = new RowDataSerializer(flinkType);
+
+    OffsetDateTime start = OffsetDateTime.now();
+    writer.write(createBinaryRowData(serializer, RowKind.INSERT, 1, start));
+    writer.write(createBinaryRowData(serializer, RowKind.INSERT, 2, start.plusSeconds(1)));
+

Review Comment:
   nit: empty line not necessary



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java:
##########
@@ -349,4 +397,42 @@ private TaskWriterFactory<RowData> createTaskWriterFactory(List<Integer> equalit
         equalityFieldIds,
         false);
   }
+
+  private TaskWriterFactory<RowData> createTaskWriterFactory(
+      RowType flinkType, List<Integer> equalityFieldIds) {
+    return new RowDataTaskWriterFactory(
+        SerializableTable.copyOf(table),
+        flinkType,
+        128 * 1024 * 1024,
+        format,
+        table.properties(),
+        equalityFieldIds,
+        true);
+  }
+
+  private void initTable(boolean partitioned) {
+    if (partitioned) {
+      this.table = create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("data").build());
+    } else {
+      this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    }
+
+    initTable(table);
+  }
+
+  private void initTable(TestTables.TestTable testTable) {
+    this.table = testTable;
+
+    table
+        .updateProperties()
+        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 * 1024))
+        .defaultFormat(format)
+        .commit();
+  }
+
+  private RowData createBinaryRowData(

Review Comment:
   also we probably can remove this method for two reasons
   
   - the method name is too broad/generic. it is meant for one specific test method and schema
   - it doesn't save much space. it would be more readable to directly construct the RowData in the place it is used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org