You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/08/04 07:42:46 UTC

[GitHub] [iceberg] JingsongLi opened a new pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

JingsongLi opened a new pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293


   Fixes #1275
   
   This is Proof of Concept (POC) for Flink reader.
   
   The Flink reader is essentially the same as Spark.
   - Flink `InputFormat` is similar to Hive (Hadoop) input format. Its splits are generated in the job manager. Therefore, an iceberg catalog loader is needed to obtain the Iceberg `Table` object.
   - Flink `TableFactory` and `TableSource` are similar to Spark `TableProvider` and `SparkScanBuilder`, It also provides projection push down `ProjectableTableSource` and filter push down `FilterableTableSource`.
   
   Work can be divided into:
   - (Ongoing) Flink: Using RowData to avro reader and writer #1232
   - Introduce `IcebergCatalogLoader`.
   - Introduce `FlinkInputFormat`: implement SplitGenerator and RowDataReader.
   - Extract `TestAppendHelper` for testing.
   - Introduce `FlinkTableFactory` and `FlinkTableSource`.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r470371226



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -80,14 +85,16 @@ public FlinkCatalog(
       String catalogName,
       String defaultDatabase,
       String[] baseNamespace,
-      Catalog icebergCatalog,
+      CatalogLoader catalogLoader,
       boolean cacheEnabled) {
     super(catalogName, defaultDatabase);
-    this.originalCatalog = icebergCatalog;
-    this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(icebergCatalog) : icebergCatalog;
+    this.originalCatalog = catalogLoader.loadCatalog(
+        HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()));

Review comment:
       We can discuss it in https://github.com/apache/iceberg/pull/1332




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r469845803



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -80,14 +85,16 @@ public FlinkCatalog(
       String catalogName,
       String defaultDatabase,
       String[] baseNamespace,
-      Catalog icebergCatalog,
+      CatalogLoader catalogLoader,
       boolean cacheEnabled) {
     super(catalogName, defaultDatabase);
-    this.originalCatalog = icebergCatalog;
-    this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(icebergCatalog) : icebergCatalog;
+    this.originalCatalog = catalogLoader.loadCatalog(
+        HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()));

Review comment:
       How about  adding a `Configuration` argument in this current constructor.  IMO  for `FlinkCatalog`,  it shouldn't handle the logic about configuration initialization.  Moving the configuration loading to the `FlinkCatalogFactory`  sounds more reasonable.   besides,   unit test for `FlinkCatalog`  will be easy because it don't depend on how the flink will load its configuration.  




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r470427393



##########
File path: data/src/test/java/org/apache/iceberg/TestAppendHelper.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAppendHelper {
+
+  private final Table table;
+  private final FileFormat fileFormat;
+  private final TemporaryFolder tmp;
+
+  public TestAppendHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp) {
+    this.table = table;
+    this.fileFormat = fileFormat;
+    this.tmp = tmp;
+  }
+
+  public void appendToTable(DataFile... dataFiles) {
+    Preconditions.checkNotNull(table, "table not set");
+
+    AppendFiles append = table.newAppend();
+
+    for (DataFile dataFile : dataFiles) {
+      append = append.appendFile(dataFile);
+    }
+
+    append.commit();
+  }
+
+  public void appendToTable(List<Record> records) throws IOException {
+    appendToTable(null, records);
+  }
+
+  public void appendToTable(StructLike partition, List<Record> records) throws IOException {
+    appendToTable(writeFile(partition, records));
+  }
+
+  public DataFile writeFile(StructLike partition, List<Record> records) throws IOException {
+    Preconditions.checkNotNull(table, "table not set");
+    return writeFile(table, partition, records, fileFormat, tmp.newFile());
+  }
+
+  public static DataFile writeFile(
+      Table table, StructLike partition, List<Record> records, FileFormat fileFormat,
+      File file) throws IOException {
+    Assert.assertTrue(file.delete());
+
+    FileAppender<Record> appender;
+
+    switch (fileFormat) {
+      case AVRO:
+        appender = Avro.write(Files.localOutput(file))
+            .schema(table.schema())
+            .createWriterFunc(DataWriter::create)
+            .named(fileFormat.name())
+            .build();
+        break;
+
+      case PARQUET:
+        appender = Parquet.write(Files.localOutput(file))
+            .schema(table.schema())
+            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .named(fileFormat.name())
+            .build();
+        break;
+
+      case ORC:
+        appender = ORC.write(Files.localOutput(file))
+            .schema(table.schema())
+            .createWriterFunc(GenericOrcWriter::buildWriter)
+            .build();
+        break;
+
+      default:
+        throw new UnsupportedOperationException("Cannot write format: " + fileFormat);
+    }
+
+    try {
+      appender.addAll(records);
+    } finally {
+      appender.close();
+    }

Review comment:
       I think we can have a `GenericAppenderFactory`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi closed pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi closed pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r470425585



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/RowDataReader.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.data.FlinkAvroReader;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PartitionUtil;
+
+class RowDataReader extends BaseDataReader<RowData> {
+
+  private final int[] finalProjection;
+  private final Schema expectedSchema;
+  private final String nameMapping;
+
+  RowDataReader(CombinedScanTask task, FileIO fileIo, EncryptionManager encryption, int[] finalProjection,
+                Schema expectedSchema, String nameMapping) {
+    super(task, fileIo, encryption);
+    this.finalProjection = finalProjection;
+    this.expectedSchema = expectedSchema;
+    this.nameMapping = nameMapping;
+  }
+
+  @Override
+  protected CloseableIterator<RowData> nextTaskIterator(FileScanTask task) {
+    // schema or rows returned by readers
+    PartitionSpec spec = task.spec();
+    Set<Integer> idColumns = spec.identitySourceIds();
+    Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
+
+    Map<Integer, ?> idToConstant = partitionSchema.columns().isEmpty() ? ImmutableMap.of() :
+        PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
+    CloseableIterable<RowData> iterable = newIterable(task, idToConstant);
+    ProjectionRowData projectionRow = new ProjectionRowData();
+    return (finalProjection == null ? iterable : CloseableIterable.transform(
+        iterable, rowData -> (RowData) projectionRow.replace(rowData, finalProjection))).iterator();

Review comment:
       You can take a look to `FlinkSchemaUtil. pruneWithoutReordering`, it just keep the order from original schema. But the real Flink projection may change the order.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#issuecomment-674684490


   Hi @openinx , I have create #1346 for `InputFormat` and addressed your comments.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r470371347



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -80,14 +85,16 @@ public FlinkCatalog(
       String catalogName,
       String defaultDatabase,
       String[] baseNamespace,
-      Catalog icebergCatalog,
+      CatalogLoader catalogLoader,
       boolean cacheEnabled) {
     super(catalogName, defaultDatabase);
-    this.originalCatalog = icebergCatalog;
-    this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(icebergCatalog) : icebergCatalog;
+    this.originalCatalog = catalogLoader.loadCatalog(
+        HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()));
+    this.catalogLoader = catalogLoader;
+    this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
     this.baseNamespace = baseNamespace;
-    if (icebergCatalog instanceof SupportsNamespaces) {
-      asNamespaceCatalog = (SupportsNamespaces) icebergCatalog;

Review comment:
       previous code is correct too, `icebergCatalog` is the catalog.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r470378525



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -80,14 +85,16 @@ public FlinkCatalog(
       String catalogName,
       String defaultDatabase,
       String[] baseNamespace,
-      Catalog icebergCatalog,
+      CatalogLoader catalogLoader,
       boolean cacheEnabled) {
     super(catalogName, defaultDatabase);
-    this.originalCatalog = icebergCatalog;
-    this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(icebergCatalog) : icebergCatalog;
+    this.originalCatalog = catalogLoader.loadCatalog(
+        HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()));
+    this.catalogLoader = catalogLoader;
+    this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
     this.baseNamespace = baseNamespace;
-    if (icebergCatalog instanceof SupportsNamespaces) {
-      asNamespaceCatalog = (SupportsNamespaces) icebergCatalog;

Review comment:
       OK, this is the difference between `icebergCatalog` and `this.icebergCatalog`.  `icebergCatalog` could be a `SupportsNamespaces` catalog,  while `this.icebergCatalog` won't be... 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r480013889



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.source.FlinkTableSource;
+
+/**
+ * Flink Iceberg table factory to create table source and sink.
+ * Only works for catalog, can not be loaded from Java SPI(Service Provider Interface).
+ */
+class FlinkTableFactory implements TableSourceFactory<RowData> {
+
+  private final FlinkCatalog catalog;
+
+  FlinkTableFactory(FlinkCatalog catalog) {
+    this.catalog = catalog;
+  }
+
+  @Override
+  public Map<String, String> requiredContext() {
+    throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
+  }
+
+  @Override
+  public List<String> supportedProperties() {
+    throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
+  }
+
+  @Override
+  public TableSource<RowData> createTableSource(Context context) {
+    ObjectIdentifier identifier = context.getObjectIdentifier();
+    ObjectPath objectPath = new ObjectPath(identifier.getDatabaseName(), identifier.getObjectName());
+    TableIdentifier icebergIdentifier = catalog.toIdentifier(objectPath);

Review comment:
       nit:  the `toIdentifier` could be a static method in catalog .




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r469806393



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalog;
+
+/**
+ * Serializable loader to load an Iceberg {@link Catalog}.
+ * Flink needs to get {@link Table} objects in the cluster (for example, to get splits), not just on the client side.
+ * So we need an Iceberg catalog loader to get the {@link Catalog} and get the {@link Table} object.
+ */
+public interface CatalogLoader extends Serializable {

Review comment:
       Seems we could move this class to hive modules ? 

##########
File path: flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
##########
@@ -88,6 +93,34 @@ public Row next() {
     };
   }
 
+  private static Iterable<RowData> generateRowData(Schema schema, int numRecords,

Review comment:
       We will remove this `RandomData` class in the future (Once RowData parquet reader & writer is ready),   we've already had the `RandomRowData` to generate `Iterable<RowData>`, you may want to rebase master branch now.  (so we don't have to define a `generateRowData` here now).
   It will generate `Record` firstly, then convert them into `RowData`, in this way we could assert the RowData with Record,  it can be more rigorous to determine the correctness of the data.

##########
File path: data/src/test/java/org/apache/iceberg/TestAppendHelper.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAppendHelper {
+
+  private final Table table;
+  private final FileFormat fileFormat;
+  private final TemporaryFolder tmp;
+
+  public TestAppendHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp) {
+    this.table = table;
+    this.fileFormat = fileFormat;
+    this.tmp = tmp;
+  }
+
+  public void appendToTable(DataFile... dataFiles) {
+    Preconditions.checkNotNull(table, "table not set");
+
+    AppendFiles append = table.newAppend();
+
+    for (DataFile dataFile : dataFiles) {
+      append = append.appendFile(dataFile);
+    }
+
+    append.commit();
+  }
+
+  public void appendToTable(List<Record> records) throws IOException {
+    appendToTable(null, records);
+  }
+
+  public void appendToTable(StructLike partition, List<Record> records) throws IOException {
+    appendToTable(writeFile(partition, records));
+  }
+
+  public DataFile writeFile(StructLike partition, List<Record> records) throws IOException {
+    Preconditions.checkNotNull(table, "table not set");
+    return writeFile(table, partition, records, fileFormat, tmp.newFile());
+  }
+
+  public static DataFile writeFile(
+      Table table, StructLike partition, List<Record> records, FileFormat fileFormat,
+      File file) throws IOException {
+    Assert.assertTrue(file.delete());
+
+    FileAppender<Record> appender;
+
+    switch (fileFormat) {
+      case AVRO:
+        appender = Avro.write(Files.localOutput(file))
+            .schema(table.schema())
+            .createWriterFunc(DataWriter::create)
+            .named(fileFormat.name())
+            .build();
+        break;
+
+      case PARQUET:
+        appender = Parquet.write(Files.localOutput(file))
+            .schema(table.schema())
+            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .named(fileFormat.name())
+            .build();
+        break;
+
+      case ORC:
+        appender = ORC.write(Files.localOutput(file))
+            .schema(table.schema())
+            .createWriterFunc(GenericOrcWriter::buildWriter)
+            .build();
+        break;
+
+      default:
+        throw new UnsupportedOperationException("Cannot write format: " + fileFormat);
+    }
+
+    try {
+      appender.addAll(records);
+    } finally {
+      appender.close();
+    }

Review comment:
       Seems we could abstract those appender building into a separate method,  I saw lots of unit tests depend on this common logics, could be a separate issue to do this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r470443300



##########
File path: data/src/test/java/org/apache/iceberg/TestAppendHelper.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAppendHelper {
+
+  private final Table table;
+  private final FileFormat fileFormat;
+  private final TemporaryFolder tmp;
+
+  public TestAppendHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp) {
+    this.table = table;
+    this.fileFormat = fileFormat;
+    this.tmp = tmp;
+  }
+
+  public void appendToTable(DataFile... dataFiles) {
+    Preconditions.checkNotNull(table, "table not set");
+
+    AppendFiles append = table.newAppend();
+
+    for (DataFile dataFile : dataFiles) {
+      append = append.appendFile(dataFile);
+    }
+
+    append.commit();
+  }
+
+  public void appendToTable(List<Record> records) throws IOException {
+    appendToTable(null, records);
+  }
+
+  public void appendToTable(StructLike partition, List<Record> records) throws IOException {
+    appendToTable(writeFile(partition, records));
+  }
+
+  public DataFile writeFile(StructLike partition, List<Record> records) throws IOException {
+    Preconditions.checkNotNull(table, "table not set");
+    return writeFile(table, partition, records, fileFormat, tmp.newFile());
+  }
+
+  public static DataFile writeFile(
+      Table table, StructLike partition, List<Record> records, FileFormat fileFormat,
+      File file) throws IOException {
+    Assert.assertTrue(file.delete());
+
+    FileAppender<Record> appender;
+
+    switch (fileFormat) {
+      case AVRO:
+        appender = Avro.write(Files.localOutput(file))
+            .schema(table.schema())
+            .createWriterFunc(DataWriter::create)
+            .named(fileFormat.name())
+            .build();
+        break;
+
+      case PARQUET:
+        appender = Parquet.write(Files.localOutput(file))
+            .schema(table.schema())
+            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .named(fileFormat.name())
+            .build();
+        break;
+
+      case ORC:
+        appender = ORC.write(Files.localOutput(file))
+            .schema(table.schema())
+            .createWriterFunc(GenericOrcWriter::buildWriter)
+            .build();
+        break;
+
+      default:
+        throw new UnsupportedOperationException("Cannot write format: " + fileFormat);
+    }
+
+    try {
+      appender.addAll(records);
+    } finally {
+      appender.close();
+    }

Review comment:
       Created #1340




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r469836663



##########
File path: flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
##########
@@ -88,6 +93,34 @@ public Row next() {
     };
   }
 
+  private static Iterable<RowData> generateRowData(Schema schema, int numRecords,

Review comment:
       Sorry, this is legacy useless codes. I'll remove it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r470426278



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalog;
+
+/**
+ * Serializable loader to load an Iceberg {@link Catalog}.
+ * Flink needs to get {@link Table} objects in the cluster (for example, to get splits), not just on the client side.
+ * So we need an Iceberg catalog loader to get the {@link Catalog} and get the {@link Table} object.
+ */
+public interface CatalogLoader extends Serializable {

Review comment:
       Maybe not in hive but somewhere in Iceberg modules, I don't know if any other modules want to use `CatalogLoader`.
   We can discuss it in #1332




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#issuecomment-674693612


   Thanks for the update, I will take a look today. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r470424985



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -98,4 +102,24 @@ public static TableSchema toSchema(RowType rowType) {
     }
     return builder.build();
   }
+
+  /**
+   * Prune columns from a {@link Schema} using a projected fields.
+   * TODO Why Spark care about filters?

Review comment:
       The columns which has been involved in push-down filter must be in the projection column list. Because just like spark:
   `Spark doesn't support residuals per task, so return all filters to get Spark to handle record-level filtering`. Flink source also doesn't support residuals per task, left these filtering to Flink planner.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r470424985



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -98,4 +102,24 @@ public static TableSchema toSchema(RowType rowType) {
     }
     return builder.build();
   }
+
+  /**
+   * Prune columns from a {@link Schema} using a projected fields.
+   * TODO Why Spark care about filters?

Review comment:
       The columns which has been involved in push-down filter must be in the projection column list. Because just like spark:
   `Spark doesn't support residuals per task, so return all filters to get Spark to handle record-level filtering`.
   Flink source also doesn't support residuals per task, left these filtering to Flink planner.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r480014073



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.source.FlinkTableSource;
+
+/**
+ * Flink Iceberg table factory to create table source and sink.
+ * Only works for catalog, can not be loaded from Java SPI(Service Provider Interface).
+ */
+class FlinkTableFactory implements TableSourceFactory<RowData> {

Review comment:
       Yes, should be in this class too.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r480016861



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.source.FlinkTableSource;
+
+/**
+ * Flink Iceberg table factory to create table source and sink.
+ * Only works for catalog, can not be loaded from Java SPI(Service Provider Interface).
+ */
+class FlinkTableFactory implements TableSourceFactory<RowData> {
+
+  private final FlinkCatalog catalog;
+
+  FlinkTableFactory(FlinkCatalog catalog) {
+    this.catalog = catalog;
+  }
+
+  @Override
+  public Map<String, String> requiredContext() {
+    throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
+  }
+
+  @Override
+  public List<String> supportedProperties() {
+    throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
+  }
+
+  @Override
+  public TableSource<RowData> createTableSource(Context context) {
+    ObjectIdentifier identifier = context.getObjectIdentifier();
+    ObjectPath objectPath = new ObjectPath(identifier.getDatabaseName(), identifier.getObjectName());
+    TableIdentifier icebergIdentifier = catalog.toIdentifier(objectPath);
+    try {
+      Table table = catalog.getIcebergTable(objectPath);
+      // Excludes computed columns
+      TableSchema icebergSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());

Review comment:
       Q:  what if someone query the flink table with projecting a computed column (which does not exist in iceberg table )?  Does that works fine in current version , or will it throw an exception ? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi closed pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi closed pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r469836663



##########
File path: flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
##########
@@ -88,6 +93,34 @@ public Row next() {
     };
   }
 
+  private static Iterable<RowData> generateRowData(Schema schema, int numRecords,

Review comment:
       Sorry, this is legacy useless codes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r469893630



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Flink Iceberg {@link InputFormat}.
+ * - Calls that occur on the Job manager side: {@link #createInputSplits} and {@link #getInputSplitAssigner}.
+ * - Calls that occur on the Task side: {@link #open}, {@link #reachedEnd}, {@link #nextRecord} and {@link #close}.
+ */
+public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit> {
+
+  private final TableLoader tableLoader;
+  private final Schema tableSchema;
+  private final List<String> projectedFields;
+  private final ScanOptions options;
+  private final List<Expression> filterExpressions;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+
+  private transient RowDataReader reader;
+  private transient boolean fetched;
+  private transient boolean hasNext;
+
+  private FlinkInputFormat(
+      TableLoader tableLoader, Schema tableSchema, FileIO io, EncryptionManager encryption,
+      List<String> projectedFields, List<Expression> filterExpressions, ScanOptions options) {
+    this.tableLoader = tableLoader;
+    this.tableSchema = tableSchema;
+    this.projectedFields = projectedFields;
+    this.options = options;
+    this.filterExpressions = filterExpressions;
+    this.io = io;
+    this.encryption = encryption;
+  }
+
+  @VisibleForTesting
+  Schema getTableSchema() {
+    return tableSchema;
+  }
+
+  @VisibleForTesting
+  List<String> getProjectedFields() {
+    return projectedFields;
+  }
+
+  @Override
+  public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+    // Legacy method, not be used.
+    return null;
+  }
+
+  @Override
+  public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+    // Invoked by Job manager, so it is OK to load table from catalog.
+    tableLoader.open(HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()));
+    try (TableLoader loader = tableLoader) {
+      Table table = loader.loadTable();
+      Schema expectedSchema = FlinkSchemaUtil.pruneWithoutReordering(tableSchema, projectedFields);
+      FlinkSplitGenerator generator = new FlinkSplitGenerator(table, expectedSchema, options, filterExpressions);
+      return generator.createInputSplits();
+    }
+  }
+
+  @Override
+  public InputSplitAssigner getInputSplitAssigner(FlinkInputSplit[] inputSplits) {
+    return new DefaultInputSplitAssigner(inputSplits);
+  }
+
+  @Override
+  public void configure(Configuration parameters) {
+  }
+
+  @Override
+  public void open(FlinkInputSplit split) {
+    String nameMappingString = options.getNameMapping();
+    Schema expectedSchema = FlinkSchemaUtil.pruneWithoutReordering(tableSchema, projectedFields);
+    List<String> expectedNameList = expectedSchema.asStruct().fields().stream()
+        .map(Types.NestedField::name)
+        .collect(Collectors.toList());
+    int[] finalProjection = projectedFields == null ? null :
+        projectedFields.stream().mapToInt(expectedNameList::indexOf).toArray();
+    this.reader = new RowDataReader(split.getTask(), io, encryption, finalProjection, expectedSchema,
+                                    nameMappingString);
+  }
+
+  @Override
+  public boolean reachedEnd() throws IOException {
+    if (!fetched) {
+      hasNext = reader.next();
+      fetched = true;
+    }
+    return !hasNext;
+  }
+
+  @Override
+  public RowData nextRecord(RowData reuse) throws IOException {
+    if (reachedEnd()) {
+      return null;
+    }
+
+    fetched = false;
+    return reader.get();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (reader != null) {
+      reader.close();
+    }
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static final class Builder {
+    private TableLoader tableLoader;
+    private Schema tableSchema;
+    private List<String> projectedFields;
+    private ScanOptions options = ScanOptions.builder().build();
+    private List<Expression> filterExpressions = Lists.newArrayList();
+    private FileIO io;
+    private EncryptionManager encryption;
+
+    private Builder() {}
+
+    // -------------------------- Required options -------------------------------
+
+    public Builder table(Table newTable) {
+      this.tableSchema = newTable.schema();
+      this.io = newTable.io();
+      this.encryption = newTable.encryption();
+      return this;
+    }
+
+    public Builder tableLoader(TableLoader newLoader) {
+      this.tableLoader = newLoader;
+      return this;
+    }
+
+    // -------------------------- Optional options -------------------------------
+
+    public Builder filters(List<Expression> newFilters) {
+      this.filterExpressions = newFilters;
+      return this;
+    }
+
+    public Builder projectedFields(List<String> newProjectedFields) {
+      this.projectedFields = newProjectedFields;
+      return this;
+    }
+
+    public Builder options(ScanOptions newOptions) {
+      this.options = newOptions;
+      return this;
+    }
+
+    public Builder schema(Schema newSchema) {
+      this.tableSchema = newSchema;
+      return this;
+    }
+
+    public Builder io(FileIO newIO) {
+      this.io = newIO;
+      return this;
+    }
+
+    public Builder encryption(EncryptionManager newEncryption) {
+      this.encryption = newEncryption;
+      return this;
+    }
+
+    public FlinkInputFormat build() {
+      return new FlinkInputFormat(

Review comment:
       We'd better to have a `Precondition#check` for those arguments in case of throw NPE in the following call stack.

##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Test {@link FlinkInputFormat}.
+ */
+public class TestFlinkInputFormat extends TestFlinkScan {
+
+  private FlinkInputFormat.Builder builder;
+
+  public TestFlinkInputFormat(String fileFormat) {
+    super(fileFormat);
+  }
+
+  @Override
+  public void before() throws IOException {
+    super.before();
+    builder = FlinkInputFormat.builder().tableLoader(TableLoader.fromHadoopTable(warehouse));
+  }
+
+  @Override
+  protected List<Row> execute(Table table, List<String> projectFields) throws IOException {
+    return run(builder.table(table).projectedFields(projectFields).build());
+  }
+
+  @Override
+  protected List<Row> executeWithSnapshotId(Table table, long snapshotId) throws IOException {
+    return run(builder.table(table).options(ScanOptions.builder().snapshotId(snapshotId).build()).build());

Review comment:
       Should we also add unit tests for following cases: 
   1.  scan with both `startSnapshotId` and `endSnapshotId`; 
   2.  scan with only `asOfTimestamp`;
   3.  scan with only `startSnapshotId` . 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -98,4 +102,24 @@ public static TableSchema toSchema(RowType rowType) {
     }
     return builder.build();
   }
+
+  /**
+   * Prune columns from a {@link Schema} using a projected fields.
+   * TODO Why Spark care about filters?

Review comment:
       As the [javadoc](https://github.com/apache/iceberg/blob/ef801726bd6627ddcda1bd238894ea8ccdae5f39/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java#L193) said: 
   
   >    The filters list of {@link Expression} is used to ensure that columns referenced by filters are projected. 
   
   For my understanding, when doing filter push down, we need to keep the columns which has been involved in push-down filter even if it does not in the projection column list. 
   
   Now we do not implement the `FilterableTableSource` interface, so I think we don't need to consider it now. 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/RowDataReader.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.data.FlinkAvroReader;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PartitionUtil;
+
+class RowDataReader extends BaseDataReader<RowData> {
+
+  private final int[] finalProjection;
+  private final Schema expectedSchema;
+  private final String nameMapping;
+
+  RowDataReader(CombinedScanTask task, FileIO fileIo, EncryptionManager encryption, int[] finalProjection,
+                Schema expectedSchema, String nameMapping) {
+    super(task, fileIo, encryption);
+    this.finalProjection = finalProjection;
+    this.expectedSchema = expectedSchema;
+    this.nameMapping = nameMapping;
+  }
+
+  @Override
+  protected CloseableIterator<RowData> nextTaskIterator(FileScanTask task) {
+    // schema or rows returned by readers
+    PartitionSpec spec = task.spec();
+    Set<Integer> idColumns = spec.identitySourceIds();
+    Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
+
+    Map<Integer, ?> idToConstant = partitionSchema.columns().isEmpty() ? ImmutableMap.of() :
+        PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
+    CloseableIterable<RowData> iterable = newIterable(task, idToConstant);
+    ProjectionRowData projectionRow = new ProjectionRowData();
+    return (finalProjection == null ? iterable : CloseableIterable.transform(
+        iterable, rowData -> (RowData) projectionRow.replace(rowData, finalProjection))).iterator();

Review comment:
       Q:   Why we need to transform the `CloseableIterable<RowData>`  to be a projected `RowData` iterable again,  I mean we've created an AVRO iterable with the projected read schema, it should guarantee  that iterable only contains the projected columns ?  




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r480013553



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -335,6 +341,19 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean
     throw new UnsupportedOperationException("Not support alterTable now.");
   }
 
+  CatalogLoader getCatalogLoader() {
+    return catalogLoader;
+  }
+
+  Configuration getHadoopConf() {
+    return hadoopConf;
+  }
+
+  @Override
+  public Optional<TableFactory> getTableFactory() {

Review comment:
       Until the new API is really ready...
   In the 1.11 and master, FLIP-95 interfaces still lack many things. I am not sure about Flink 1.12, maybe 1.13 is the time.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r480003260



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -335,6 +341,19 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean
     throw new UnsupportedOperationException("Not support alterTable now.");
   }
 
+  CatalogLoader getCatalogLoader() {
+    return catalogLoader;
+  }
+
+  Configuration getHadoopConf() {
+    return hadoopConf;
+  }
+
+  @Override
+  public Optional<TableFactory> getTableFactory() {

Review comment:
       Q: The javadoc says it's deprecated now,  What's  the time for us to use the `getFactory` in future ? 
   
   ```java
   	 * @deprecated Use {@link #getFactory()} for the new factory stack. The new factory stack uses the
   	 *             new table sources and sinks defined in FLIP-95 and a slightly different discovery mechanism.
   	 */
   	@Deprecated
   	default Optional<TableFactory> getTableFactory() {
   ``` 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.source.FlinkTableSource;
+
+/**
+ * Flink Iceberg table factory to create table source and sink.
+ * Only works for catalog, can not be loaded from Java SPI(Service Provider Interface).
+ */
+class FlinkTableFactory implements TableSourceFactory<RowData> {

Review comment:
       We may put the tableSink creator in this class too. https://github.com/apache/iceberg/pull/1348/files#diff-0ad7dfff9cfa32fbb760796d976fd650R34

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.sources.FilterableTableSource;
+import org.apache.flink.table.sources.LimitableTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.TableLoader;
+
+/**
+ * Flink Iceberg table source.
+ * TODO: Implement {@link FilterableTableSource} and {@link LimitableTableSource}.
+ */
+public class FlinkTableSource implements StreamTableSource<RowData>, ProjectableTableSource<RowData> {
+
+  private final TableIdentifier identifier;
+  private final Table table;
+  private final CatalogLoader catalogLoader;
+  private final Configuration hadoopConf;
+  private final TableSchema schema;
+  private final Map<String, String> options;

Review comment:
       How about renaming it to `scanOptions` ?  I was thought it's an options map of the iceberg table. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r470376943



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -80,14 +85,16 @@ public FlinkCatalog(
       String catalogName,
       String defaultDatabase,
       String[] baseNamespace,
-      Catalog icebergCatalog,
+      CatalogLoader catalogLoader,
       boolean cacheEnabled) {
     super(catalogName, defaultDatabase);
-    this.originalCatalog = icebergCatalog;
-    this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(icebergCatalog) : icebergCatalog;
+    this.originalCatalog = catalogLoader.loadCatalog(
+        HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()));
+    this.catalogLoader = catalogLoader;
+    this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
     this.baseNamespace = baseNamespace;
-    if (icebergCatalog instanceof SupportsNamespaces) {
-      asNamespaceCatalog = (SupportsNamespaces) icebergCatalog;

Review comment:
       I mean: if we enable the `cacheEnabled`, then the icebergCatalog is the wrapped `Catalog` which is a `CachingCatalog` instance  actually.    But seems `CachingCatalog` did not implement the interface `SupportsNamespaces`. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r469847324



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -80,14 +85,16 @@ public FlinkCatalog(
       String catalogName,
       String defaultDatabase,
       String[] baseNamespace,
-      Catalog icebergCatalog,
+      CatalogLoader catalogLoader,
       boolean cacheEnabled) {
     super(catalogName, defaultDatabase);
-    this.originalCatalog = icebergCatalog;
-    this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(icebergCatalog) : icebergCatalog;
+    this.originalCatalog = catalogLoader.loadCatalog(
+        HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()));
+    this.catalogLoader = catalogLoader;
+    this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
     this.baseNamespace = baseNamespace;
-    if (icebergCatalog instanceof SupportsNamespaces) {
-      asNamespaceCatalog = (SupportsNamespaces) icebergCatalog;

Review comment:
       Nice catching,  the CachingCatalog should never be a `SupportsNamespaces` catalog.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r480020697



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.source.FlinkTableSource;
+
+/**
+ * Flink Iceberg table factory to create table source and sink.
+ * Only works for catalog, can not be loaded from Java SPI(Service Provider Interface).
+ */
+class FlinkTableFactory implements TableSourceFactory<RowData> {
+
+  private final FlinkCatalog catalog;
+
+  FlinkTableFactory(FlinkCatalog catalog) {
+    this.catalog = catalog;
+  }
+
+  @Override
+  public Map<String, String> requiredContext() {
+    throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
+  }
+
+  @Override
+  public List<String> supportedProperties() {
+    throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
+  }
+
+  @Override
+  public TableSource<RowData> createTableSource(Context context) {
+    ObjectIdentifier identifier = context.getObjectIdentifier();
+    ObjectPath objectPath = new ObjectPath(identifier.getDatabaseName(), identifier.getObjectName());
+    TableIdentifier icebergIdentifier = catalog.toIdentifier(objectPath);

Review comment:
       OK , I did not take look into the `toNamespace`,  sounds good.   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r469955623



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Flink Iceberg {@link InputFormat}.
+ * - Calls that occur on the Job manager side: {@link #createInputSplits} and {@link #getInputSplitAssigner}.
+ * - Calls that occur on the Task side: {@link #open}, {@link #reachedEnd}, {@link #nextRecord} and {@link #close}.
+ */
+public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit> {
+
+  private final TableLoader tableLoader;
+  private final Schema tableSchema;
+  private final List<String> projectedFields;
+  private final ScanOptions options;
+  private final List<Expression> filterExpressions;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+
+  private transient RowDataReader reader;
+  private transient boolean fetched;
+  private transient boolean hasNext;
+
+  private FlinkInputFormat(
+      TableLoader tableLoader, Schema tableSchema, FileIO io, EncryptionManager encryption,
+      List<String> projectedFields, List<Expression> filterExpressions, ScanOptions options) {
+    this.tableLoader = tableLoader;
+    this.tableSchema = tableSchema;
+    this.projectedFields = projectedFields;
+    this.options = options;
+    this.filterExpressions = filterExpressions;
+    this.io = io;
+    this.encryption = encryption;
+  }
+
+  @VisibleForTesting
+  Schema getTableSchema() {
+    return tableSchema;
+  }
+
+  @VisibleForTesting
+  List<String> getProjectedFields() {
+    return projectedFields;
+  }
+
+  @Override
+  public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+    // Legacy method, not be used.
+    return null;
+  }
+
+  @Override
+  public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+    // Invoked by Job manager, so it is OK to load table from catalog.
+    tableLoader.open(HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()));

Review comment:
       Q:   Are we binding the iceberg table's hadoop configuration with flink job manager's hadoop configuration ?  Is it possible to access an iceberg table in the hadoop cluster which is different with flink's hadoop cluster ?  Seems a more reasonable way is:  Passing a customized `SerializeableConfiguration` from client, then job manager could access any hadoop clusters. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r465520890



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.source.FlinkTableSource;
+
+/**
+ * Flink Iceberg table factory to create table source and sink.
+ * Only works for catalog, can not be loaded from Java SPI.
+ */
+class FlinkTableFactory implements TableSourceFactory<RowData> {
+
+  private final FlinkCatalog catalog;
+
+  FlinkTableFactory(FlinkCatalog catalog) {
+    this.catalog = catalog;
+  }
+
+  @Override
+  public Map<String, String> requiredContext() {
+    throw new UnsupportedOperationException("Iceberg Table Factory can not loaded from Java SPI");

Review comment:
       Nit: Probable typo. Should possible be `can not *be* loaded from Java SPI`
   
   I also think it might help the average reader if you defined the acronym SPI at least once.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#issuecomment-705945755


   Thanks all for your help, all sub-PRs have been completed. I'll close this PR.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r480015331



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.source.FlinkTableSource;
+
+/**
+ * Flink Iceberg table factory to create table source and sink.
+ * Only works for catalog, can not be loaded from Java SPI(Service Provider Interface).
+ */
+class FlinkTableFactory implements TableSourceFactory<RowData> {
+
+  private final FlinkCatalog catalog;
+
+  FlinkTableFactory(FlinkCatalog catalog) {
+    this.catalog = catalog;
+  }
+
+  @Override
+  public Map<String, String> requiredContext() {
+    throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
+  }
+
+  @Override
+  public List<String> supportedProperties() {
+    throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
+  }
+
+  @Override
+  public TableSource<RowData> createTableSource(Context context) {
+    ObjectIdentifier identifier = context.getObjectIdentifier();
+    ObjectPath objectPath = new ObjectPath(identifier.getDatabaseName(), identifier.getObjectName());
+    TableIdentifier icebergIdentifier = catalog.toIdentifier(objectPath);

Review comment:
       No, it can not, because it needs baseNamespace information.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r480018055



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.source.FlinkTableSource;
+
+/**
+ * Flink Iceberg table factory to create table source and sink.
+ * Only works for catalog, can not be loaded from Java SPI(Service Provider Interface).
+ */
+class FlinkTableFactory implements TableSourceFactory<RowData> {
+
+  private final FlinkCatalog catalog;
+
+  FlinkTableFactory(FlinkCatalog catalog) {
+    this.catalog = catalog;
+  }
+
+  @Override
+  public Map<String, String> requiredContext() {
+    throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
+  }
+
+  @Override
+  public List<String> supportedProperties() {
+    throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
+  }
+
+  @Override
+  public TableSource<RowData> createTableSource(Context context) {
+    ObjectIdentifier identifier = context.getObjectIdentifier();
+    ObjectPath objectPath = new ObjectPath(identifier.getDatabaseName(), identifier.getObjectName());
+    TableIdentifier icebergIdentifier = catalog.toIdentifier(objectPath);
+    try {
+      Table table = catalog.getIcebergTable(objectPath);
+      // Excludes computed columns
+      TableSchema icebergSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());

Review comment:
       Not works.
   Even if the iceberg table supports computed columns in the future, these computed columns will be generated by Flink instead of Iceberg source. (So here should always be `getPhysicalSchema`)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r470372017



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Flink Iceberg {@link InputFormat}.
+ * - Calls that occur on the Job manager side: {@link #createInputSplits} and {@link #getInputSplitAssigner}.
+ * - Calls that occur on the Task side: {@link #open}, {@link #reachedEnd}, {@link #nextRecord} and {@link #close}.
+ */
+public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit> {
+
+  private final TableLoader tableLoader;
+  private final Schema tableSchema;
+  private final List<String> projectedFields;
+  private final ScanOptions options;
+  private final List<Expression> filterExpressions;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+
+  private transient RowDataReader reader;
+  private transient boolean fetched;
+  private transient boolean hasNext;
+
+  private FlinkInputFormat(
+      TableLoader tableLoader, Schema tableSchema, FileIO io, EncryptionManager encryption,
+      List<String> projectedFields, List<Expression> filterExpressions, ScanOptions options) {
+    this.tableLoader = tableLoader;
+    this.tableSchema = tableSchema;
+    this.projectedFields = projectedFields;
+    this.options = options;
+    this.filterExpressions = filterExpressions;
+    this.io = io;
+    this.encryption = encryption;
+  }
+
+  @VisibleForTesting
+  Schema getTableSchema() {
+    return tableSchema;
+  }
+
+  @VisibleForTesting
+  List<String> getProjectedFields() {
+    return projectedFields;
+  }
+
+  @Override
+  public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+    // Legacy method, not be used.
+    return null;
+  }
+
+  @Override
+  public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+    // Invoked by Job manager, so it is OK to load table from catalog.
+    tableLoader.open(HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()));

Review comment:
       I'm OK to pass a configuration to `FlinkInputFormat`, although there will be some serialization cost.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#issuecomment-705945755


   Thanks all for your help, all sub-PRs have been completed. I'll close this PR.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1293: Flink: Implement Flink InputFormat and integrate it to FlinkCatalog

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1293:
URL: https://github.com/apache/iceberg/pull/1293#discussion_r480014634



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.sources.FilterableTableSource;
+import org.apache.flink.table.sources.LimitableTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.TableLoader;
+
+/**
+ * Flink Iceberg table source.
+ * TODO: Implement {@link FilterableTableSource} and {@link LimitableTableSource}.
+ */
+public class FlinkTableSource implements StreamTableSource<RowData>, ProjectableTableSource<RowData> {
+
+  private final TableIdentifier identifier;
+  private final Table table;
+  private final CatalogLoader catalogLoader;
+  private final Configuration hadoopConf;
+  private final TableSchema schema;
+  private final Map<String, String> options;

Review comment:
       It also includes table hints.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org