Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/04 18:16:18 UTC

[GitHub] [iceberg] stevenzwu commented on a change in pull request #2047: Flink: Refactor flink source tests for FLIP-27 unified source.

stevenzwu commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r587705686



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
##########
@@ -113,68 +93,47 @@ public void before() throws IOException {
     File warehouseFile = TEMPORARY_FOLDER.newFolder();
     Assert.assertTrue(warehouseFile.delete());
     // before variables
-    Configuration conf = new Configuration();
     warehouse = "file:" + warehouseFile;
+    Configuration conf = new Configuration();
     catalog = new HadoopCatalog(conf, warehouse);
+    location = String.format("%s/%s/%s", warehouse, TestFixtures.DATABASE, TestFixtures.TABLE);
   }
 
-  private List<Row> runWithProjection(String... projected) throws IOException {
-    TableSchema.Builder builder = TableSchema.builder();
-    TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(
-        catalog.loadTable(TableIdentifier.of("default", "t")).schema()));
-    for (String field : projected) {
-      TableColumn column = schema.getTableColumn(field).get();
-      builder.field(column.getName(), column.getType());
-    }
-    return run(FlinkSource.forRowData().project(builder.build()), Maps.newHashMap(), "", projected);
-  }
-
-  protected List<Row> runWithFilter(Expression filter, String sqlFilter) throws IOException {
-    FlinkSource.Builder builder = FlinkSource.forRowData().filters(Collections.singletonList(filter));
-    return run(builder, Maps.newHashMap(), sqlFilter, "*");
-  }
-
-  private List<Row> runWithOptions(Map<String, String> options) throws IOException {
-    FlinkSource.Builder builder = FlinkSource.forRowData();
-    Optional.ofNullable(options.get("snapshot-id")).ifPresent(value -> builder.snapshotId(Long.parseLong(value)));
-    Optional.ofNullable(options.get("start-snapshot-id"))
-        .ifPresent(value -> builder.startSnapshotId(Long.parseLong(value)));
-    Optional.ofNullable(options.get("end-snapshot-id"))
-        .ifPresent(value -> builder.endSnapshotId(Long.parseLong(value)));
-    Optional.ofNullable(options.get("as-of-timestamp"))
-        .ifPresent(value -> builder.asOfTimestamp(Long.parseLong(value)));
-    return run(builder, options, "", "*");
+  @After
+  public void after() throws IOException {
   }
 
-  private List<Row> run() throws IOException {
-    return run(FlinkSource.forRowData(), Maps.newHashMap(), "", "*");
+  protected TableLoader tableLoader() {
+    return TableLoader.fromHadoopTable(location);
   }
 
-  protected abstract List<Row> run(FlinkSource.Builder formatBuilder, Map<String, String> sqlOptions, String sqlFilter,
-                                   String... sqlSelectedFields) throws IOException;
+  protected abstract List<Row> runWithProjection(String... projected) throws Exception;

Review comment:
       The main difference between the two test classes is how they call the private/protected `run` method.
   
   Current source: uses `FlinkSource.Builder` for everything
   https://github.com/stevenzwu/iceberg/blob/flip27IcebergSource/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java#L49
   
   FLIP-27 source: just constructs and passes along the `ScanContext`
   https://github.com/stevenzwu/iceberg/blob/flip27IcebergSource/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java#L69
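
   To illustrate the shape of that split, here is a minimal sketch in plain Java. It does not use the real Iceberg or Flink classes: `FakeSourceBuilder`, `FakeScanContext`, `FakeScanTestBase`, and the two subclasses are hypothetical stand-ins, and the "scan" simply echoes the projected field names. The only point is that the shared base class declares the abstract `runWithProjection`, while each subclass funnels it into its own private `run` method, one driven by a builder and the other by a context object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a fluent source builder (the role FlinkSource.Builder plays).
final class FakeSourceBuilder {
  private final List<String> projected = new ArrayList<>();

  FakeSourceBuilder project(String... fields) {
    projected.addAll(Arrays.asList(fields));
    return this;
  }

  // Pretend "scan": returns the projected field names instead of real rows.
  List<String> build() {
    return new ArrayList<>(projected);
  }
}

// Hypothetical stand-in for a scan configuration object (the role ScanContext plays).
final class FakeScanContext {
  final List<String> projectedFields;

  FakeScanContext(List<String> projectedFields) {
    this.projectedFields = projectedFields;
  }
}

// Shared base: test cases are written once against the abstract method.
abstract class FakeScanTestBase {
  protected abstract List<String> runWithProjection(String... projected) throws Exception;

  // A shared check that both subclasses inherit unchanged.
  boolean projectionRoundTrips(String... fields) throws Exception {
    return runWithProjection(fields).equals(Arrays.asList(fields));
  }
}

// Builder style: everything is configured through the builder, then run.
final class BuilderStyleTest extends FakeScanTestBase {
  @Override
  protected List<String> runWithProjection(String... projected) {
    return run(new FakeSourceBuilder().project(projected));
  }

  private List<String> run(FakeSourceBuilder builder) {
    return builder.build();
  }
}

// Context style: construct the context object and pass it along.
final class ContextStyleTest extends FakeScanTestBase {
  @Override
  protected List<String> runWithProjection(String... projected) {
    return run(new FakeScanContext(Arrays.asList(projected)));
  }

  private List<String> run(FakeScanContext context) {
    return new ArrayList<>(context.projectedFields);
  }
}
```

   With this layout, a check such as `projectionRoundTrips("id", "data")` lives only in the base class and runs against either subclass, which is the motivation for making `runWithProjection` abstract in the diff above.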



