Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/25 08:08:24 UTC

[GitHub] [flink-table-store] tsreaper opened a new pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

tsreaper opened a new pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14


   This PR implements partition and bucket filtering in `FileStoreScanImpl`. It also performs some small optimizations, such as concurrent reads in `FileStoreScanImpl`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791549241



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       I see...
   ```
       public final ForkJoinTask<V> fork() {
           Thread t;
           if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
               ((ForkJoinWorkerThread)t).workQueue.push(this);
           else
               ForkJoinPool.common.externalPush(this);
           return this;
       }
   ```
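
The `fork()` source quoted above suggests why a parallel stream started from inside `pool.submit(...)` runs on that pool rather than on the common pool: the submitting task is already on a `ForkJoinWorkerThread`, so subtasks are pushed to that worker's own queue. A minimal sketch of the effect follows. The class and method names are illustrative, and the behavior is a JDK implementation detail rather than a documented guarantee of the Stream API.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Illustrative only: checks which pool executes a parallel stream. */
class ParallelStreamPoolDemo {

    /** Returns true if every element was processed by a worker thread of the given pool. */
    static boolean runsOnlyOn(ForkJoinPool pool) {
        List<Boolean> onPool =
                pool.submit(
                                () ->
                                        IntStream.range(0, 1000)
                                                .parallel()
                                                .mapToObj(i -> isWorkerOf(pool))
                                                .collect(Collectors.toList()))
                        // join() waits for the result without checked exceptions
                        .join();
        return onPool.stream().allMatch(b -> b);
    }

    /** True if the calling thread is a ForkJoinWorkerThread owned by the given pool. */
    private static boolean isWorkerOf(ForkJoinPool pool) {
        Thread t = Thread.currentThread();
        return t instanceof ForkJoinWorkerThread
                && ((ForkJoinWorkerThread) t).getPool() == pool;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            System.out.println("all on custom pool: " + runsOnlyOn(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```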







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791597053



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       We can create a static pool, like the JVM does:
   ```
       /**
        * Common (static) pool. Non-null for public use unless a static
        * construction exception, but internal usages null-check on use
        * to paranoically avoid potential initialization circularities
        * as well as to simplify generated code.
        */
       static final ForkJoinPool common;
   ```







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791598277



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       Also, users could configure the pool size through `System.getProperty`, just as `ForkJoinPool.makeCommonPool` does.
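
One way to combine a static pool with a size that users can override is sketched below. The class name `SharedIoPool` and the property name `table.store.io.parallelism` are hypothetical, made up for illustration; they are not names from the PR. `Integer.getInteger` reads a system property and falls back to the given default when the property is unset or not a valid integer.

```java
import java.util.concurrent.ForkJoinPool;

/** Illustrative holder for a process-wide pool; all names here are hypothetical. */
final class SharedIoPool {

    /** Hypothetical system property that overrides the pool size. */
    static final String PARALLELISM_PROPERTY = "table.store.io.parallelism";

    /** Created once per JVM when this class is initialized. */
    static final ForkJoinPool POOL = new ForkJoinPool(resolveParallelism());

    private SharedIoPool() {}

    /** Reads the property, falling back to the number of available processors. */
    static int resolveParallelism() {
        int fallback = Runtime.getRuntime().availableProcessors();
        int configured = Integer.getInteger(PARALLELISM_PROPERTY, fallback);
        // Zero or negative values are rejected by ForkJoinPool, so use the fallback instead.
        return configured > 0 ? configured : fallback;
    }
}
```

A caller would then submit work with something like `SharedIoPool.POOL.submit(() -> ...)`, and a user could start the JVM with `-Dtable.store.io.parallelism=8` to change the size. A later revision of the diff in this thread uses `FileUtils.COMMON_IO_FORK_JOIN_POOL` in the same role.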







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791468220



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -18,62 +18,129 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.Equal;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Or;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /** Default implementation of {@link FileStoreScan}. */
 public class FileStoreScanImpl implements FileStoreScan {
 
+    private final RowType partitionType;
+    private final RowType keyType;
+    private final RowType rowType;
+    private final FileFormat fileFormat;
     private final FileStorePathFactory pathFactory;
-    private final ManifestFile manifestFile;
+
+    private final List<RowData.FieldGetter> partitionFieldGetters;
     private final ManifestList manifestList;
 
     private Long snapshotId;
     private List<ManifestFileMeta> manifests;
+    private Predicate partitionFilter;
+    private Predicate keyFilter;
+    private Predicate valueFilter;
+    private Integer bucket;
 
     public FileStoreScanImpl(
-            FileStorePathFactory pathFactory,
-            ManifestFile manifestFile,
-            ManifestList manifestList) {
+            RowType partitionType,
+            RowType keyType,
+            RowType rowType,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory) {
+        this.partitionType = partitionType;
+        this.keyType = keyType;
+        this.rowType = rowType;
+        this.fileFormat = fileFormat;
         this.pathFactory = pathFactory;
-        this.manifestFile = manifestFile;
-        this.manifestList = manifestList;
+
+        this.partitionFieldGetters =

Review comment:
       Can we extract `RowToArrayConverter`?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
##########
@@ -31,6 +32,8 @@
 
     FileStoreScan withPartitionFilter(Predicate predicate);
 
+    FileStoreScan withPartitionFilter(List<BinaryRowData> partitions);

Review comment:
       `inPartitions`?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -106,10 +115,12 @@ public SstPathFactory createSstPathFactory(BinaryRowData partition, int bucket)
     }
 
     public String getPartitionString(BinaryRowData partition) {
-        return PartitionPathUtils.generatePartitionPath(
-                partitionComputer.generatePartValues(
-                        Preconditions.checkNotNull(
-                                partition, "Partition row data is null. This is unexpected.")));
+        synchronized (partitionComputer) {

Review comment:
       Why do we need to add `synchronized` here?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       Why use `ForkJoinPool`?

##########
File path: flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
##########
@@ -71,7 +67,7 @@ public void beforeEach() throws IOException {
         root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
     }
 
-    protected abstract String getSchema();
+    protected abstract String getScheme();

Review comment:
       `getSchema`?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()
+                            .submit(
+                                    () ->
+                                            manifests
+                                                    .parallelStream()
+                                                    .filter(this::filterManifestFileMeta)
+                                                    .flatMap(m -> readManifestFileMeta(m).stream())

Review comment:
       Apply `filterManifestEntry` inside `readManifestFileMeta(m).stream()`?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()
+                            .submit(
+                                    () ->
+                                            manifests
+                                                    .parallelStream()
+                                                    .filter(this::filterManifestFileMeta)
+                                                    .flatMap(m -> readManifestFileMeta(m).stream())
+                                                    .filter(this::filterManifestEntry)
+                                                    .collect(Collectors.toList()))
+                            .get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Failed to read ManifestEntry list concurrently", e);
+        }
+
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
+        for (ManifestEntry entry : entries) {
+            ManifestEntry.Identifier identifier = entry.identifier();
+            switch (entry.kind()) {
+                case ADD:
+                    Preconditions.checkState(
+                            !map.containsKey(identifier),
+                            "Trying to add file %s which is already added. "
+                                    + "Manifest might be corrupted.",
+                            identifier);
+                    map.put(identifier, entry);
+                    break;
+                case DELETE:
+                    Preconditions.checkState(
+                            map.containsKey(identifier),
+                            "Trying to delete file %s which is not previously added. "
+                                    + "Manifest might be corrupted.",
+                            identifier);
+                    map.remove(identifier);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown value kind " + entry.kind().name());
             }
         }
         return new ArrayList<>(map.values());
     }
+
+    private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
+        return partitionFilter == null
+                || partitionFilter.test(
+                        manifest.numAddedFiles() + manifest.numDeletedFiles(),
+                        manifest.partitionStats());
+    }
+
+    private boolean filterManifestEntry(ManifestEntry entry) {
+        // TODO apply key & value filter after field stats are collected in
+        //  SstFile.RollingFile#finish
+        return (partitionFilter == null
+                        || partitionFilter.test(
+                                partitionFieldGetters.stream()
+                                        .map(g -> g.getFieldOrNull(entry.partition()))
+                                        .toArray()))
+                && (bucket == null || entry.bucket() == bucket);
+    }
+
+    private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
+        // ManifestFile is not thread safe
+        return new ManifestFile(partitionType, keyType, rowType, fileFormat, pathFactory)

Review comment:
       Create a thread local to reuse the `ManifestFile` object?
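
A minimal sketch of the thread-local idea, assuming a stand-in `ExpensiveReader` class in place of `ManifestFile` (the stand-in and all other names are hypothetical). Each worker thread lazily creates one instance and reuses it for every file it reads, so the non-thread-safe object is never shared across threads.

```java
/** Illustrative only: ExpensiveReader stands in for a costly, non-thread-safe object. */
class ThreadLocalReuseDemo {

    /** Hypothetical stand-in for an object that is expensive to build. */
    static final class ExpensiveReader {
        String read(String fileName) {
            return "entries of " + fileName;
        }
    }

    /** One reader per thread, created on first use by that thread. */
    private static final ThreadLocal<ExpensiveReader> READER =
            ThreadLocal.withInitial(ExpensiveReader::new);

    /** Returns the calling thread's own reader instance. */
    static ExpensiveReader reader() {
        return READER.get();
    }

    public static void main(String[] args) {
        // Two lookups on the same thread return the same instance.
        System.out.println("same instance: " + (reader() == reader()));
        System.out.println(reader().read("manifest-0"));
    }
}
```

The trade-off is that instances held by long-lived pool threads stay alive as long as those threads do, which is worth keeping in mind when the pool is static.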







[GitHub] [flink-table-store] JingsongLi commented on pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#issuecomment-1022865817


   We need to think about how to reduce the cost of `FileFormat`.





[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r793263919



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +172,77 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    FileUtils.COMMON_IO_FORK_JOIN_POOL
+                            .submit(
+                                    () ->
+                                            manifests
+                                                    .parallelStream()
+                                                    .filter(this::filterManifestFileMeta)
+                                                    .flatMap(m -> readManifestFileMeta(m).stream())
+                                                    .filter(this::filterManifestEntry)
+                                                    .collect(Collectors.toList()))
+                            .get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Failed to read ManifestEntry list concurrently", e);
+        }
+
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
+        for (ManifestEntry entry : entries) {
+            ManifestEntry.Identifier identifier = entry.identifier();
+            switch (entry.kind()) {
+                case ADD:
+                    Preconditions.checkState(
+                            !map.containsKey(identifier),
+                            "Trying to add file %s which is already added. "
+                                    + "Manifest might be corrupted.",
+                            identifier);
+                    map.put(identifier, entry);
+                    break;
+                case DELETE:
+                    Preconditions.checkState(
+                            map.containsKey(identifier),
+                            "Trying to delete file %s which is not previously added. "
+                                    + "Manifest might be corrupted.",
+                            identifier);
+                    map.remove(identifier);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown value kind " + entry.kind().name());
             }
         }
         return new ArrayList<>(map.values());
     }
+
+    private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
+        return partitionFilter == null
+                || partitionFilter.test(
+                        manifest.numAddedFiles() + manifest.numDeletedFiles(),
+                        manifest.partitionStats());
+    }
+
+    private boolean filterManifestEntry(ManifestEntry entry) {
+        // TODO apply key & value filter after field stats are collected in
+        //  SstFile.RollingFile#finish
+        return (partitionFilter == null
+                        || partitionFilter.test(partitionConverter.convert(entry.partition())))
+                && (bucket == null || entry.bucket() == bucket);
+    }
+
+    private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
+        return manifestFile().read(manifest.fileName());
+    }
+
+    private ManifestFile manifestFile() {
+        ManifestFile manifestFile = manifestFileThreadLocal.get();
+        if (manifestFile == null) {
+            manifestFile =
+                    new ManifestFile(partitionType, keyType, rowType, fileFormat, pathFactory);
+            manifestFileThreadLocal.set(manifestFile);

Review comment:
       We won't need this thread local once we have reduced the cost of `FileFormat`.







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791603751



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()
+                            .submit(
+                                    () ->
+                                            manifests
+                                                    .parallelStream()
+                                                    .filter(this::filterManifestFileMeta)
+                                                    .flatMap(m -> readManifestFileMeta(m).stream())

Review comment:
       They are the same. Java's stream API is lazy, much like Flink's job graph: intermediate operations only build up a pipeline, and nothing executes until a terminal operation such as `collect` is called.
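
To illustrate the laziness described in this comment, here is a minimal, self-contained sketch. It is not code from the PR; the class name `LazyStreamDemo` and the recorded event strings are made up for the example. It records when each lambda actually runs relative to the point where the pipeline has been assembled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyStreamDemo {

    /** Returns the events recorded while building and then running a pipeline. */
    public static List<String> run() {
        List<String> events = new ArrayList<>();

        // filter and map are intermediate operations: they only describe the
        // pipeline, so neither lambda is invoked while this statement runs.
        Stream<Integer> pipeline =
                Arrays.asList(1, 2, 3, 4).stream()
                        .filter(i -> {
                            events.add("filter " + i);
                            return i % 2 == 0;
                        })
                        .map(i -> {
                            events.add("map " + i);
                            return i * 10;
                        });
        events.add("pipeline built");

        // collect is a terminal operation: the lambdas above run only now.
        List<Integer> result = pipeline.collect(Collectors.toList());
        events.add("result " + result);
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The first recorded event is "pipeline built", and the filter and map events only appear after it, once `collect` has been called.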







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791590842



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -106,10 +115,12 @@ public SstPathFactory createSstPathFactory(BinaryRowData partition, int bucket)
     }
 
     public String getPartitionString(BinaryRowData partition) {
-        return PartitionPathUtils.generatePartitionPath(
-                partitionComputer.generatePartValues(
-                        Preconditions.checkNotNull(
-                                partition, "Partition row data is null. This is unexpected.")));
+        synchronized (partitionComputer) {

Review comment:
       After discussion, we decided to mark some methods as thread-safe and the others as not thread-safe.







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r793200709



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Default implementation of {@link FileStoreWrite}. */
+public class FileStoreWriteImpl implements FileStoreWrite {
+
+    private final RowType keyType;
+    private final RowType rowType;
+    private final Comparator<RowData> keyComparator;
+    private final Accumulator accumulator;
+    private final FileFormat fileFormat;
+    private final FileStorePathFactory pathFactory;
+    private final MergeTreeOptions mergeTreeOptions;
+
+    private final FileStoreScan scan;
+
+    public FileStoreWriteImpl(
+            RowType partitionType,
+            RowType keyType,
+            RowType rowType,
+            Comparator<RowData> keyComparator,
+            Accumulator accumulator,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory,
+            MergeTreeOptions mergeTreeOptions) {
+        this.keyType = keyType;
+        this.rowType = rowType;
+        this.keyComparator = keyComparator;
+        this.accumulator = accumulator;
+        this.fileFormat = fileFormat;
+        this.pathFactory = pathFactory;
+        this.mergeTreeOptions = mergeTreeOptions;
+
+        this.scan = new FileStoreScanImpl(partitionType, keyType, rowType, fileFormat, pathFactory);
+    }
+
+    @Override
+    public RecordWriter createWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return createEmptyWriter(partition, bucket, compactExecutor);
+        } else {
+            MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+            return mergeTree.createWriter(
+                    scan.withSnapshot(latestSnapshotId)
+                            .withPartitionFilter(Collections.singletonList(partition))
+                            .withBucket(bucket).plan().files().stream()
+                            .map(ManifestEntry::file)
+                            .collect(Collectors.toList()));
+        }
+    }
+
+    @Override
+    public RecordWriter createEmptyWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+        return mergeTree.createWriter(Collections.emptyList());
+    }
+
+    private MergeTree createMergeTree(

Review comment:
       Could you elaborate more on what exactly should be considered? Writers will be created when a new partition or bucket is written. As each bucket is itself a `MergeTree` we'll need to create one anyway.







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r792394148



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Default implementation of {@link FileStoreWrite}. */
+public class FileStoreWriteImpl implements FileStoreWrite {
+
+    private final RowType keyType;
+    private final RowType rowType;
+    private final Comparator<RowData> keyComparator;
+    private final Accumulator accumulator;
+    private final FileFormat fileFormat;
+    private final FileStorePathFactory pathFactory;
+    private final MergeTreeOptions mergeTreeOptions;
+
+    private final FileStoreScan scan;
+
+    public FileStoreWriteImpl(
+            RowType partitionType,
+            RowType keyType,
+            RowType rowType,
+            Comparator<RowData> keyComparator,
+            Accumulator accumulator,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory,
+            MergeTreeOptions mergeTreeOptions) {
+        this.keyType = keyType;
+        this.rowType = rowType;
+        this.keyComparator = keyComparator;
+        this.accumulator = accumulator;
+        this.fileFormat = fileFormat;
+        this.pathFactory = pathFactory;
+        this.mergeTreeOptions = mergeTreeOptions;
+
+        this.scan = new FileStoreScanImpl(partitionType, keyType, rowType, fileFormat, pathFactory);
+    }
+
+    @Override
+    public RecordWriter createWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return createEmptyWriter(partition, bucket, compactExecutor);
+        } else {
+            MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+            return mergeTree.createWriter(
+                    scan.withSnapshot(latestSnapshotId)
+                            .withPartitionFilter(Collections.singletonList(partition))
+                            .withBucket(bucket).plan().files().stream()
+                            .map(ManifestEntry::file)
+                            .collect(Collectors.toList()));
+        }
+    }
+
+    @Override
+    public RecordWriter createEmptyWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+        return mergeTree.createWriter(Collections.emptyList());
+    }
+
+    private MergeTree createMergeTree(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        SstFile sstFile =
+                new SstFile(

Review comment:
       Can `readerFactory` and `writerFactory` be reused in `SstFile`?







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791536943



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       `parallelStream` runs on the common pool by default; if we want to run it in a pool of our own, we need a `ForkJoinPool`.
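
As a rough sketch of the point about `parallelStream` and `ForkJoinPool` (not code from the PR; the class and method names below are hypothetical), the same pipeline can run either on the JDK's common pool or from inside a dedicated pool. Note that running a parallel stream from a task submitted to a custom `ForkJoinPool` relies on how the JDK schedules fork/join tasks; it is widely relied upon, but as far as I know it is not spelled out as a guarantee in the Stream API documentation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;

public class ParallelStreamPools {

    /** Uses the common pool: no explicit ForkJoinPool is created. */
    public static int sumOnCommonPool(List<Integer> values) {
        return values.parallelStream().mapToInt(Integer::intValue).sum();
    }

    /** Runs the same pipeline from inside a dedicated pool of the given parallelism. */
    public static int sumOnDedicatedPool(List<Integer> values, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Callable<Integer> task =
                    () -> values.parallelStream().mapToInt(Integer::intValue).sum();
            // join() waits for the result without declaring checked exceptions.
            return pool.submit(task).join();
        } finally {
            // A pool created here must also be shut down here.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4);
        System.out.println(sumOnCommonPool(values));
        System.out.println(sumOnDedicatedPool(values, 2));
    }
}
```

A dedicated pool is mainly useful for bounding parallelism or keeping blocking I/O away from the common pool, which other code in the same JVM may share.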







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791536162



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -106,10 +115,12 @@ public SstPathFactory createSstPathFactory(BinaryRowData partition, int bucket)
     }
 
     public String getPartitionString(BinaryRowData partition) {
-        return PartitionPathUtils.generatePartitionPath(
-                partitionComputer.generatePartValues(
-                        Preconditions.checkNotNull(
-                                partition, "Partition row data is null. This is unexpected.")));
+        synchronized (partitionComputer) {

Review comment:
       Because we have to make sure that this class is thread-safe, and the partition computer is not thread-safe.
   
   Another option is to create a read-only path factory which only supports a limited number of methods.
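
The locking approach mentioned in this comment can be sketched roughly as follows. This is an illustration only: `UnsafeFormatter` is a hypothetical stand-in for a helper that reuses internal state across calls, and is not the real partition computer.

```java
import java.util.stream.IntStream;

public class SynchronizedHelperDemo {

    /** Hypothetical non-thread-safe helper: it reuses one buffer across calls. */
    static class UnsafeFormatter {
        private final StringBuilder buffer = new StringBuilder();

        String format(String key, int value) {
            buffer.setLength(0);
            buffer.append(key).append('=').append(value);
            return buffer.toString();
        }
    }

    private final UnsafeFormatter formatter = new UnsafeFormatter();

    /** Thread-safe wrapper: only one thread at a time may touch the helper. */
    public String format(String key, int value) {
        synchronized (formatter) {
            return formatter.format(key, value);
        }
    }

    /** Calls the wrapper from many threads and checks that every result is intact. */
    public static boolean demo() {
        SynchronizedHelperDemo demo = new SynchronizedHelperDemo();
        return IntStream.range(0, 10_000)
                .parallel()
                .allMatch(i -> demo.format("dt", i).equals("dt=" + i));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off is that callers are serialized on the lock, which is one reason a read-only factory exposing only methods that need no shared mutable state can be attractive.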







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r793265462



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Default implementation of {@link FileStoreWrite}. */
+public class FileStoreWriteImpl implements FileStoreWrite {
+
+    private final RowType keyType;
+    private final RowType rowType;
+    private final Comparator<RowData> keyComparator;
+    private final Accumulator accumulator;
+    private final FileFormat fileFormat;
+    private final FileStorePathFactory pathFactory;
+    private final MergeTreeOptions mergeTreeOptions;
+
+    private final FileStoreScan scan;
+
+    public FileStoreWriteImpl(
+            RowType partitionType,
+            RowType keyType,
+            RowType rowType,
+            Comparator<RowData> keyComparator,
+            Accumulator accumulator,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory,
+            MergeTreeOptions mergeTreeOptions) {
+        this.keyType = keyType;
+        this.rowType = rowType;
+        this.keyComparator = keyComparator;
+        this.accumulator = accumulator;
+        this.fileFormat = fileFormat;
+        this.pathFactory = pathFactory;
+        this.mergeTreeOptions = mergeTreeOptions;
+
+        this.scan = new FileStoreScanImpl(partitionType, keyType, rowType, fileFormat, pathFactory);
+    }
+
+    @Override
+    public RecordWriter createWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return createEmptyWriter(partition, bucket, compactExecutor);
+        } else {
+            MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+            return mergeTree.createWriter(
+                    scan.withSnapshot(latestSnapshotId)
+                            .withPartitionFilter(Collections.singletonList(partition))
+                            .withBucket(bucket).plan().files().stream()
+                            .map(ManifestEntry::file)
+                            .collect(Collectors.toList()));
+        }
+    }
+
+    @Override
+    public RecordWriter createEmptyWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+        return mergeTree.createWriter(Collections.emptyList());
+    }
+
+    private MergeTree createMergeTree(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        SstFile sstFile =
+                new SstFile(
+                        keyType,
+                        rowType,
+                        fileFormat,
+                        pathFactory.createSstPathFactory(partition, bucket),

Review comment:
       > Can uuid be reused in SstPathFactory from FileStorePathFactory?
    
   Any update?







[GitHub] [flink-table-store] JingsongLi merged pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14


   





[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791551477



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       Is a `shutdown()` call missing here?
   I think we can pass this pool in from outside.
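
One way to read this suggestion, sketched under assumptions (the class `SharedPoolScan` and its `scan` method are hypothetical and only mimic the shape of the real scan), is that the caller creates the pool once, hands it to the scan, and is the one responsible for shutting it down.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class SharedPoolScan {

    // Owned by the caller; this class never shuts it down.
    private final ForkJoinPool pool;

    public SharedPoolScan(ForkJoinPool pool) {
        this.pool = pool;
    }

    /** Hypothetical stand-in for reading manifests: upper-cases each name in parallel. */
    public List<String> scan(List<String> names) {
        Callable<List<String>> task =
                () -> names.parallelStream()
                        .map(String::toUpperCase)
                        .collect(Collectors.toList());
        return pool.submit(task).join();
    }

    /** The owner creates the pool, reuses it for several scans, then shuts it down. */
    public static List<String> demo() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            SharedPoolScan scan = new SharedPoolScan(pool);
            scan.scan(Arrays.asList("x", "y")); // first use of the shared pool
            return scan.scan(Arrays.asList("a", "b", "c")); // pool is reused
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Compared with creating a `new ForkJoinPool()` on every scan, this avoids starting new worker threads on each call and leaves nothing behind that was never shut down.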







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r792393666



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Default implementation of {@link FileStoreWrite}. */
+public class FileStoreWriteImpl implements FileStoreWrite {
+
+    private final RowType keyType;
+    private final RowType rowType;
+    private final Comparator<RowData> keyComparator;
+    private final Accumulator accumulator;
+    private final FileFormat fileFormat;
+    private final FileStorePathFactory pathFactory;
+    private final MergeTreeOptions mergeTreeOptions;
+
+    private final FileStoreScan scan;
+
+    public FileStoreWriteImpl(
+            RowType partitionType,
+            RowType keyType,
+            RowType rowType,
+            Comparator<RowData> keyComparator,
+            Accumulator accumulator,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory,
+            MergeTreeOptions mergeTreeOptions) {
+        this.keyType = keyType;
+        this.rowType = rowType;
+        this.keyComparator = keyComparator;
+        this.accumulator = accumulator;
+        this.fileFormat = fileFormat;
+        this.pathFactory = pathFactory;
+        this.mergeTreeOptions = mergeTreeOptions;
+
+        this.scan = new FileStoreScanImpl(partitionType, keyType, rowType, fileFormat, pathFactory);
+    }
+
+    @Override
+    public RecordWriter createWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return createEmptyWriter(partition, bucket, compactExecutor);
+        } else {
+            MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+            return mergeTree.createWriter(
+                    scan.withSnapshot(latestSnapshotId)
+                            .withPartitionFilter(Collections.singletonList(partition))
+                            .withBucket(bucket).plan().files().stream()
+                            .map(ManifestEntry::file)
+                            .collect(Collectors.toList()));
+        }
+    }
+
+    @Override
+    public RecordWriter createEmptyWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+        return mergeTree.createWriter(Collections.emptyList());
+    }
+
+    private MergeTree createMergeTree(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        SstFile sstFile =
+                new SstFile(
+                        keyType,
+                        rowType,
+                        fileFormat,
+                        pathFactory.createSstPathFactory(partition, bucket),

Review comment:
       Can uuid be reused in `SstPathFactory` from `FileStorePathFactory`?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Default implementation of {@link FileStoreWrite}. */
+public class FileStoreWriteImpl implements FileStoreWrite {
+
+    private final RowType keyType;
+    private final RowType rowType;
+    private final Comparator<RowData> keyComparator;
+    private final Accumulator accumulator;
+    private final FileFormat fileFormat;
+    private final FileStorePathFactory pathFactory;
+    private final MergeTreeOptions mergeTreeOptions;
+
+    private final FileStoreScan scan;
+
+    public FileStoreWriteImpl(
+            RowType partitionType,
+            RowType keyType,
+            RowType rowType,
+            Comparator<RowData> keyComparator,
+            Accumulator accumulator,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory,
+            MergeTreeOptions mergeTreeOptions) {
+        this.keyType = keyType;
+        this.rowType = rowType;
+        this.keyComparator = keyComparator;
+        this.accumulator = accumulator;
+        this.fileFormat = fileFormat;
+        this.pathFactory = pathFactory;
+        this.mergeTreeOptions = mergeTreeOptions;
+
+        this.scan = new FileStoreScanImpl(partitionType, keyType, rowType, fileFormat, pathFactory);
+    }
+
+    @Override
+    public RecordWriter createWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return createEmptyWriter(partition, bucket, compactExecutor);
+        } else {
+            MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+            return mergeTree.createWriter(
+                    scan.withSnapshot(latestSnapshotId)
+                            .withPartitionFilter(Collections.singletonList(partition))
+                            .withBucket(bucket).plan().files().stream()
+                            .map(ManifestEntry::file)
+                            .collect(Collectors.toList()));
+        }
+    }
+
+    @Override
+    public RecordWriter createEmptyWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+        return mergeTree.createWriter(Collections.emptyList());
+    }
+
+    private MergeTree createMergeTree(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        SstFile sstFile =
+                new SstFile(

Review comment:
       Can `readerFactory` and `writerFactory` be reused?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Default implementation of {@link FileStoreWrite}. */
+public class FileStoreWriteImpl implements FileStoreWrite {
+
+    private final RowType keyType;
+    private final RowType rowType;
+    private final Comparator<RowData> keyComparator;
+    private final Accumulator accumulator;
+    private final FileFormat fileFormat;
+    private final FileStorePathFactory pathFactory;
+    private final MergeTreeOptions mergeTreeOptions;
+
+    private final FileStoreScan scan;
+
+    public FileStoreWriteImpl(
+            RowType partitionType,
+            RowType keyType,
+            RowType rowType,
+            Comparator<RowData> keyComparator,
+            Accumulator accumulator,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory,
+            MergeTreeOptions mergeTreeOptions) {
+        this.keyType = keyType;
+        this.rowType = rowType;
+        this.keyComparator = keyComparator;
+        this.accumulator = accumulator;
+        this.fileFormat = fileFormat;
+        this.pathFactory = pathFactory;
+        this.mergeTreeOptions = mergeTreeOptions;
+
+        this.scan = new FileStoreScanImpl(partitionType, keyType, rowType, fileFormat, pathFactory);
+    }
+
+    @Override
+    public RecordWriter createWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return createEmptyWriter(partition, bucket, compactExecutor);
+        } else {
+            MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+            return mergeTree.createWriter(
+                    scan.withSnapshot(latestSnapshotId)
+                            .withPartitionFilter(Collections.singletonList(partition))
+                            .withBucket(bucket).plan().files().stream()
+                            .map(ManifestEntry::file)
+                            .collect(Collectors.toList()));
+        }
+    }
+
+    @Override
+    public RecordWriter createEmptyWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+        return mergeTree.createWriter(Collections.emptyList());
+    }
+
+    private MergeTree createMergeTree(

Review comment:
       We should consider the cost of creating a `MergeTree`.







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791536943



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       If we want to use `parallelStream` without touching the shared pool, we need our own `ForkJoinPool`. If we don't specify one, the stream runs on the common pool, which is shared across the whole JVM. As we're doing IO here, it is very likely to block other tasks which use the common pool.
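
       A minimal sketch of the pattern discussed here, under stated assumptions: the parallel stream is started from inside a task submitted to a dedicated `ForkJoinPool`, so its subtasks are forked into that pool instead of the JVM-wide common pool. The Stream API does not document this itself; it follows from `ForkJoinTask.fork()` using the pool of the current worker thread. `DedicatedPoolSketch`, `readAll` and `readManifest` are hypothetical names, and `readManifest` only stands in for the real manifest IO.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

class DedicatedPoolSketch {

    // Hypothetical stand-in for reading one manifest file; real code would do IO here.
    static List<String> readManifest(String name) {
        return Arrays.asList(name + "-entry-0", name + "-entry-1");
    }

    static List<String> readAll(List<String> manifests, int parallelism)
            throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // The terminal operation runs on a worker of `pool`, so the stream's
            // subtasks are forked into `pool` rather than into the common pool.
            return pool.submit(
                            () ->
                                    manifests.parallelStream()
                                            .flatMap(m -> readManifest(m).stream())
                                            .collect(Collectors.toList()))
                    .get();
        } finally {
            // A pool created per call must be shut down, otherwise its threads linger.
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readAll(Arrays.asList("manifest-a", "manifest-b"), 2));
    }
}
```

       Creating a pool per call keeps the sketch short; a single long-lived pool would avoid paying thread start-up cost on every scan.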







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791589339



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
##########
@@ -31,6 +32,8 @@
 
     FileStoreScan withPartitionFilter(Predicate predicate);
 
+    FileStoreScan withPartitionFilter(List<BinaryRowData> partitions);

Review comment:
       No need. If I want to filter out partitions I would change the method name.







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r793271957



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
##########
@@ -51,6 +52,8 @@
         DEFAULT_READER_CONFIG.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
     }
 
+    public static final ForkJoinPool COMMON_IO_FORK_JOIN_POOL = new ForkJoinPool();

Review comment:
       A good habit is to name the threads.
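
       One way this could look, as a sketch only: a `ForkJoinPool` built with a custom `ForkJoinWorkerThreadFactory` that delegates to the default factory and then renames each worker. The class name, the helper methods and the thread-name prefix passed in are all hypothetical and not taken from the PR.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

class NamedPoolSketch {

    static ForkJoinPool createNamedPool(String prefix, int parallelism) {
        AtomicInteger counter = new AtomicInteger();
        ForkJoinPool.ForkJoinWorkerThreadFactory factory =
                pool -> {
                    // Let the default factory build the worker, then give it a readable name.
                    ForkJoinWorkerThread worker =
                            ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
                    worker.setName(prefix + "-" + counter.getAndIncrement());
                    return worker;
                };
        // handler = null keeps default uncaught-exception handling; asyncMode = false.
        return new ForkJoinPool(parallelism, factory, null, false);
    }

    // Returns the name of whichever worker thread executes the task.
    static String workerName(ForkJoinPool pool) throws Exception {
        return pool.submit(() -> Thread.currentThread().getName()).get();
    }
}
```

       With named workers, thread dumps and log lines show at a glance which threads belong to the IO pool.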







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791535456



##########
File path: flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
##########
@@ -71,7 +67,7 @@ public void beforeEach() throws IOException {
         root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
     }
 
-    protected abstract String getSchema();
+    protected abstract String getScheme();

Review comment:
       Scheme. This is the file system scheme, not the table schema.
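
       For readers unfamiliar with the term, a small illustration using `java.net.URI`: the scheme is the part of a path URI before the colon. The class name and the example URIs are made up for this sketch.

```java
import java.net.URI;

class SchemeSketch {

    // Returns the URI scheme, for example "file" or "hdfs".
    static String schemeOf(String uri) {
        return URI.create(uri).getScheme();
    }

    public static void main(String[] args) {
        System.out.println(schemeOf("file:///tmp/table-store"));
        System.out.println(schemeOf("hdfs://namenode:8020/warehouse/t"));
    }
}
```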

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -106,10 +115,12 @@ public SstPathFactory createSstPathFactory(BinaryRowData partition, int bucket)
     }
 
     public String getPartitionString(BinaryRowData partition) {
-        return PartitionPathUtils.generatePartitionPath(
-                partitionComputer.generatePartValues(
-                        Preconditions.checkNotNull(
-                                partition, "Partition row data is null. This is unexpected.")));
+        synchronized (partitionComputer) {

Review comment:
       Because we have to make sure that this class is thread-safe, and the partition computer is not thread-safe.
   
   Another option is to create a read-only path factory which only supports a limited number of methods.
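
   The first option can be sketched as follows, assuming the helper is unsafe because it reuses mutable state between calls. `ReusingComputer` is a hypothetical stand-in for the partition computer and `SynchronizedHelperSketch` is not the real path factory; only the locking pattern is the point.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SynchronizedHelperSketch {

    // Hypothetical non-thread-safe helper: it reuses one mutable map across calls.
    static class ReusingComputer {
        private final LinkedHashMap<String, String> reused = new LinkedHashMap<>();

        Map<String, String> compute(String dt, int hr) {
            reused.clear();
            reused.put("dt", dt);
            reused.put("hr", String.valueOf(hr));
            return reused;
        }
    }

    private final ReusingComputer computer = new ReusingComputer();

    String partitionString(String dt, int hr) {
        // Hold the lock until the shared map has been fully consumed.
        synchronized (computer) {
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, String> e : computer.compute(dt, hr).entrySet()) {
                sb.append(e.getKey()).append('=').append(e.getValue()).append('/');
            }
            return sb.toString();
        }
    }
}
```

   The lock must cover both the call to the helper and the reading of its result; releasing it in between would let another thread overwrite the shared map.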

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       If we want to use `parallelStream` we need `ForkJoinPool`.

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -106,10 +115,12 @@ public SstPathFactory createSstPathFactory(BinaryRowData partition, int bucket)
     }
 
     public String getPartitionString(BinaryRowData partition) {
-        return PartitionPathUtils.generatePartitionPath(
-                partitionComputer.generatePartValues(
-                        Preconditions.checkNotNull(
-                                partition, "Partition row data is null. This is unexpected.")));
+        synchronized (partitionComputer) {

Review comment:
       After discussion, we decided to mark some methods as thread-safe and the others as not thread-safe.

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()
+                            .submit(
+                                    () ->
+                                            manifests
+                                                    .parallelStream()
+                                                    .filter(this::filterManifestFileMeta)
+                                                    .flatMap(m -> readManifestFileMeta(m).stream())

Review comment:
       They are the same. Java's stream API is like Flink's job graph: intermediate operations do not execute immediately but only build a pipeline, which runs when a terminal operation such as `collect` is called.
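
       The laziness can be observed directly. In this small sketch (class and method names are hypothetical), `filter` and `map` record when they run, and nothing is recorded until `collect` is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStreamSketch {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Building the pipeline executes neither lambda.
        Stream<String> pipeline =
                Arrays.asList("m1", "m2").stream()
                        .filter(m -> {
                            log.add("filter " + m);
                            return true;
                        })
                        .map(m -> {
                            log.add("map " + m);
                            return m.toUpperCase();
                        });
        log.add("pipeline built");
        // The terminal operation pulls each element through filter and map.
        List<String> result = pipeline.collect(Collectors.toList());
        log.add("collected " + result);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

       The first recorded line is "pipeline built", followed by the filter and map calls for each element in turn.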








[GitHub] [flink-table-store] tsreaper commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r793202205



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Default implementation of {@link FileStoreWrite}. */
+public class FileStoreWriteImpl implements FileStoreWrite {
+
+    private final RowType keyType;
+    private final RowType rowType;
+    private final Comparator<RowData> keyComparator;
+    private final Accumulator accumulator;
+    private final FileFormat fileFormat;
+    private final FileStorePathFactory pathFactory;
+    private final MergeTreeOptions mergeTreeOptions;
+
+    private final FileStoreScan scan;
+
+    public FileStoreWriteImpl(
+            RowType partitionType,
+            RowType keyType,
+            RowType rowType,
+            Comparator<RowData> keyComparator,
+            Accumulator accumulator,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory,
+            MergeTreeOptions mergeTreeOptions) {
+        this.keyType = keyType;
+        this.rowType = rowType;
+        this.keyComparator = keyComparator;
+        this.accumulator = accumulator;
+        this.fileFormat = fileFormat;
+        this.pathFactory = pathFactory;
+        this.mergeTreeOptions = mergeTreeOptions;
+
+        this.scan = new FileStoreScanImpl(partitionType, keyType, rowType, fileFormat, pathFactory);
+    }
+
+    @Override
+    public RecordWriter createWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return createEmptyWriter(partition, bucket, compactExecutor);
+        } else {
+            MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+            return mergeTree.createWriter(
+                    scan.withSnapshot(latestSnapshotId)
+                            .withPartitionFilter(Collections.singletonList(partition))
+                            .withBucket(bucket).plan().files().stream()
+                            .map(ManifestEntry::file)
+                            .collect(Collectors.toList()));
+        }
+    }
+
+    @Override
+    public RecordWriter createEmptyWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+        return mergeTree.createWriter(Collections.emptyList());
+    }
+
+    private MergeTree createMergeTree(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        SstFile sstFile =
+                new SstFile(

Review comment:
       I guess it depends on the implementation of `BulkFormat` and `BulkWriter`. We can also reuse them with the current interface by returning the same `readerFactory` and `writerFactory` for the same `fileFormat`.
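
       A hedged sketch of what returning the same factory for the same format could mean: a small cache keyed by the format. `FactoryCacheSketch` and its type parameters are hypothetical and not part of the project; the approach only makes sense if the cached factories are safe to share, which is the open question raised above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// F is the format key, R is the reader or writer factory created for it.
class FactoryCacheSketch<F, R> {

    private final Map<F, R> cache = new ConcurrentHashMap<>();
    private final Function<F, R> creator;

    FactoryCacheSketch(Function<F, R> creator) {
        this.creator = creator;
    }

    // Creates the factory on first request and returns the cached one afterwards.
    R get(F format) {
        return cache.computeIfAbsent(format, creator);
    }
}
```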







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r793305142



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Default implementation of {@link FileStoreWrite}. */
+public class FileStoreWriteImpl implements FileStoreWrite {
+
+    private final RowType keyType;
+    private final RowType rowType;
+    private final Comparator<RowData> keyComparator;
+    private final Accumulator accumulator;
+    private final FileFormat fileFormat;
+    private final FileStorePathFactory pathFactory;
+    private final MergeTreeOptions mergeTreeOptions;
+
+    private final FileStoreScan scan;
+
+    public FileStoreWriteImpl(
+            RowType partitionType,
+            RowType keyType,
+            RowType rowType,
+            Comparator<RowData> keyComparator,
+            Accumulator accumulator,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory,
+            MergeTreeOptions mergeTreeOptions) {
+        this.keyType = keyType;
+        this.rowType = rowType;
+        this.keyComparator = keyComparator;
+        this.accumulator = accumulator;
+        this.fileFormat = fileFormat;
+        this.pathFactory = pathFactory;
+        this.mergeTreeOptions = mergeTreeOptions;
+
+        this.scan = new FileStoreScanImpl(partitionType, keyType, rowType, fileFormat, pathFactory);
+    }
+
+    @Override
+    public RecordWriter createWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return createEmptyWriter(partition, bucket, compactExecutor);
+        } else {
+            MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+            return mergeTree.createWriter(
+                    scan.withSnapshot(latestSnapshotId)
+                            .withPartitionFilter(Collections.singletonList(partition))
+                            .withBucket(bucket).plan().files().stream()
+                            .map(ManifestEntry::file)
+                            .collect(Collectors.toList()));
+        }
+    }
+
+    @Override
+    public RecordWriter createEmptyWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+        return mergeTree.createWriter(Collections.emptyList());
+    }
+
+    private MergeTree createMergeTree(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        SstFile sstFile =
+                new SstFile(
+                        keyType,
+                        rowType,
+                        fileFormat,
+                        pathFactory.createSstPathFactory(partition, bucket),

Review comment:
       Why are we reusing this? One benefit I can see is that files written by the same job will share the same UUID, but I don't think that is a must for now.







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791535456



##########
File path: flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
##########
@@ -71,7 +67,7 @@ public void beforeEach() throws IOException {
         root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
     }
 
-    protected abstract String getSchema();
+    protected abstract String getScheme();

Review comment:
       It should be `getScheme`. This is the file system scheme, not the table schema.







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791590842



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -106,10 +115,12 @@ public SstPathFactory createSstPathFactory(BinaryRowData partition, int bucket)
     }
 
     public String getPartitionString(BinaryRowData partition) {
-        return PartitionPathUtils.generatePartitionPath(
-                partitionComputer.generatePartValues(
-                        Preconditions.checkNotNull(
-                                partition, "Partition row data is null. This is unexpected.")));
+        synchronized (partitionComputer) {

Review comment:
       After discussion, we decided to mark some methods as thread-safe and the others as not thread-safe.







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791468220



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -18,62 +18,129 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.Equal;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Or;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /** Default implementation of {@link FileStoreScan}. */
 public class FileStoreScanImpl implements FileStoreScan {
 
+    private final RowType partitionType;
+    private final RowType keyType;
+    private final RowType rowType;
+    private final FileFormat fileFormat;
     private final FileStorePathFactory pathFactory;
-    private final ManifestFile manifestFile;
+
+    private final List<RowData.FieldGetter> partitionFieldGetters;
     private final ManifestList manifestList;
 
     private Long snapshotId;
     private List<ManifestFileMeta> manifests;
+    private Predicate partitionFilter;
+    private Predicate keyFilter;
+    private Predicate valueFilter;
+    private Integer bucket;
 
     public FileStoreScanImpl(
-            FileStorePathFactory pathFactory,
-            ManifestFile manifestFile,
-            ManifestList manifestList) {
+            RowType partitionType,
+            RowType keyType,
+            RowType rowType,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory) {
+        this.partitionType = partitionType;
+        this.keyType = keyType;
+        this.rowType = rowType;
+        this.fileFormat = fileFormat;
         this.pathFactory = pathFactory;
-        this.manifestFile = manifestFile;
-        this.manifestList = manifestList;
+
+        this.partitionFieldGetters =

Review comment:
       Can we extract `RowToArrayConverter`?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
##########
@@ -31,6 +32,8 @@
 
     FileStoreScan withPartitionFilter(Predicate predicate);
 
+    FileStoreScan withPartitionFilter(List<BinaryRowData> partitions);

Review comment:
       `inPartitions`?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -106,10 +115,12 @@ public SstPathFactory createSstPathFactory(BinaryRowData partition, int bucket)
     }
 
     public String getPartitionString(BinaryRowData partition) {
-        return PartitionPathUtils.generatePartitionPath(
-                partitionComputer.generatePartValues(
-                        Preconditions.checkNotNull(
-                                partition, "Partition row data is null. This is unexpected.")));
+        synchronized (partitionComputer) {

Review comment:
       Why do we need to add `synchronized` here?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       Why use `ForkJoinPool`?

##########
File path: flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
##########
@@ -71,7 +67,7 @@ public void beforeEach() throws IOException {
         root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
     }
 
-    protected abstract String getSchema();
+    protected abstract String getScheme();

Review comment:
       `getSchema`?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()
+                            .submit(
+                                    () ->
+                                            manifests
+                                                    .parallelStream()
+                                                    .filter(this::filterManifestFileMeta)
+                                                    .flatMap(m -> readManifestFileMeta(m).stream())

Review comment:
       Apply `filterManifestEntry` inside `readManifestFileMeta(m).stream()`?
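
   A minimal sketch of what this suggestion could look like, assuming the entry filter depends only on the entry itself. `readAndFilter`, `reader` and `keep` are hypothetical stand-ins for `readManifestFileMeta` and `filterManifestEntry`, not names from this PR:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Illustration only: filter entries inside flatMap, next to where they are read. */
final class FilterInsideFlatMapSketch {

    /**
     * Reads every manifest with {@code reader} and keeps only entries accepted by {@code keep}.
     * The filter runs on each manifest's own stream before the streams are flattened.
     */
    static <M, E> List<E> readAndFilter(
            List<M> manifests, Function<M, List<E>> reader, Predicate<E> keep) {
        return manifests.parallelStream()
                .flatMap(m -> reader.apply(m).stream().filter(keep))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Each inner list plays the role of one manifest file's entries.
        List<List<Integer>> manifests =
                Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6));
        List<Integer> kept =
                FilterInsideFlatMapSketch.<List<Integer>, Integer>readAndFilter(
                        manifests, m -> m, e -> e % 2 == 0);
        System.out.println(kept);
    }
}
```

   The result should be the same as a separate `.filter(...)` after the `flatMap`; the difference is only where the filter sits in the pipeline.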

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()
+                            .submit(
+                                    () ->
+                                            manifests
+                                                    .parallelStream()
+                                                    .filter(this::filterManifestFileMeta)
+                                                    .flatMap(m -> readManifestFileMeta(m).stream())
+                                                    .filter(this::filterManifestEntry)
+                                                    .collect(Collectors.toList()))
+                            .get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Failed to read ManifestEntry list concurrently", e);
+        }
+
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
+        for (ManifestEntry entry : entries) {
+            ManifestEntry.Identifier identifier = entry.identifier();
+            switch (entry.kind()) {
+                case ADD:
+                    Preconditions.checkState(
+                            !map.containsKey(identifier),
+                            "Trying to add file %s which is already added. "
+                                    + "Manifest might be corrupted.",
+                            identifier);
+                    map.put(identifier, entry);
+                    break;
+                case DELETE:
+                    Preconditions.checkState(
+                            map.containsKey(identifier),
+                            "Trying to delete file %s which is not previously added. "
+                                    + "Manifest might be corrupted.",
+                            identifier);
+                    map.remove(identifier);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown value kind " + entry.kind().name());
             }
         }
         return new ArrayList<>(map.values());
     }
+
+    private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
+        return partitionFilter == null
+                || partitionFilter.test(
+                        manifest.numAddedFiles() + manifest.numDeletedFiles(),
+                        manifest.partitionStats());
+    }
+
+    private boolean filterManifestEntry(ManifestEntry entry) {
+        // TODO apply key & value filter after field stats are collected in
+        //  SstFile.RollingFile#finish
+        return (partitionFilter == null
+                        || partitionFilter.test(
+                                partitionFieldGetters.stream()
+                                        .map(g -> g.getFieldOrNull(entry.partition()))
+                                        .toArray()))
+                && (bucket == null || entry.bucket() == bucket);
+    }
+
+    private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
+        // ManifestFile is not thread safe
+        return new ManifestFile(partitionType, keyType, rowType, fileFormat, pathFactory)

Review comment:
       Create a thread-local to reuse the `ManifestFile` object?
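
   A rough sketch of the thread-local idea, assuming the reader is cheap to keep alive per thread. `UnsafeReader` is a made-up stand-in for a non-thread-safe class such as `ManifestFile`, and the `CREATED` counter exists only to make the reuse visible:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical stand-in for a reader that keeps mutable state and is not thread safe. */
final class UnsafeReader {
    private final StringBuilder buffer = new StringBuilder(); // reused scratch space

    String read(String fileName) {
        buffer.setLength(0);
        buffer.append("entries-of-").append(fileName);
        return buffer.toString();
    }
}

final class ThreadLocalReaderSketch {
    /** Counts reader instances, only to show that they are reused per thread. */
    static final AtomicInteger CREATED = new AtomicInteger();

    // One reader per worker thread instead of one reader per manifest file.
    private static final ThreadLocal<UnsafeReader> READER =
            ThreadLocal.withInitial(
                    () -> {
                        CREATED.incrementAndGet();
                        return new UnsafeReader();
                    });

    static List<String> readAll(List<String> fileNames) {
        return fileNames.parallelStream()
                .map(name -> READER.get().read(name))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names =
                IntStream.range(0, 8).mapToObj(i -> "manifest-" + i).collect(Collectors.toList());
        System.out.println(readAll(names));
        System.out.println("readers created: " + CREATED.get());
    }
}
```

   One caveat with this approach: each instance lives as long as its pool thread, so it fits best when the pool is long-lived and the reader holds little memory.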

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       I see...
   ```
       public final ForkJoinTask<V> fork() {
           Thread t;
           if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
               ((ForkJoinWorkerThread)t).workQueue.push(this);
           else
               ForkJoinPool.common.externalPush(this);
           return this;
       }
   ```
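
   To spell out what this snippet implies: a task forked from a `ForkJoinWorkerThread` is pushed to that worker's own queue, so a parallel stream started from inside a custom pool is executed by that pool's workers rather than by the common pool. As far as I know this is implementation behaviour, not a documented guarantee of the Stream API. A small sketch to observe it; `CustomPoolStreamSketch` and `poolsUsed` are names made up for illustration:

```java
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class CustomPoolStreamSketch {

    /** Returns the pool of the current thread, or null if it is not a fork-join worker. */
    static ForkJoinPool currentPool() {
        Thread t = Thread.currentThread();
        return t instanceof ForkJoinWorkerThread ? ((ForkJoinWorkerThread) t).getPool() : null;
    }

    /** Runs a parallel stream inside {@code pool} and collects the pools that executed it. */
    static Set<ForkJoinPool> poolsUsed(ForkJoinPool pool, int elements) {
        try {
            return pool.submit(
                            () ->
                                    IntStream.range(0, elements)
                                            .parallel()
                                            .mapToObj(i -> currentPool())
                                            .collect(Collectors.toSet()))
                    .get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for the stream", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Parallel stream failed", e);
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            Set<ForkJoinPool> used = poolsUsed(pool, 1000);
            System.out.println("only the custom pool was used: "
                    + (used.size() == 1 && used.contains(pool)));
        } finally {
            pool.shutdown();
        }
    }
}
```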

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       Is a shutdown missing here?
   I think we can pass this pool in from outside.
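
   Roughly what I have in mind, as a sketch only. `BorrowedPoolScanSketch` is a hypothetical class, and upper-casing a name stands in for reading a manifest; the point is that whoever creates the pool also shuts it down:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

/** Hypothetical scanner that borrows a pool instead of creating a new one per scan. */
final class BorrowedPoolScanSketch {
    private final ForkJoinPool pool; // owned by the caller; never shut down in this class

    BorrowedPoolScanSketch(ForkJoinPool pool) {
        this.pool = pool;
    }

    /** Stand-in for the concurrent manifest read: each name is simply upper-cased. */
    List<String> scan(List<String> manifestNames) {
        try {
            return pool.submit(
                            () ->
                                    manifestNames.parallelStream()
                                            .map(String::toUpperCase)
                                            .collect(Collectors.toList()))
                    .get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while scanning", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed to scan concurrently", e);
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            BorrowedPoolScanSketch scanner = new BorrowedPoolScanSketch(pool);
            System.out.println(scanner.scan(Arrays.asList("m1", "m2")));
        } finally {
            // The owner of the pool is responsible for shutting it down.
            pool.shutdown();
        }
    }
}
```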

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       We can create a static pool like the JVM does:
   ```
       /**
        * Common (static) pool. Non-null for public use unless a static
        * construction exception, but internal usages null-check on use
        * to paranoically avoid potential initialization circularities
        * as well as to simplify generated code.
        */
       static final ForkJoinPool common;
   ```

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       Also, users can configure the pool size through `System.getProperty`, just like `ForkJoinPool.makeCommonPool` does.
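
   A possible shape for this, as a sketch under assumptions: the property name `example.scan.parallelism` and the class `SharedScanPoolSketch` are invented here and are not defined anywhere in the project. The JDK reads `java.util.concurrent.ForkJoinPool.common.parallelism` for its common pool in a similar way:

```java
import java.util.concurrent.ForkJoinPool;

/** Illustration only: a lazily created static pool whose size comes from a system property. */
final class SharedScanPoolSketch {
    /** Hypothetical property name, chosen for this example. */
    static final String PARALLELISM_PROPERTY = "example.scan.parallelism";

    /** Largest parallelism accepted by the ForkJoinPool constructor. */
    private static final int MAX_PARALLELISM = 0x7fff;

    /** Reads the configured parallelism, falling back to the number of available processors. */
    static int parallelism() {
        // Integer.getInteger returns null when the property is missing or not a number.
        Integer configured = Integer.getInteger(PARALLELISM_PROPERTY);
        if (configured == null || configured <= 0) {
            return Runtime.getRuntime().availableProcessors();
        }
        return Math.min(configured, MAX_PARALLELISM);
    }

    /** Holder idiom: the pool is created on first use, after the property can be set. */
    private static final class Holder {
        static final ForkJoinPool POOL = new ForkJoinPool(parallelism());
    }

    static ForkJoinPool pool() {
        return Holder.POOL;
    }

    public static void main(String[] args) {
        System.out.println("parallelism: " + pool().getParallelism());
    }
}
```

   With something like this, the pool would be sized by passing `-Dexample.scan.parallelism=8` on the command line. Fork-join worker threads are daemon threads, so a static pool like this should not keep the JVM alive on exit.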







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791597053



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       We can create a static pool like the JDK does:
   ```
       /**
        * Common (static) pool. Non-null for public use unless a static
        * construction exception, but internal usages null-check on use
        * to paranoically avoid potential initialization circularities
        * as well as to simplify generated code.
        */
       static final ForkJoinPool common;
   ```







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #14: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r792405101



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Default implementation of {@link FileStoreWrite}. */
+public class FileStoreWriteImpl implements FileStoreWrite {
+
+    private final RowType keyType;
+    private final RowType rowType;
+    private final Comparator<RowData> keyComparator;
+    private final Accumulator accumulator;
+    private final FileFormat fileFormat;
+    private final FileStorePathFactory pathFactory;
+    private final MergeTreeOptions mergeTreeOptions;
+
+    private final FileStoreScan scan;
+
+    public FileStoreWriteImpl(
+            RowType partitionType,
+            RowType keyType,
+            RowType rowType,
+            Comparator<RowData> keyComparator,
+            Accumulator accumulator,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory,
+            MergeTreeOptions mergeTreeOptions) {
+        this.keyType = keyType;
+        this.rowType = rowType;
+        this.keyComparator = keyComparator;
+        this.accumulator = accumulator;
+        this.fileFormat = fileFormat;
+        this.pathFactory = pathFactory;
+        this.mergeTreeOptions = mergeTreeOptions;
+
+        this.scan = new FileStoreScanImpl(partitionType, keyType, rowType, fileFormat, pathFactory);
+    }
+
+    @Override
+    public RecordWriter createWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return createEmptyWriter(partition, bucket, compactExecutor);
+        } else {
+            MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+            return mergeTree.createWriter(
+                    scan.withSnapshot(latestSnapshotId)
+                            .withPartitionFilter(Collections.singletonList(partition))
+                            .withBucket(bucket).plan().files().stream()
+                            .map(ManifestEntry::file)
+                            .collect(Collectors.toList()));
+        }
+    }
+
+    @Override
+    public RecordWriter createEmptyWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        MergeTree mergeTree = createMergeTree(partition, bucket, compactExecutor);
+        return mergeTree.createWriter(Collections.emptyList());
+    }
+
+    private MergeTree createMergeTree(

Review comment:
       We can create a `MergeTreeFactory`; the reader needs it too.



