You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/07/02 23:46:47 UTC

[GitHub] [iceberg] sundargates commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

sundargates commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r663150205



##########
File path: build.gradle
##########
@@ -140,10 +140,11 @@ subprojects {
     def buildLog = new File(logFile)
     addTestOutputListener(new TestOutputListener() {
       def lastDescriptor
+
       @Override
       void onOutput(TestDescriptor testDescriptor, TestOutputEvent testOutputEvent) {
         if (lastDescriptor != testDescriptor) {
-          buildLog << "--------\n- Test log for: "<< testDescriptor << "\n--------\n"
+          buildLog << "--------\n- Test log for: " << testDescriptor << "\n--------\n"

Review comment:
       looks like extraneous changes unrelated to the diff?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +40,10 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Integer> SOURCE_READER_FETCH_BATCH_SIZE = ConfigOptions
+      .key("source.iceberg.reader.fetch-batch-size")
+      .intType()
+      .defaultValue(2048)

Review comment:
       nit: seems rather large. 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderFactory.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+public interface ReaderFactory<T> extends Serializable {
+
+  /**
+   * An iterator over records with their position in the file. The iterator is closeable to
+   * support clean resource release and recycling.
+   *
+   * @param <T> The type of the record.
+   */
+  interface RecordIterator<T> {
+
+    /**
+     * Gets the next record from the file, together with its position.
+     *
+     * <p>The position information returned with the record point to the record AFTER the
+     * returned record, because it defines the point where the reading should resume once the
+     * current record is emitted. The position information is put in the source's state when the
+     * record is emitted. If a checkpoint is taken directly after the record is emitted, the
+     * checkpoint must to describe where to resume the source reading from after that record.
+     *
+     * <p>Objects returned by this method may be reused by the iterator. By the time that this
+     * method is called again, no object returned from the previous call will be referenced any
+     * more. That makes it possible to have a single {@link MutableRecordAndPosition} object and
+     * return the same instance (with updated record and position) on every call.
+     */
+    @Nullable
+    RecordAndPosition<T> next();
+
+    /**
+     * Releases the batch that this iterator iterated over. This is not supposed to close the
+     * reader and its resources, but is simply a signal that this iterator is no used any more.
+     * This method can be used as a hook to recycle/reuse heavyweight object structures.
+     */
+    void releaseBatch();
+  }
+
+  /**
+   * A batch reader for a {@link IcebergSourceSplit}
+   *
+   * @param <T> output record type
+   */
+  interface Reader<T> {
+
+    /**
+     * Reads one batch. The method should return null when reaching the end of the input. The
+     * returned batch will be handed over to the processing threads as one.
+     *
+     * <p>The returned iterator object and any contained objects may be held onto by the
+     * source for some time, so it should not be immediately reused by the reader.
+     *
+     * <p>To implement reuse and to save object allocation, consider using a {@link
+     * org.apache.flink.connector.file.src.util.Pool} and recycle objects into the Pool in the
+     * the {@link RecordIterator#releaseBatch()} method.
+     */
+    @Nullable
+    RecordIterator<T> readBatch() throws IOException;
+
+    /**
+     * Closes the reader and release all resources
+     */
+    void close() throws IOException;
+  }
+
+
+  /**
+   * Create a batch reader for the input split
+   *
+   * @param config Flink configuration
+   * @param split  Iceberg source split
+   * @return a batch reader
+   */
+  Reader<T> create(Configuration config, IcebergSourceSplit split);

Review comment:
       Why is the configuration passed per split? Wouldn't it be the same for the full table? If so, should it be passed to the constructor as a property of the implementation rather than per split?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/FileRecords.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Collections;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+public class FileRecords<T> implements RecordsWithSplitIds<RecordAndPosition<T>> {
+
+  @Nullable
+  private final ReaderFactory.RecordIterator recordsForSplit;
+  private final Set<String> finishedSplits;
+
+  @Nullable
+  private String splitId;
+  @Nullable
+  private ReaderFactory.RecordIterator recordsForSplitCurrent;
+
+  private FileRecords(
+      @Nullable String splitId,
+      @Nullable ReaderFactory.RecordIterator recordsForSplit,

Review comment:
       Same as above.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -114,4 +145,43 @@ public void close() throws IOException {
     currentIterator.close();
     tasks = null;
   }
+
+  public Position position() {

Review comment:
       It appears that you are using CheckpointedPosition DS to communicate the position that the iterator has to seek to from outside. However, in order to communicate the current position to the outside world, you are using the internal Position DS. Wondering if we can keep this consistent to be either CheckpointedPosition or the mutable Position?

##########
File path: build.gradle
##########
@@ -329,8 +330,17 @@ project(':iceberg-flink') {
       exclude group: 'org.apache.avro', module: 'avro'
     }
 
+    // flink-dist doesn't include these two modules (FLINK-20098).
+    // Hence we should add them as compile deps.
+    // This doesn't affect iceberg-flink-runtime shadow jar,
+    // since it excludes all flink jars.
+    compile "org.apache.flink:flink-connector-base"
+    compile "org.apache.flink:flink-connector-files"

Review comment:
       nit: is this still needed?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderFactory.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+public interface ReaderFactory<T> extends Serializable {
+
+  /**
+   * An iterator over records with their position in the file. The iterator is closeable to
+   * support clean resource release and recycling.
+   *
+   * @param <T> The type of the record.
+   */
+  interface RecordIterator<T> {

Review comment:
       I'm not sure we need this internal interface given RecordsWithSplitIds is pretty much a superset of this DS. 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -79,6 +83,27 @@ InputFile getInputFile(String location) {
     return inputFiles.get(location);
   }
 
+  public void seek(CheckpointedPosition checkpointedPosition)  {

Review comment:
       is there a need to make use of this DS given that BulkFormat is not going to be supported initially?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderFactory.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+public interface ReaderFactory<T> extends Serializable {

Review comment:
       I think it might be better to avoid this interface and replace it with an existing Java type such as java.util.Function,  as it would lead to less need for understanding new types. If you want to have this abstraction, then you can just define this interface as 
   ```
   public interface ReaderFactory<T> extends Function<IcebergSourceSplit, CloseableIterable<RecordsWithSplitIds<T>> {}
   ```
   

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/FileRecords.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Collections;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+public class FileRecords<T> implements RecordsWithSplitIds<RecordAndPosition<T>> {
+
+  @Nullable
+  private final ReaderFactory.RecordIterator recordsForSplit;

Review comment:
       why is the RecordIterator type here not parameterized? 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataIteratorReaderFactory.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.data.RowDataUtil;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataIterator;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+public class RowDataIteratorReaderFactory implements ReaderFactory<RowData> {

Review comment:
       Can we make this generic by asking the user to provide the factory for generating T (RowData in this specific case) from a given CombinedScanTask or make that abstract and have a default implementation for RowData that uses the DataIterator?
   
   ```
   class BatchDataIteratorFactory<T> implements Function<FileScanTaskSplit, CloseableIterable<RecordsWithSplitIds<T>> {
     protected abstract DataIterator<T> getIteratorFor(CombinedScanTask task);
   }
   ```

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderFactory.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+public interface ReaderFactory<T> extends Serializable {
+
+  /**
+   * An iterator over records with their position in the file. The iterator is closeable to
+   * support clean resource release and recycling.
+   *
+   * @param <T> The type of the record.
+   */
+  interface RecordIterator<T> {
+
+    /**
+     * Gets the next record from the file, together with its position.
+     *
+     * <p>The position information returned with the record point to the record AFTER the
+     * returned record, because it defines the point where the reading should resume once the
+     * current record is emitted. The position information is put in the source's state when the
+     * record is emitted. If a checkpoint is taken directly after the record is emitted, the
+     * checkpoint must to describe where to resume the source reading from after that record.
+     *
+     * <p>Objects returned by this method may be reused by the iterator. By the time that this
+     * method is called again, no object returned from the previous call will be referenced any
+     * more. That makes it possible to have a single {@link MutableRecordAndPosition} object and
+     * return the same instance (with updated record and position) on every call.
+     */
+    @Nullable
+    RecordAndPosition<T> next();
+
+    /**
+     * Releases the batch that this iterator iterated over. This is not supposed to close the
+     * reader and its resources, but is simply a signal that this iterator is no used any more.
+     * This method can be used as a hook to recycle/reuse heavyweight object structures.
+     */
+    void releaseBatch();
+  }
+
+  /**
+   * A batch reader for a {@link IcebergSourceSplit}
+   *
+   * @param <T> output record type
+   */
+  interface Reader<T> {

Review comment:
       Would it make sense to have the reader extend CloseableIterable<RecordsWithSplitIds<T>>? This way the intermediate DS called FileRecords can be completely avoided. A lot of the abstraction makes it slightly complex to read otherwise IMO. 
   
   ```
   interface Reader<T> extends CloseableIterable<RecordsWithSplitIds<T>> {
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org