Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/02 02:45:30 UTC

[GitHub] [flink-table-store] tsreaper opened a new pull request, #145: [FLINK-27875] Introduce TableScan and TableRead as an abstraction layer above FileStore for reading RowData

tsreaper opened a new pull request, #145:
URL: https://github.com/apache/flink-table-store/pull/145

   In this step we introduce `TableScan` and `TableRead`. They are an abstraction layer above `FileStoreScan` and `FileStoreRead` that provides `RowData` reading.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi merged pull request #145: [FLINK-27875] Introduce TableScan and TableRead as an abstraction layer above FileStore for reading RowData

Posted by GitBox <gi...@apache.org>.
JingsongLi merged PR #145:
URL: https://github.com/apache/flink-table-store/pull/145




[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #145: [FLINK-27875] Introduce TableScan and TableRead as an abstraction layer above FileStore for reading RowData

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #145:
URL: https://github.com/apache/flink-table-store/pull/145#discussion_r887754365


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.ValueContentRowDataIterator;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+
+/** {@link FileStoreTable} for {@link WriteMode#APPEND_ONLY} write mode. */
+public class AppendOnlyFileStoreTable implements FileStoreTable {
+
+    private final Schema schema;
+    private final FileStoreImpl store;
+
+    AppendOnlyFileStoreTable(Schema schema, Configuration conf, String user) {
+        this.schema = schema;
+        this.store =
+                new FileStoreImpl(
+                        schema.id(),
+                        new FileStoreOptions(conf),
+                        WriteMode.APPEND_ONLY,
+                        user,
+                        schema.logicalPartitionType(),
+                        RowType.of(),
+                        schema.logicalRowType(),
+                        null);
+    }
+
+    @Override
+    public TableScan newScan(boolean isStreaming) {
+        FileStoreScan scan = store.newScan();
+        if (isStreaming) {
+            scan.withIncremental(true);
+        }
+
+        return new TableScan(scan, schema, store.pathFactory()) {
+            @Override
+            protected void withNonPartitionFilter(Predicate predicate) {
+                scan.withValueFilter(predicate);
+            }
+        };
+    }
+
+    @Override
+    public TableRead newRead(boolean isStreaming) {
+        FileStoreRead read = store.newRead();
+        if (isStreaming) {
+            read.withDropDelete(false);

Review Comment:
   Yes, but there is a check in `FileStoreReadImpl`.





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #145: [FLINK-27875] Introduce TableScan and TableRead as an abstraction layer above FileStore for reading RowData

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #145:
URL: https://github.com/apache/flink-table-store/pull/145#discussion_r887643249


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.CompoundPredicate;
+import org.apache.flink.table.store.file.predicate.LeafPredicate;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
+public abstract class TableScan {
+
+    protected final FileStoreScan scan;
+    private final int[] fieldIdxToPartitionIdx;

Review Comment:
   `fieldIdxToPartitionIdx` can be a local variable. Keeping it as a class member looks a bit strange, since we don't need to reuse it.
   We can just store the schema.
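
   A minimal sketch of that idea, not the code in this PR: `PartitionIndexSketch` is a hypothetical stand-in for `TableScan`, and two plain name lists stand in for `Schema`. The mapping is derived where it is needed instead of being kept as a member.

```java
import java.util.List;

// Hypothetical stand-in for TableScan. The two lists stand in for
// schema.fields() (by name) and schema.partitionKeys().
class PartitionIndexSketch {
    private final List<String> fieldNames;
    private final List<String> partitionKeys;

    PartitionIndexSketch(List<String> fieldNames, List<String> partitionKeys) {
        this.fieldNames = fieldNames;
        this.partitionKeys = partitionKeys;
    }

    // Computed locally on each call; -1 marks a field that is not a partition key.
    int[] fieldIdxToPartitionIdx() {
        return fieldNames.stream().mapToInt(partitionKeys::indexOf).toArray();
    }
}
```

   The trade-off is that the array is rebuilt on every call, which should be negligible if it is only used when a filter is set.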



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.CompoundPredicate;
+import org.apache.flink.table.store.file.predicate.LeafPredicate;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
+public abstract class TableScan {
+
+    protected final FileStoreScan scan;
+    private final int[] fieldIdxToPartitionIdx;
+    private final FileStorePathFactory pathFactory;
+
+    protected TableScan(FileStoreScan scan, Schema schema, FileStorePathFactory pathFactory) {
+        this.scan = scan;
+        List<String> partitionKeys = schema.partitionKeys();
+        this.fieldIdxToPartitionIdx =
+                schema.fields().stream().mapToInt(f -> partitionKeys.indexOf(f.name())).toArray();
+        this.pathFactory = pathFactory;
+    }
+
+    public TableScan withSnapshot(long snapshotId) {
+        scan.withSnapshot(snapshotId);
+        return this;
+    }
+
+    public TableScan withFilter(Predicate predicate) {
+        List<Predicate> partitionFilters = new ArrayList<>();
+        List<Predicate> nonPartitionFilters = new ArrayList<>();
+        for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
+            Optional<Predicate> mapped = mapToPartitionFilter(p);
+            if (mapped.isPresent()) {
+                partitionFilters.add(mapped.get());
+            } else {
+                nonPartitionFilters.add(p);
+            }
+        }
+
+        scan.withPartitionFilter(new CompoundPredicate(And.INSTANCE, partitionFilters));
+        withNonPartitionFilter(new CompoundPredicate(And.INSTANCE, nonPartitionFilters));
+        return this;
+    }
+
+    public Plan plan() {
+        FileStoreScan.Plan plan = scan.plan();
+        List<Split> splits = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> entryWithPartition :
+                plan.groupByPartFiles().entrySet()) {
+            BinaryRowData partition = entryWithPartition.getKey();
+            for (Map.Entry<Integer, List<DataFileMeta>> entryWithBucket :

Review Comment:
   This logic should live in a `SplitGenerator`; it can be improved.
   See https://github.com/apache/flink-table-store/pull/131
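
   One possible shape for such a generator, as a sketch only. It assumes one split per (partition, bucket) pair, which the truncated loop above suggests but does not confirm. `SplitSketch` and `SplitGeneratorSketch` are hypothetical names; the type parameters `P` and `F` stand in for `BinaryRowData` and `DataFileMeta`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical split holder: P stands in for BinaryRowData, F for DataFileMeta.
final class SplitSketch<P, F> {
    final P partition;
    final int bucket;
    final List<F> files;

    SplitSketch(P partition, int bucket, List<F> files) {
        this.partition = partition;
        this.bucket = bucket;
        this.files = files;
    }
}

// Hypothetical generator: keeps the nested partition/bucket loop out of TableScan.
final class SplitGeneratorSketch {
    private SplitGeneratorSketch() {}

    // Assumption: one split per (partition, bucket) pair.
    static <P, F> List<SplitSketch<P, F>> generate(Map<P, Map<Integer, List<F>>> groupedFiles) {
        List<SplitSketch<P, F>> splits = new ArrayList<>();
        for (Map.Entry<P, Map<Integer, List<F>>> byPartition : groupedFiles.entrySet()) {
            for (Map.Entry<Integer, List<F>> byBucket : byPartition.getValue().entrySet()) {
                splits.add(
                        new SplitSketch<>(
                                byPartition.getKey(), byBucket.getKey(), byBucket.getValue()));
            }
        }
        return splits;
    }
}
```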



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.ValueContentRowDataIterator;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+
+/** {@link FileStoreTable} for {@link WriteMode#APPEND_ONLY} write mode. */
+public class AppendOnlyFileStoreTable implements FileStoreTable {
+
+    private final Schema schema;
+    private final FileStoreImpl store;
+
+    AppendOnlyFileStoreTable(Schema schema, Configuration conf, String user) {
+        this.schema = schema;
+        this.store =
+                new FileStoreImpl(
+                        schema.id(),
+                        new FileStoreOptions(conf),
+                        WriteMode.APPEND_ONLY,
+                        user,
+                        schema.logicalPartitionType(),
+                        RowType.of(),
+                        schema.logicalRowType(),
+                        null);
+    }
+
+    @Override
+    public TableScan newScan(boolean isStreaming) {
+        FileStoreScan scan = store.newScan();
+        if (isStreaming) {
+            scan.withIncremental(true);

Review Comment:
   `FileStoreScan scan = store.newScan().withIncremental(isStreaming);`



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.utils.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/** An abstraction layer above {@link FileStoreRead} to provide reading of {@link RowData}. */
+public abstract class TableRead {
+
+    protected final FileStoreRead read;
+
+    protected TableRead(FileStoreRead read) {
+        this.read = read;
+    }
+
+    public TableRead withProjection(int[][] projection) {
+        withProjectionImpl(projection);
+        return this;
+    }
+
+    protected abstract void withProjectionImpl(int[][] projection);

Review Comment:
   Why not just `withProjection`?
   We don't need an extra method only to reuse `return this;`.
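
   A sketch of the simpler shape, with hypothetical class names standing in for `TableRead` and one of its subclasses: `withProjection` is itself abstract, and each subclass returns `this` directly (a covariant return type keeps the chaining typed).

```java
// Hypothetical stand-in for TableRead: the projection method is abstract itself,
// so no separate withProjectionImpl hook is needed.
abstract class TableReadSketch {
    abstract TableReadSketch withProjection(int[][] projection);
}

// Hypothetical subclass; a real one would forward the projection to FileStoreRead.
class ValueReadSketch extends TableReadSketch {
    int[][] projection;

    @Override
    ValueReadSketch withProjection(int[][] projection) {
        this.projection = projection;
        return this;
    }
}
```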



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.ValueContentRowDataIterator;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+
+/** {@link FileStoreTable} for {@link WriteMode#APPEND_ONLY} write mode. */
+public class AppendOnlyFileStoreTable implements FileStoreTable {
+
+    private final Schema schema;
+    private final FileStoreImpl store;
+
+    AppendOnlyFileStoreTable(Schema schema, Configuration conf, String user) {
+        this.schema = schema;
+        this.store =
+                new FileStoreImpl(
+                        schema.id(),
+                        new FileStoreOptions(conf),
+                        WriteMode.APPEND_ONLY,
+                        user,
+                        schema.logicalPartitionType(),
+                        RowType.of(),
+                        schema.logicalRowType(),
+                        null);
+    }
+
+    @Override
+    public TableScan newScan(boolean isStreaming) {
+        FileStoreScan scan = store.newScan();
+        if (isStreaming) {
+            scan.withIncremental(true);
+        }
+
+        return new TableScan(scan, schema, store.pathFactory()) {
+            @Override
+            protected void withNonPartitionFilter(Predicate predicate) {
+                scan.withValueFilter(predicate);
+            }
+        };
+    }
+
+    @Override
+    public TableRead newRead(boolean isStreaming) {

Review Comment:
   ditto



##########
flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java:
##########
@@ -48,37 +50,33 @@
 import java.util.concurrent.Executors;
 import java.util.function.BiFunction;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /** Helper class to write and read {@link RowData} with {@link FileStoreImpl}. */
 public class FileStoreTestHelper {
 
-    private final FileStoreImpl store;
+    private final FileStoreTable table;
+    private final FileStore store;
     private final BiFunction<RowData, RowData, BinaryRowData> partitionCalculator;
     private final Function<RowData, Integer> bucketCalculator;
     private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
     private final ExecutorService compactExecutor;
 
     public FileStoreTestHelper(
             Configuration conf,
-            RowType partitionType,
-            RowType keyType,
-            RowType valueType,
-            MergeFunction mergeFunction,
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
             BiFunction<RowData, RowData, BinaryRowData> partitionCalculator,
-            Function<RowData, Integer> bucketCalculator) {
-        FileStoreOptions options = new FileStoreOptions(conf);
-        this.store =
-                new FileStoreImpl(
-                        options.path().toString(),
-                        0,
-                        options,
-                        WriteMode.CHANGE_LOG,
-                        UUID.randomUUID().toString(),
-                        partitionType,
-                        keyType,
-                        valueType,
-                        mergeFunction);
+            Function<RowData, Integer> bucketCalculator)
+            throws Exception {
+        Path tablePath = FileStoreOptions.path(conf);
+        Schema schema =

Review Comment:
   Can we have a `FileStoreTableFactory`?
   We don't need to make `SchemaManager` public.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.utils.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/** An abstraction layer above {@link FileStoreRead} to provide reading of {@link RowData}. */
+public abstract class TableRead {
+
+    protected final FileStoreRead read;
+
+    protected TableRead(FileStoreRead read) {
+        this.read = read;
+    }
+
+    public TableRead withProjection(int[][] projection) {
+        withProjectionImpl(projection);
+        return this;
+    }
+
+    protected abstract void withProjectionImpl(int[][] projection);
+
+    public RecordReader<RowData> createReader(
+            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
+        return new RowDataRecordReader(read.createReader(partition, bucket, files));
+    }
+
+    protected abstract Iterator<RowData> rowDataIteratorFromKv(KeyValue kv);

Review Comment:
   Can the subclass just provide an `Iterator` which wraps `RecordIterator`?
   1. Returning an `Iterator` for a single key-value looks a bit strange.
   2. This needs to create a new `Iterator` for each record.
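
   A rough sketch of the wrapping approach. `BatchIteratorSketch` is a hypothetical interface that assumes `next()` returns `null` once the batch is exhausted; it is not the actual `RecordIterator` API. The point is that one wrapper object is created per batch and each record is converted lazily.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical batch iterator; assumption: next() returns null when exhausted.
interface BatchIteratorSketch<T> {
    T next();
}

// One wrapper per batch, instead of one Iterator per record.
final class MappingIteratorSketch<I, O> implements Iterator<O> {
    private final BatchIteratorSketch<I> batch;
    private final Function<I, O> converter;
    private I lookahead;
    private boolean exhausted;

    MappingIteratorSketch(BatchIteratorSketch<I> batch, Function<I, O> converter) {
        this.batch = batch;
        this.converter = converter;
    }

    @Override
    public boolean hasNext() {
        // Fetch at most one record ahead; stop polling once the batch reports its end.
        if (lookahead == null && !exhausted) {
            lookahead = batch.next();
            exhausted = lookahead == null;
        }
        return lookahead != null;
    }

    @Override
    public O next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        I current = lookahead;
        lookahead = null;
        return converter.apply(current);
    }
}
```

   In this shape a subclass would only supply the converter (for example, key-value to row), and no per-record iterator is allocated.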



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.ValueContentRowDataIterator;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+
+/** {@link FileStoreTable} for {@link WriteMode#APPEND_ONLY} write mode. */
+public class AppendOnlyFileStoreTable implements FileStoreTable {
+
+    private final Schema schema;
+    private final FileStoreImpl store;
+
+    AppendOnlyFileStoreTable(Schema schema, Configuration conf, String user) {
+        this.schema = schema;
+        this.store =
+                new FileStoreImpl(
+                        schema.id(),
+                        new FileStoreOptions(conf),
+                        WriteMode.APPEND_ONLY,
+                        user,
+                        schema.logicalPartitionType(),
+                        RowType.of(),
+                        schema.logicalRowType(),
+                        null);
+    }
+
+    @Override
+    public TableScan newScan(boolean isStreaming) {
+        FileStoreScan scan = store.newScan();
+        if (isStreaming) {
+            scan.withIncremental(true);
+        }
+
+        return new TableScan(scan, schema, store.pathFactory()) {
+            @Override
+            protected void withNonPartitionFilter(Predicate predicate) {
+                scan.withValueFilter(predicate);
+            }
+        };
+    }
+
+    @Override
+    public TableRead newRead(boolean isStreaming) {
+        FileStoreRead read = store.newRead();
+        if (isStreaming) {
+            read.withDropDelete(false);

Review Comment:
   `Cannot drop delete message for append-only table.` in `FileStoreReadImpl`



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.ValueContentRowDataIterator;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+
+/** {@link FileStoreTable} for {@link WriteMode#APPEND_ONLY} write mode. */
+public class AppendOnlyFileStoreTable implements FileStoreTable {
+
+    private final Schema schema;
+    private final FileStoreImpl store;
+
+    AppendOnlyFileStoreTable(Schema schema, Configuration conf, String user) {
+        this.schema = schema;
+        this.store =
+                new FileStoreImpl(
+                        schema.id(),
+                        new FileStoreOptions(conf),
+                        WriteMode.APPEND_ONLY,
+                        user,
+                        schema.logicalPartitionType(),
+                        RowType.of(),
+                        schema.logicalRowType(),
+                        null);
+    }
+
+    @Override
+    public TableScan newScan(boolean isStreaming) {

Review Comment:
   `isStreaming` -> `incremental`?





[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #145: [FLINK-27875] Introduce TableScan and TableRead as an abstraction layer above FileStore for reading RowData

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #145:
URL: https://github.com/apache/flink-table-store/pull/145#discussion_r887672733


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.ValueContentRowDataIterator;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+
+/** {@link FileStoreTable} for {@link WriteMode#APPEND_ONLY} write mode. */
+public class AppendOnlyFileStoreTable implements FileStoreTable {
+
+    private final Schema schema;
+    private final FileStoreImpl store;
+
+    AppendOnlyFileStoreTable(Schema schema, Configuration conf, String user) {
+        this.schema = schema;
+        this.store =
+                new FileStoreImpl(
+                        schema.id(),
+                        new FileStoreOptions(conf),
+                        WriteMode.APPEND_ONLY,
+                        user,
+                        schema.logicalPartitionType(),
+                        RowType.of(),
+                        schema.logicalRowType(),
+                        null);
+    }
+
+    @Override
+    public TableScan newScan(boolean isStreaming) {
+        FileStoreScan scan = store.newScan();
+        if (isStreaming) {
+            scan.withIncremental(true);
+        }
+
+        return new TableScan(scan, schema, store.pathFactory()) {
+            @Override
+            protected void withNonPartitionFilter(Predicate predicate) {
+                scan.withValueFilter(predicate);
+            }
+        };
+    }
+
+    @Override
+    public TableRead newRead(boolean isStreaming) {
+        FileStoreRead read = store.newRead();
+        if (isStreaming) {
+            read.withDropDelete(false);

Review Comment:
   True. But append-only tables only accept `INSERT` messages. No `DELETE` messages will ever occur.


