You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/22 09:15:42 UTC

[GitHub] [flink-table-store] openinx opened a new pull request, #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

openinx opened a new pull request, #99:
URL: https://github.com/apache/flink-table-store/pull/99

   This PR is trying to provide table store the ability to accept append-only ingestion without any defined primary keys.
   
   The previous table store abstraction are built on top of primary keys,  so in theory all the read & write path will need to be reconsidered  or refactored, so that we can abstract the correct API which works fine for both primary keys storage and immutable logs (without primary keys). 
   
   The current version is a draft PR (Actually,  I'm not quite familiar with the flink-table-store project before, so I'm trying to implement this append-only abstraction to understand the API & implementation better). 
   
   There are TODO issues that I didn't consider clearly in this PRs ( I think I will need the next update to address those things): 
   
   1.  The append-only table's file level statistics are quite different with the primary key tables.  For example, the primary key tables will generate a  collection of `SstFileMeta`  when calling the `writer#prepareCommit()`, and then accomplish the first stage commit in the flink's two-phrase commit.  The `SstFileMeta`  will include the statistics for both key fields and value fields, while in the append-only table we don't have any key fields (its statistic information should include all columns' max-min, count etc.) . So  in theory, we are required to abstract the common file level statistic informations data structure for both two kinds of table; 
   
   2.  The different manifests design for both two kinds of tables.
   
   3.  What's the read API abstraction for those two kinds of tables.  I still don't have a clearly propose for it. Will try to update this PR for this.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#discussion_r865755197


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -81,14 +87,42 @@ public FileStoreRead withValueProjection(int[][] projectedFields) {
     }
 
     @Override
-    public RecordReader createReader(BinaryRowData partition, int bucket, List<SstFileMeta> files)
+    public RecordReader createReader(BinaryRowData partition, int bucket, List<DataFileMeta> files)
             throws IOException {
+        switch (writeMode) {
+            case APPEND_ONLY:
+                return createAppendOnlyReader(partition, bucket, files);
+
+            case CHANGE_LOG:
+                return createLSMReader(partition, bucket, files);
+
+            default:
+                throw new UnsupportedOperationException("Unknown write mode: " + writeMode);
+        }
+    }
+
+    private RecordReader createAppendOnlyReader(
+            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
+        Preconditions.checkArgument(
+                !dropDelete, "Cannot drop delete message for append-only table.");
+
+        SstFileReader fileReader = sstFileReaderFactory.create(partition, bucket);

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#issuecomment-1106466192

   > The different manifests design for both two kinds of tables.
   
   Can you clarify which parts of the design are different?
   
   > What's the read API abstraction for those two kinds of tables. I still don't have a clearly propose for it. Will try to update this PR for this.
   
   A new `RecordReader` too?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] openinx closed pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
openinx closed pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys. 
URL: https://github.com/apache/flink-table-store/pull/99


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#discussion_r865947712


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -81,14 +87,42 @@ public FileStoreRead withValueProjection(int[][] projectedFields) {
     }
 
     @Override
-    public RecordReader createReader(BinaryRowData partition, int bucket, List<SstFileMeta> files)
+    public RecordReader createReader(BinaryRowData partition, int bucket, List<DataFileMeta> files)
             throws IOException {
+        switch (writeMode) {
+            case APPEND_ONLY:
+                return createAppendOnlyReader(partition, bucket, files);
+
+            case CHANGE_LOG:
+                return createLSMReader(partition, bucket, files);
+
+            default:
+                throw new UnsupportedOperationException("Unknown write mode: " + writeMode);
+        }
+    }
+
+    private RecordReader createAppendOnlyReader(
+            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
+        Preconditions.checkArgument(
+                !dropDelete, "Cannot drop delete message for append-only table.");
+
+        SstFileReader fileReader = sstFileReaderFactory.create(partition, bucket);

Review Comment:
   Changing very much classes can cause many conflicts, for example with https://github.com/apache/flink-table-store/pull/107
   Let's get this refactored as early as possible, I'm advancing it here: https://github.com/apache/flink-table-store/pull/108



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] openinx commented on a diff in pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
openinx commented on code in PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#discussion_r858167070


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/WriteMode.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+/** Defines the write mode for flink table store. */
+public enum WriteMode {
+    INSERT_ONLY(
+            "insert-only",
+            "The table can only accept append-only insert operations. All rows will be "
+                    + "inserted into the table store without any deduplication or primary/unique key constraint"),
+    DELETABLE("deletable", "The table can accept both insert and operations.");

Review Comment:
   Yes,  using the `changelog` sounds good to me.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/WriteMode.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+/** Defines the write mode for flink table store. */
+public enum WriteMode {
+    INSERT_ONLY(
+            "insert-only",
+            "The table can only accept append-only insert operations. All rows will be "
+                    + "inserted into the table store without any deduplication or primary/unique key constraint"),
+    DELETABLE("deletable", "The table can accept both insert and operations.");

Review Comment:
   Yes,  using the `changelog` sounds good to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] openinx commented on a diff in pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
openinx commented on code in PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#discussion_r870118913


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -83,6 +89,34 @@ public FileStoreRead withValueProjection(int[][] projectedFields) {
     @Override
     public RecordReader createReader(BinaryRowData partition, int bucket, List<DataFileMeta> files)
             throws IOException {
+        switch (writeMode) {
+            case APPEND_ONLY:
+                return createAppendOnlyReader(partition, bucket, files);
+
+            case CHANGE_LOG:
+                return createLSMReader(partition, bucket, files);
+
+            default:
+                throw new UnsupportedOperationException("Unknown write mode: " + writeMode);
+        }
+    }
+
+    private RecordReader createAppendOnlyReader(
+            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
+        Preconditions.checkArgument(
+                !dropDelete, "Cannot drop delete message for append-only table.");

Review Comment:
   Oh,  I see there is a `withDropDelete` method to set `dropDelete` for the upper layer caller. 
   https://github.com/apache/flink-table-store/pull/99/files#diff-d148162de413a2e9954720d746159f062c0f05ec30b1bcd9bdb54161fb1d015fL65-L68
   
   So it's good to keep this check here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] openinx commented on a diff in pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
openinx commented on code in PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#discussion_r865668148


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -81,14 +87,42 @@ public FileStoreRead withValueProjection(int[][] projectedFields) {
     }
 
     @Override
-    public RecordReader createReader(BinaryRowData partition, int bucket, List<SstFileMeta> files)
+    public RecordReader createReader(BinaryRowData partition, int bucket, List<DataFileMeta> files)
             throws IOException {
+        switch (writeMode) {
+            case APPEND_ONLY:
+                return createAppendOnlyReader(partition, bucket, files);
+
+            case CHANGE_LOG:
+                return createLSMReader(partition, bucket, files);
+
+            default:
+                throw new UnsupportedOperationException("Unknown write mode: " + writeMode);
+        }
+    }
+
+    private RecordReader createAppendOnlyReader(
+            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
+        Preconditions.checkArgument(
+                !dropDelete, "Cannot drop delete message for append-only table.");
+
+        SstFileReader fileReader = sstFileReaderFactory.create(partition, bucket);

Review Comment:
   *TODO*
   
   Currently, both AppendOnlyReader and LSMReader are using the `SstFileReader`, while the `sst` concept should not be appeared in the append-only read path. So it's better to move this into a totally new `DataFileReader ` ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] openinx commented on a diff in pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
openinx commented on code in PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#discussion_r865678648


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/AppendOnlyWriter.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/** A {@link RecordWriter} implementation that only accepts append-only records. */
+public class AppendOnlyWriter implements RecordWriter {
+    private final BulkWriter.Factory<RowData> writerFactory;
+    private RollingFileWriter<RowData, List<Path>> writer;
+    private final long targetFileSize;
+    private final Comparator<RowData> keyComparator;
+    private final RowDataSerializer keySerializer;
+    private final List<DataFileMeta> existingFiles;
+    private final List<DataFileMeta> newFiles;
+    private final Supplier<Path> pathCreator;
+    private final FileStatsExtractor fileStatsExtractor;
+    private final FieldStatsCollector fieldStatsCollector;
+
+    private final AtomicLong startingSeqNum;
+    private final AtomicLong nextSeqNum;
+    private final AtomicInteger rowCount;
+
+    private final AtomicReference<BinaryRowData> minKeyRef = new AtomicReference<>(null);
+    private final AtomicReference<BinaryRowData> maxKeyRef = new AtomicReference<>(null);
+
+    public AppendOnlyWriter(
+            FileFormat fileFormat,
+            long targetFileSize,
+            RowType writeSchema,
+            Comparator<RowData> keyComparator,
+            List<DataFileMeta> existingFiles,
+            Supplier<Path> pathCreator,
+            FileStatsExtractor fileStatsExtractor) {
+
+        this.writerFactory = fileFormat.createWriterFactory(writeSchema);
+
+        this.writer = new BaseRollingFileWriter(writerFactory, targetFileSize, pathCreator);
+        this.writer.register(new CloseWriterListener());
+
+        this.targetFileSize = targetFileSize;
+
+        this.keyComparator = keyComparator;
+        this.keySerializer = new RowDataSerializer(writeSchema);
+
+        this.existingFiles = existingFiles;
+        this.newFiles = new ArrayList<>();
+        this.pathCreator = pathCreator;
+        this.fileStatsExtractor = fileStatsExtractor;
+        this.fieldStatsCollector =
+                fileStatsExtractor == null ? new FieldStatsCollector(writeSchema) : null;
+
+        this.startingSeqNum = new AtomicLong(0L);
+        this.nextSeqNum = new AtomicLong(maxSequenceNumber() + 1);
+        this.rowCount = new AtomicInteger(0);
+    }
+
+    private long maxSequenceNumber() {
+        return existingFiles.stream()
+                .map(DataFileMeta::maxSequenceNumber)
+                .max(Long::compare)
+                .orElse(-1L);
+    }
+
+    @Override
+    public void append(RowData row) throws Exception {
+        writer.write(row);
+        if (fieldStatsCollector != null) {
+            fieldStatsCollector.collect(row);
+        }
+        updateMinKey(row);
+        updateMaxKey(row);
+
+        rowCount.incrementAndGet();
+        nextSeqNum.incrementAndGet();
+    }
+
+    private void updateMinKey(RowData row) {
+        BinaryRowData minKey = minKeyRef.get();
+        if ((minKey == null) || (keyComparator.compare(minKey, row) > 0)) {
+            minKey = keySerializer.toBinaryRow(row);
+        }
+        minKeyRef.set(minKey);
+    }
+
+    private void updateMaxKey(RowData row) {
+        BinaryRowData maxKey = maxKeyRef.get();
+        if ((maxKey == null) || (keyComparator.compare(maxKey, row) < 0)) {
+            maxKey = keySerializer.toBinaryRow(row);
+        }
+        maxKeyRef.set(maxKey);

Review Comment:
   That's quite time-consuming for the append-only table, because we will need to compare the whole row each time to get the max key and min key. 
   
   It's better to set the dummy constant key and set the whole row as the value, then we can skip to compare the whole row columns to get the max-min keys ( Because the key is always a constant).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#discussion_r856178226


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/WriteMode.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+/** Defines the write mode for flink table store. */
+public enum WriteMode {
+    INSERT_ONLY(
+            "insert-only",
+            "The table can only accept append-only insert operations. All rows will be "
+                    + "inserted into the table store without any deduplication or primary/unique key constraint"),
+    DELETABLE("deletable", "The table can accept both insert and operations.");

Review Comment:
   Maybe just `changelog`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] openinx commented on pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
openinx commented on PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#issuecomment-1118255607

   The failing case is : 
   
   ```
   [INFO] Running org.apache.flink.table.store.connector.StreamingWarehouseITCase
   Error:  Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 66.71 s <<< FAILURE! - in org.apache.flink.table.store.connector.StreamingWarehouseITCase
   Error:  testUserStory  Time elapsed: 47.664 s  <<< FAILURE!
   org.opentest4j.AssertionFailedError: 
   
   expected: "_ANONYMOUS_USER_"
    but was: "404NotFound"
   	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
   	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
   	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
   	at org.apache.flink.table.store.connector.StreamingWarehouseITCase.lambda$testUserStory$4(StreamingWarehouseITCase.java:211)
   	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
   	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
   	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
   	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
   	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
   	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
   	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
   	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
   	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
   	at org.apache.flink.table.store.connector.StreamingWarehouseITCase.testUserStory(StreamingWarehouseITCase.java:209)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
   	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
   	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
   	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
   	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
   	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
   	at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
   	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
   	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
   	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
   	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
   	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
   	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
   	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
   	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
   	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
   	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
   	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
   	at org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30)
   	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
   	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
   	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
   	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
   	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
   	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
   	at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
   	at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
   	at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
   	at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
   	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
   	at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
   	at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
   	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
   	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
   	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:142)
   	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:113)
   	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
   	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
   	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
   	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
   
   [INFO] Running org.apache.flink.table.store.connector.ContinuousFileStoreITCase
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#discussion_r865755555


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -81,14 +87,42 @@ public FileStoreRead withValueProjection(int[][] projectedFields) {
     }
 
     @Override
-    public RecordReader createReader(BinaryRowData partition, int bucket, List<SstFileMeta> files)
+    public RecordReader createReader(BinaryRowData partition, int bucket, List<DataFileMeta> files)
             throws IOException {
+        switch (writeMode) {
+            case APPEND_ONLY:
+                return createAppendOnlyReader(partition, bucket, files);
+
+            case CHANGE_LOG:
+                return createLSMReader(partition, bucket, files);
+
+            default:
+                throw new UnsupportedOperationException("Unknown write mode: " + writeMode);
+        }
+    }
+
+    private RecordReader createAppendOnlyReader(
+            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
+        Preconditions.checkArgument(
+                !dropDelete, "Cannot drop delete message for append-only table.");
+
+        SstFileReader fileReader = sstFileReaderFactory.create(partition, bucket);

Review Comment:
   I think we can start renaming in a separate JIRA/PR.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -81,14 +87,42 @@ public FileStoreRead withValueProjection(int[][] projectedFields) {
     }
 
     @Override
-    public RecordReader createReader(BinaryRowData partition, int bucket, List<SstFileMeta> files)
+    public RecordReader createReader(BinaryRowData partition, int bucket, List<DataFileMeta> files)
             throws IOException {
+        switch (writeMode) {
+            case APPEND_ONLY:
+                return createAppendOnlyReader(partition, bucket, files);
+
+            case CHANGE_LOG:
+                return createLSMReader(partition, bucket, files);
+
+            default:
+                throw new UnsupportedOperationException("Unknown write mode: " + writeMode);
+        }
+    }
+
+    private RecordReader createAppendOnlyReader(
+            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
+        Preconditions.checkArgument(
+                !dropDelete, "Cannot drop delete message for append-only table.");
+
+        SstFileReader fileReader = sstFileReaderFactory.create(partition, bucket);

Review Comment:
   I think we can start renaming in a separate JIRA/PR now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#issuecomment-1118388556

   > [INFO] Running org.apache.flink.table.store.connector.StreamingWarehouseITCase
   > Error:  Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 66.71 s <<< FAILURE! - in org.apache.flink.table.store.connector.StreamingWarehouseITCase
   > Error:  testUserStory  Time elapsed: 47.664 s  <<< FAILURE!
   > org.opentest4j.AssertionFailedError: 
   > 
   > expected: "_ANONYMOUS_USER_"
   >  but was: "404NotFound"
   
   This is a known unstable case...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] openinx commented on a diff in pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
openinx commented on code in PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#discussion_r868744983


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java:
##########
@@ -82,18 +95,41 @@ public FileStoreWriteImpl(
     public RecordWriter createWriter(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
         Long latestSnapshotId = pathFactory.latestSnapshotId();
-        if (latestSnapshotId == null) {
-            return createEmptyWriter(partition, bucket, compactExecutor);
-        } else {
-            return createMergeTreeWriter(
-                    partition,
-                    bucket,
-                    scan.withSnapshot(latestSnapshotId)
-                            .withPartitionFilter(Collections.singletonList(partition))
-                            .withBucket(bucket).plan().files().stream()
-                            .map(ManifestEntry::file)
-                            .collect(Collectors.toList()),
-                    compactExecutor);
+        List<DataFileMeta> existingFileMetas = Lists.newArrayList();
+        if (latestSnapshotId != null) {
+            // Concat all the DataFileMeta of existing files into existingFileMetas.
+            scan.withSnapshot(latestSnapshotId)
+                    .withPartitionFilter(Collections.singletonList(partition)).withBucket(bucket)
+                    .plan().files().stream()
+                    .map(ManifestEntry::file)
+                    .forEach(existingFileMetas::add);
+        }
+
+        switch (writeMode) {
+            case APPEND_ONLY:
+                DataFilePathFactory factory =
+                        pathFactory.createDataFilePathFactory(partition, bucket);
+                FileStatsExtractor fileStatsExtractor =
+                        fileFormat.createStatsExtractor(writeSchema).orElse(null);
+
+                return new AppendOnlyWriter(
+                        fileFormat,
+                        options.targetFileSize,
+                        writeSchema,
+                        existingFileMetas,
+                        factory,
+                        fileStatsExtractor);
+
+            case CHANGE_LOG:
+                if (latestSnapshotId == null) {
+                    return createEmptyWriter(partition, bucket, compactExecutor);
+                } else {
+                    return createMergeTreeWriter(
+                            partition, bucket, existingFileMetas, compactExecutor);
+                }

Review Comment:
   Make sense for me !



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java:
##########
@@ -34,30 +36,38 @@
 import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.writer.AppendOnlyWriter;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
 /** Default implementation of {@link FileStoreWrite}. */
 public class FileStoreWriteImpl implements FileStoreWrite {
 
     private final DataFileReader.Factory dataFileReaderFactory;
     private final DataFileWriter.Factory dataFileWriterFactory;
+    private final WriteMode writeMode;
+    private final FileFormat fileFormat;
+    private final RowType writeSchema;
     private final Supplier<Comparator<RowData>> keyComparatorSupplier;
     private final MergeFunction mergeFunction;
     private final FileStorePathFactory pathFactory;
     private final FileStoreScan scan;
     private final MergeTreeOptions options;
 
     public FileStoreWriteImpl(
+            WriteMode writeMode,
+            RowType writeSchema,
             RowType keyType,
             RowType valueType,

Review Comment:
   The `writeSchema` can be removed now.  In the previous version, I was thinking that valueType would be some customized table schema (instead of the original table schema), such as `int _VALUE_COUNT` or some other type. But in fact, for append-only table the `valueType` will always be `tableSchema`.  So we don't have an extra `writeSchema` any more. Thanks for the comment.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+/** A {@link RecordWriter} implementation that only accepts append-only records. */
+public class AppendOnlyWriter implements RecordWriter {
+    private final BulkWriter.Factory<RowData> writerFactory;
+    private final RowType writeSchema;
+    private final long targetFileSize;
+    private final List<DataFileMeta> existingFiles;
+    private final DataFilePathFactory pathFactory;
+    private final FileStatsExtractor fileStatsExtractor;
+
+    private final AtomicLong nextSeqNum;
+
+    private RowRollingWriter writer;
+
+    public AppendOnlyWriter(
+            FileFormat fileFormat,
+            long targetFileSize,
+            RowType writeSchema,
+            List<DataFileMeta> existingFiles,
+            DataFilePathFactory pathFactory,
+            FileStatsExtractor fileStatsExtractor) {
+
+        this.writerFactory = fileFormat.createWriterFactory(writeSchema);
+        this.writeSchema = writeSchema;
+        this.targetFileSize = targetFileSize;
+
+        this.existingFiles = existingFiles;
+        this.pathFactory = pathFactory;
+        this.fileStatsExtractor = fileStatsExtractor;
+
+        this.nextSeqNum = new AtomicLong(maxSequenceNumber() + 1);
+
+        this.writer = createRollingRowWriter();
+    }
+
+    private long maxSequenceNumber() {
+        return existingFiles.stream()
+                .map(DataFileMeta::maxSequenceNumber)
+                .max(Long::compare)
+                .orElse(-1L);
+    }
+
+    @Override
+    public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+        Preconditions.checkArgument(
+                valueKind == ValueKind.ADD,
+                "Append-only writer cannot accept ValueKind: " + valueKind);
+
+        writer.write(value);
+    }
+
+    @Override
+    public Increment prepareCommit() throws Exception {
+        List<DataFileMeta> newFiles = Lists.newArrayList();
+
+        if (writer != null) {
+            writer.close();
+
+            // Reopen the writer to accept further records.
+            newFiles.addAll(writer.result());
+            writer = createRollingRowWriter();
+        }
+
+        return new Increment(Lists.newArrayList(newFiles));
+    }
+
+    @Override
+    public void sync() throws Exception {
+        // TODO add the flush method for FileWriter.

Review Comment:
   This `TODO` can be removed now,  just addressed this in the sub-PR
   https://github.com/apache/flink-table-store/pull/115/files#diff-6051f27b47400cd176377579e93a35e31906ac53c44defa3d10cbafdd6b3af78R99-R102



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+/** A {@link RecordWriter} implementation that only accepts append-only records. */
+public class AppendOnlyWriter implements RecordWriter {
+    private final BulkWriter.Factory<RowData> writerFactory;
+    private final RowType writeSchema;
+    private final long targetFileSize;
+    private final List<DataFileMeta> existingFiles;
+    private final DataFilePathFactory pathFactory;
+    private final FileStatsExtractor fileStatsExtractor;
+
+    private final AtomicLong nextSeqNum;
+
+    private RowRollingWriter writer;
+
+    public AppendOnlyWriter(
+            FileFormat fileFormat,
+            long targetFileSize,
+            RowType writeSchema,
+            List<DataFileMeta> existingFiles,
+            DataFilePathFactory pathFactory,
+            FileStatsExtractor fileStatsExtractor) {
+
+        this.writerFactory = fileFormat.createWriterFactory(writeSchema);
+        this.writeSchema = writeSchema;
+        this.targetFileSize = targetFileSize;
+
+        this.existingFiles = existingFiles;
+        this.pathFactory = pathFactory;
+        this.fileStatsExtractor = fileStatsExtractor;
+
+        this.nextSeqNum = new AtomicLong(maxSequenceNumber() + 1);

Review Comment:
   Because we are accessing this `nextSeqNum` in an inner class https://github.com/apache/flink-table-store/pull/99/files#diff-6051f27b47400cd176377579e93a35e31906ac53c44defa3d10cbafdd6b3af78R173, which requires the variable to be defined as `final` reference.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java:
##########
@@ -36,6 +37,14 @@
 /** Metadata of a data file. */
 public class DataFileMeta {
 
+    // Append only data files don't have any value columns and meaningful level value. it will use

Review Comment:
   Yes, you are correct. I also found this issue.  Just addressed it in this sub-PR: https://github.com/apache/flink-table-store/pull/115/files#diff-267cc22799cbdc25038fd2a60cd6f292d5998d01b4b2dfe4334690bbc0deff0eR40-R41
   
   Thanks for the comment, @tsreaper ! 



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -83,6 +89,34 @@ public FileStoreRead withValueProjection(int[][] projectedFields) {
     @Override
     public RecordReader createReader(BinaryRowData partition, int bucket, List<DataFileMeta> files)
             throws IOException {
+        switch (writeMode) {
+            case APPEND_ONLY:
+                return createAppendOnlyReader(partition, bucket, files);
+
+            case CHANGE_LOG:
+                return createLSMReader(partition, bucket, files);
+
+            default:
+                throw new UnsupportedOperationException("Unknown write mode: " + writeMode);
+        }
+    }
+
+    private RecordReader createAppendOnlyReader(
+            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
+        Preconditions.checkArgument(
+                !dropDelete, "Cannot drop delete message for append-only table.");

Review Comment:
   It's true. This check can be removed now, as the `dropDelete` will always be false for the append-only table.  In fact,  in the latest version I'm thinking that the best abstraction is decoupling the append-only reader/writers with the LSM reader writer so that the append-only reader/writer won't see any internal variable from LSM reader/writers,  but I'm still struggling some details.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+/** A {@link RecordWriter} implementation that only accepts append-only records. */
+public class AppendOnlyWriter implements RecordWriter {
+    private final BulkWriter.Factory<RowData> writerFactory;
+    private final RowType writeSchema;
+    private final long targetFileSize;
+    private final List<DataFileMeta> existingFiles;
+    private final DataFilePathFactory pathFactory;
+    private final FileStatsExtractor fileStatsExtractor;
+
+    private final AtomicLong nextSeqNum;
+
+    private RowRollingWriter writer;
+
+    public AppendOnlyWriter(
+            FileFormat fileFormat,
+            long targetFileSize,
+            RowType writeSchema,
+            List<DataFileMeta> existingFiles,
+            DataFilePathFactory pathFactory,
+            FileStatsExtractor fileStatsExtractor) {
+
+        this.writerFactory = fileFormat.createWriterFactory(writeSchema);
+        this.writeSchema = writeSchema;
+        this.targetFileSize = targetFileSize;
+
+        this.existingFiles = existingFiles;
+        this.pathFactory = pathFactory;
+        this.fileStatsExtractor = fileStatsExtractor;
+
+        this.nextSeqNum = new AtomicLong(maxSequenceNumber() + 1);
+
+        this.writer = createRollingRowWriter();
+    }
+
+    private long maxSequenceNumber() {
+        return existingFiles.stream()
+                .map(DataFileMeta::maxSequenceNumber)
+                .max(Long::compare)
+                .orElse(-1L);
+    }
+
+    @Override
+    public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+        Preconditions.checkArgument(
+                valueKind == ValueKind.ADD,
+                "Append-only writer cannot accept ValueKind: " + valueKind);
+
+        writer.write(value);
+    }
+
+    @Override
+    public Increment prepareCommit() throws Exception {
+        List<DataFileMeta> newFiles = Lists.newArrayList();
+
+        if (writer != null) {
+            writer.close();
+
+            // Reopen the writer to accept further records.
+            newFiles.addAll(writer.result());
+            writer = createRollingRowWriter();
+        }
+
+        return new Increment(Lists.newArrayList(newFiles));
+    }
+
+    @Override
+    public void sync() throws Exception {
+        // TODO add the flush method for FileWriter.
+    }
+
+    @Override
+    public List<DataFileMeta> close() throws Exception {
+        List<DataFileMeta> result = Lists.newArrayList();
+        if (writer != null) {
+            writer.close();
+
+            result.addAll(writer.result());
+            writer = null;
+        }
+
+        return result;

Review Comment:
   Addressed this in the separate PR: https://github.com/apache/flink-table-store/pull/115/files#diff-6051f27b47400cd176377579e93a35e31906ac53c44defa3d10cbafdd6b3af78R111



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] openinx commented on pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
openinx commented on PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#issuecomment-1132426379

   Closing this PR now because we've merge most of the changes. Thanks all for the reviewing & checking.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#issuecomment-1106461636

   Can we set the append-only write file to an empty key? This allows for a good integration of these two modes.
   Actually, `SstFileMeta` can be renamed to `DataFileMeta`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#discussion_r867696446


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+/** A {@link RecordWriter} implementation that only accepts append-only records. */
+public class AppendOnlyWriter implements RecordWriter {
+    private final BulkWriter.Factory<RowData> writerFactory;
+    private final RowType writeSchema;
+    private final long targetFileSize;
+    private final List<DataFileMeta> existingFiles;
+    private final DataFilePathFactory pathFactory;
+    private final FileStatsExtractor fileStatsExtractor;
+
+    private final AtomicLong nextSeqNum;
+
+    private RowRollingWriter writer;
+
+    public AppendOnlyWriter(
+            FileFormat fileFormat,
+            long targetFileSize,
+            RowType writeSchema,
+            List<DataFileMeta> existingFiles,
+            DataFilePathFactory pathFactory,
+            FileStatsExtractor fileStatsExtractor) {
+
+        this.writerFactory = fileFormat.createWriterFactory(writeSchema);
+        this.writeSchema = writeSchema;
+        this.targetFileSize = targetFileSize;
+
+        this.existingFiles = existingFiles;
+        this.pathFactory = pathFactory;
+        this.fileStatsExtractor = fileStatsExtractor;
+
+        this.nextSeqNum = new AtomicLong(maxSequenceNumber() + 1);
+
+        this.writer = createRollingRowWriter();
+    }
+
+    private long maxSequenceNumber() {
+        return existingFiles.stream()
+                .map(DataFileMeta::maxSequenceNumber)
+                .max(Long::compare)
+                .orElse(-1L);
+    }
+
+    @Override
+    public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+        Preconditions.checkArgument(
+                valueKind == ValueKind.ADD,
+                "Append-only writer cannot accept ValueKind: " + valueKind);
+
+        writer.write(value);
+    }
+
+    @Override
+    public Increment prepareCommit() throws Exception {
+        List<DataFileMeta> newFiles = Lists.newArrayList();
+
+        if (writer != null) {
+            writer.close();
+
+            // Reopen the writer to accept further records.
+            newFiles.addAll(writer.result());
+            writer = createRollingRowWriter();
+        }
+
+        return new Increment(Lists.newArrayList(newFiles));
+    }
+
+    @Override
+    public void sync() throws Exception {
+        // TODO add the flush method for FileWriter.

Review Comment:
   Is this a must? Before each checkpoint and before the end of input `prepareCommit` will be called and in that method `writer.close()` is called. `writer.close()` will flush the writer.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+/** A {@link RecordWriter} implementation that only accepts append-only records. */
+public class AppendOnlyWriter implements RecordWriter {
+    private final BulkWriter.Factory<RowData> writerFactory;
+    private final RowType writeSchema;
+    private final long targetFileSize;
+    private final List<DataFileMeta> existingFiles;
+    private final DataFilePathFactory pathFactory;
+    private final FileStatsExtractor fileStatsExtractor;
+
+    private final AtomicLong nextSeqNum;
+
+    private RowRollingWriter writer;
+
+    public AppendOnlyWriter(
+            FileFormat fileFormat,
+            long targetFileSize,
+            RowType writeSchema,
+            List<DataFileMeta> existingFiles,
+            DataFilePathFactory pathFactory,
+            FileStatsExtractor fileStatsExtractor) {
+
+        this.writerFactory = fileFormat.createWriterFactory(writeSchema);
+        this.writeSchema = writeSchema;
+        this.targetFileSize = targetFileSize;
+
+        this.existingFiles = existingFiles;
+        this.pathFactory = pathFactory;
+        this.fileStatsExtractor = fileStatsExtractor;
+
+        this.nextSeqNum = new AtomicLong(maxSequenceNumber() + 1);
+
+        this.writer = createRollingRowWriter();
+    }
+
+    private long maxSequenceNumber() {
+        return existingFiles.stream()
+                .map(DataFileMeta::maxSequenceNumber)
+                .max(Long::compare)
+                .orElse(-1L);
+    }
+
+    @Override
+    public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+        Preconditions.checkArgument(
+                valueKind == ValueKind.ADD,
+                "Append-only writer cannot accept ValueKind: " + valueKind);
+
+        writer.write(value);
+    }
+
+    @Override
+    public Increment prepareCommit() throws Exception {
+        List<DataFileMeta> newFiles = Lists.newArrayList();
+
+        if (writer != null) {
+            writer.close();
+
+            // Reopen the writer to accept further records.
+            newFiles.addAll(writer.result());
+            writer = createRollingRowWriter();
+        }
+
+        return new Increment(Lists.newArrayList(newFiles));
+    }
+
+    @Override
+    public void sync() throws Exception {
+        // TODO add the flush method for FileWriter.
+    }
+
+    @Override
+    public List<DataFileMeta> close() throws Exception {
+        List<DataFileMeta> result = Lists.newArrayList();
+        if (writer != null) {
+            writer.close();
+
+            result.addAll(writer.result());
+            writer = null;
+        }
+
+        return result;

Review Comment:
   Everything in the `result` are not committed and thus should be deleted. The return value of this method is the list of deleted files. See `MergeTreeWriter#close`.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java:
##########
@@ -36,6 +37,14 @@
 /** Metadata of a data file. */
 public class DataFileMeta {
 
+    // Append only data files don't have any value columns and meaningful level value. it will use

Review Comment:
   `don't have any value columns` -> `don't have any key columns`?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java:
##########
@@ -34,30 +36,38 @@
 import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.writer.AppendOnlyWriter;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
 /** Default implementation of {@link FileStoreWrite}. */
 public class FileStoreWriteImpl implements FileStoreWrite {
 
     private final DataFileReader.Factory dataFileReaderFactory;
     private final DataFileWriter.Factory dataFileWriterFactory;
+    private final WriteMode writeMode;
+    private final FileFormat fileFormat;
+    private final RowType writeSchema;
     private final Supplier<Comparator<RowData>> keyComparatorSupplier;
     private final MergeFunction mergeFunction;
     private final FileStorePathFactory pathFactory;
     private final FileStoreScan scan;
     private final MergeTreeOptions options;
 
     public FileStoreWriteImpl(
+            WriteMode writeMode,
+            RowType writeSchema,
             RowType keyType,
             RowType valueType,

Review Comment:
   What is this `writeSchema` for? We already have a `valueType`.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+/** A {@link RecordWriter} implementation that only accepts append-only records. */
+public class AppendOnlyWriter implements RecordWriter {
+    private final BulkWriter.Factory<RowData> writerFactory;
+    private final RowType writeSchema;
+    private final long targetFileSize;
+    private final List<DataFileMeta> existingFiles;
+    private final DataFilePathFactory pathFactory;
+    private final FileStatsExtractor fileStatsExtractor;
+
+    private final AtomicLong nextSeqNum;
+
+    private RowRollingWriter writer;
+
+    public AppendOnlyWriter(
+            FileFormat fileFormat,
+            long targetFileSize,
+            RowType writeSchema,
+            List<DataFileMeta> existingFiles,
+            DataFilePathFactory pathFactory,
+            FileStatsExtractor fileStatsExtractor) {
+
+        this.writerFactory = fileFormat.createWriterFactory(writeSchema);
+        this.writeSchema = writeSchema;
+        this.targetFileSize = targetFileSize;
+
+        this.existingFiles = existingFiles;
+        this.pathFactory = pathFactory;
+        this.fileStatsExtractor = fileStatsExtractor;
+
+        this.nextSeqNum = new AtomicLong(maxSequenceNumber() + 1);

Review Comment:
   Why `AtomicLong`? Each writer instance is used by only one thread.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java:
##########
@@ -82,18 +95,41 @@ public FileStoreWriteImpl(
     public RecordWriter createWriter(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
         Long latestSnapshotId = pathFactory.latestSnapshotId();
-        if (latestSnapshotId == null) {
-            return createEmptyWriter(partition, bucket, compactExecutor);
-        } else {
-            return createMergeTreeWriter(
-                    partition,
-                    bucket,
-                    scan.withSnapshot(latestSnapshotId)
-                            .withPartitionFilter(Collections.singletonList(partition))
-                            .withBucket(bucket).plan().files().stream()
-                            .map(ManifestEntry::file)
-                            .collect(Collectors.toList()),
-                    compactExecutor);
+        List<DataFileMeta> existingFileMetas = Lists.newArrayList();
+        if (latestSnapshotId != null) {
+            // Concat all the DataFileMeta of existing files into existingFileMetas.
+            scan.withSnapshot(latestSnapshotId)
+                    .withPartitionFilter(Collections.singletonList(partition)).withBucket(bucket)
+                    .plan().files().stream()
+                    .map(ManifestEntry::file)
+                    .forEach(existingFileMetas::add);
+        }
+
+        switch (writeMode) {
+            case APPEND_ONLY:
+                DataFilePathFactory factory =
+                        pathFactory.createDataFilePathFactory(partition, bucket);
+                FileStatsExtractor fileStatsExtractor =
+                        fileFormat.createStatsExtractor(writeSchema).orElse(null);
+
+                return new AppendOnlyWriter(
+                        fileFormat,
+                        options.targetFileSize,
+                        writeSchema,
+                        existingFileMetas,
+                        factory,
+                        fileStatsExtractor);
+
+            case CHANGE_LOG:
+                if (latestSnapshotId == null) {
+                    return createEmptyWriter(partition, bucket, compactExecutor);
+                } else {
+                    return createMergeTreeWriter(
+                            partition, bucket, existingFileMetas, compactExecutor);
+                }

Review Comment:
   As we've calculated `existingFileMetas` we don't need a special `createEmptyWriter` method. Remove this method and this if branching.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -83,6 +89,34 @@ public FileStoreRead withValueProjection(int[][] projectedFields) {
     @Override
     public RecordReader createReader(BinaryRowData partition, int bucket, List<DataFileMeta> files)
             throws IOException {
+        switch (writeMode) {
+            case APPEND_ONLY:
+                return createAppendOnlyReader(partition, bucket, files);
+
+            case CHANGE_LOG:
+                return createLSMReader(partition, bucket, files);
+
+            default:
+                throw new UnsupportedOperationException("Unknown write mode: " + writeMode);
+        }
+    }
+
+    private RecordReader createAppendOnlyReader(
+            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
+        Preconditions.checkArgument(
+                !dropDelete, "Cannot drop delete message for append-only table.");

Review Comment:
   Does this check make sense? There is no delete messages in an append only table so I guess the value of `dropDelete` does not matter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] openinx commented on a diff in pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
openinx commented on code in PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#discussion_r865671595


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java:
##########
@@ -81,19 +94,43 @@ public FileStoreWriteImpl(
     @Override
     public RecordWriter createWriter(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+
         Long latestSnapshotId = pathFactory.latestSnapshotId();
-        if (latestSnapshotId == null) {
-            return createEmptyWriter(partition, bucket, compactExecutor);
-        } else {
-            return createMergeTreeWriter(
-                    partition,
-                    bucket,
-                    scan.withSnapshot(latestSnapshotId)
-                            .withPartitionFilter(Collections.singletonList(partition))
-                            .withBucket(bucket).plan().files().stream()
-                            .map(ManifestEntry::file)
-                            .collect(Collectors.toList()),
-                    compactExecutor);
+        List<DataFileMeta> existingFileMetas = Lists.newArrayList();
+        if (latestSnapshotId != null) {
+            // Concat all the DataFileMeta of existing files into existingFileMetas.
+            scan.withSnapshot(latestSnapshotId)
+                    .withPartitionFilter(Collections.singletonList(partition)).withBucket(bucket)
+                    .plan().files().stream()
+                    .map(ManifestEntry::file)
+                    .forEach(existingFileMetas::add);
+        }
+
+        switch (writeMode) {
+            case APPEND_ONLY:
+                SstPathFactory factory = pathFactory.createSstPathFactory(partition, bucket);

Review Comment:
   **TODO** :  This `SstPathFactory`  will also need to be refactored so that we don't expose the SST concept to the append-table table.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] openinx commented on a diff in pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
openinx commented on code in PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#discussion_r865679996


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/AppendOnlyWriter.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/** A {@link RecordWriter} implementation that only accepts append-only records. */
+public class AppendOnlyWriter implements RecordWriter {
+    private final BulkWriter.Factory<RowData> writerFactory;
+    private RollingFileWriter<RowData, List<Path>> writer;
+    private final long targetFileSize;
+    private final Comparator<RowData> keyComparator;
+    private final RowDataSerializer keySerializer;
+    private final List<DataFileMeta> existingFiles;
+    private final List<DataFileMeta> newFiles;
+    private final Supplier<Path> pathCreator;
+    private final FileStatsExtractor fileStatsExtractor;
+    private final FieldStatsCollector fieldStatsCollector;
+
+    private final AtomicLong startingSeqNum;
+    private final AtomicLong nextSeqNum;
+    private final AtomicInteger rowCount;
+
+    private final AtomicReference<BinaryRowData> minKeyRef = new AtomicReference<>(null);
+    private final AtomicReference<BinaryRowData> maxKeyRef = new AtomicReference<>(null);
+
+    public AppendOnlyWriter(
+            FileFormat fileFormat,
+            long targetFileSize,
+            RowType writeSchema,
+            Comparator<RowData> keyComparator,
+            List<DataFileMeta> existingFiles,
+            Supplier<Path> pathCreator,
+            FileStatsExtractor fileStatsExtractor) {
+
+        this.writerFactory = fileFormat.createWriterFactory(writeSchema);
+
+        this.writer = new BaseRollingFileWriter(writerFactory, targetFileSize, pathCreator);
+        this.writer.register(new CloseWriterListener());
+
+        this.targetFileSize = targetFileSize;
+
+        this.keyComparator = keyComparator;
+        this.keySerializer = new RowDataSerializer(writeSchema);
+
+        this.existingFiles = existingFiles;
+        this.newFiles = new ArrayList<>();
+        this.pathCreator = pathCreator;
+        this.fileStatsExtractor = fileStatsExtractor;
+        this.fieldStatsCollector =
+                fileStatsExtractor == null ? new FieldStatsCollector(writeSchema) : null;
+
+        this.startingSeqNum = new AtomicLong(0L);
+        this.nextSeqNum = new AtomicLong(maxSequenceNumber() + 1);
+        this.rowCount = new AtomicInteger(0);
+    }
+
+    private long maxSequenceNumber() {
+        return existingFiles.stream()
+                .map(DataFileMeta::maxSequenceNumber)
+                .max(Long::compare)
+                .orElse(-1L);
+    }
+
+    @Override
+    public void append(RowData row) throws Exception {
+        writer.write(row);
+        if (fieldStatsCollector != null) {
+            fieldStatsCollector.collect(row);
+        }
+        updateMinKey(row);
+        updateMaxKey(row);
+
+        rowCount.incrementAndGet();
+        nextSeqNum.incrementAndGet();
+    }
+
+    private void updateMinKey(RowData row) {
+        BinaryRowData minKey = minKeyRef.get();
+        if ((minKey == null) || (keyComparator.compare(minKey, row) > 0)) {
+            minKey = keySerializer.toBinaryRow(row);
+        }
+        minKeyRef.set(minKey);
+    }
+
+    private void updateMaxKey(RowData row) {
+        BinaryRowData maxKey = maxKeyRef.get();
+        if ((maxKey == null) || (keyComparator.compare(maxKey, row) < 0)) {
+            maxKey = keySerializer.toBinaryRow(row);
+        }
+        maxKeyRef.set(maxKey);

Review Comment:
   We can also skip the key value serialization for each encountered row...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] openinx commented on a diff in pull request #99: [FLINK-27307] Flink table store support append-only ingestion without primary keys.

Posted by GitBox <gi...@apache.org>.
openinx commented on code in PR #99:
URL: https://github.com/apache/flink-table-store/pull/99#discussion_r865755438


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java:
##########
@@ -52,10 +55,13 @@
         implements StatefulSinkWriter<RowData, WriterStateT>,
                 PrecommittingSinkWriter<RowData, Committable> {
 
+    private static final BinaryRowData DUMMY_KEY = new BinaryRowData(0);

Review Comment:
   It's recommended to use `BinaryRowDataUtil.EMPTY_ROW` to get ride of NullPointerException when accessing the `segments` inside the `BinaryRowData`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org