You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/05/10 09:05:31 UTC

[incubator-paimon] branch master updated: [core] [revert] Revert abstract manifest entry (#1114)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 274688de0 [core] [revert] Revert abstract manifest entry (#1114)
274688de0 is described below

commit 274688de027ac04026d36665e0f869e92696b07c
Author: YeJunHao <41...@users.noreply.github.com>
AuthorDate: Wed May 10 17:05:27 2023 +0800

    [core] [revert] Revert abstract manifest entry (#1114)
---
 .../shortcodes/generated/core_configuration.html   |   4 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |   4 +-
 .../paimon/manifest/AbstractManifestEntry.java     | 189 ---------------------
 .../org/apache/paimon/manifest/ManifestEntry.java  | 122 ++++++++++++-
 .../paimon/operation/AbstractFileStoreScan.java    |  29 +---
 5 files changed, 133 insertions(+), 215 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 418149ae3..65f084aa2 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -35,7 +35,7 @@
         <tr>
             <td><h5>changelog-producer.row-deduplicate</h5></td>
             <td style="word-wrap: break-word;">false</td>
-            <td><p>Boolean</p></td>
+            <td>Boolean</td>
             <td>Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.</td>
         </tr>
         <tr>
@@ -300,7 +300,7 @@
             <td><h5>scan.manifest.parallelism</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Integer</td>
-            <td>The parallelism of scanning manifest files, default value is the size of cpu processor.Note: Scale-up this parameter will increase memory usage while scanning manifest files.We can consider downsize it when we encounter an out of memory exception while scanning</td>
+            <td>The parallelism of scanning manifest files, default value is the size of cpu processor. Note: Scale-up this parameter will increase memory usage while scanning manifest files. We can consider downsize it when we encounter an out of memory exception while scanning</td>
         </tr>
         <tr>
             <td><h5>scan.mode</h5></td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index ad7ccda11..c49833e0b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -399,8 +399,8 @@ public class CoreOptions implements Serializable {
                     .intType()
                     .noDefaultValue()
                     .withDescription(
-                            "The parallelism of scanning manifest files, default value is the size of cpu processor."
-                                    + "Note: Scale-up this parameter will increase memory usage while scanning manifest files."
+                            "The parallelism of scanning manifest files, default value is the size of cpu processor. "
+                                    + "Note: Scale-up this parameter will increase memory usage while scanning manifest files. "
                                     + "We can consider downsize it when we encounter an out of memory exception while scanning");
 
     public static final ConfigOption<LogConsistency> LOG_CONSISTENCY =
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java
deleted file mode 100644
index cdd54cc0a..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.manifest;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.Preconditions;
-
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Objects;
-
-/** Abstract a simplest model of manifest file. */
-public abstract class AbstractManifestEntry {
-    protected final FileKind kind;
-    protected final String fileName;
-    // for tables without partition this field should be a row with 0 columns (not null)
-    protected final BinaryRow partition;
-    protected final int bucket;
-    protected final int totalBuckets;
-    protected final int level;
-
-    public AbstractManifestEntry(
-            FileKind kind,
-            String fileName,
-            BinaryRow partition,
-            int bucket,
-            int totalBuckets,
-            int level) {
-        this.kind = kind;
-        this.fileName = fileName;
-        this.partition = partition;
-        this.bucket = bucket;
-        this.totalBuckets = totalBuckets;
-        this.level = level;
-    }
-
-    public FileKind kind() {
-        return kind;
-    }
-
-    public BinaryRow partition() {
-        return partition;
-    }
-
-    public int bucket() {
-        return bucket;
-    }
-
-    public int totalBuckets() {
-        return totalBuckets;
-    }
-
-    public int level() {
-        return level;
-    }
-
-    public Identifier identifier() {
-        return new Identifier(partition, bucket, level, fileName);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof AbstractManifestEntry)) {
-            return false;
-        }
-        AbstractManifestEntry that = (AbstractManifestEntry) o;
-        return Objects.equals(kind, that.kind)
-                && Objects.equals(partition, that.partition)
-                && bucket == that.bucket
-                && level == that.level
-                && Objects.equals(fileName, that.fileName);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(kind, partition, bucket, level, fileName);
-    }
-
-    @Override
-    public String toString() {
-        return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, level, fileName);
-    }
-
-    public static <T extends AbstractManifestEntry> Collection<T> mergeEntries(
-            Iterable<T> entries) {
-        LinkedHashMap<Identifier, T> map = new LinkedHashMap<>();
-        mergeEntries(entries, map);
-        return map.values();
-    }
-
-    public static <T extends AbstractManifestEntry> void mergeEntries(
-            Iterable<T> entries, Map<Identifier, T> map) {
-        for (T entry : entries) {
-            Identifier identifier = entry.identifier();
-            switch (entry.kind()) {
-                case ADD:
-                    Preconditions.checkState(
-                            !map.containsKey(identifier),
-                            "Trying to add file %s which is already added. Manifest might be corrupted.",
-                            identifier);
-                    map.put(identifier, entry);
-                    break;
-                case DELETE:
-                    // each dataFile will only be added once and deleted once,
-                    // if we know that it is added before then both add and delete entry can be
-                    // removed because there won't be further operations on this file,
-                    // otherwise we have to keep the delete entry because the add entry must be
-                    // in the previous manifest files
-                    if (map.containsKey(identifier)) {
-                        map.remove(identifier);
-                    } else {
-                        map.put(identifier, entry);
-                    }
-                    break;
-                default:
-                    throw new UnsupportedOperationException(
-                            "Unknown value kind " + entry.kind().name());
-            }
-        }
-    }
-
-    /**
-     * The same {@link Identifier} indicates that the {@link AbstractManifestEntry} refers to the
-     * same data file.
-     */
-    public static class Identifier {
-        public final BinaryRow partition;
-        public final int bucket;
-        public final int level;
-        public final String fileName;
-
-        private Identifier(BinaryRow partition, int bucket, int level, String fileName) {
-            this.partition = partition;
-            this.bucket = bucket;
-            this.level = level;
-            this.fileName = fileName;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (!(o instanceof Identifier)) {
-                return false;
-            }
-            Identifier that = (Identifier) o;
-            return Objects.equals(partition, that.partition)
-                    && bucket == that.bucket
-                    && level == that.level
-                    && Objects.equals(fileName, that.fileName);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(partition, bucket, level, fileName);
-        }
-
-        @Override
-        public String toString() {
-            return String.format("{%s, %d, %d, %s}", partition, bucket, level, fileName);
-        }
-
-        public String toString(FileStorePathFactory pathFactory) {
-            return pathFactory.getPartitionString(partition)
-                    + ", bucket "
-                    + bucket
-                    + ", level "
-                    + level
-                    + ", file "
-                    + fileName;
-        }
-    }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 5ad9f9972..1c175e589 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -24,30 +24,61 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Preconditions;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import static org.apache.paimon.utils.SerializationUtils.newBytesType;
 
 /** Entry of a manifest file, representing an addition / deletion of a data file. */
-public class ManifestEntry extends AbstractManifestEntry {
+public class ManifestEntry {
 
+    private final FileKind kind;
+    // for tables without partition this field should be a row with 0 columns (not null)
+    private final BinaryRow partition;
+    private final int bucket;
+    private final int totalBuckets;
     private final DataFileMeta file;
 
     public ManifestEntry(
             FileKind kind, BinaryRow partition, int bucket, int totalBuckets, DataFileMeta file) {
-        super(kind, file.fileName(), partition, bucket, totalBuckets, file.level());
+        this.kind = kind;
+        this.partition = partition;
+        this.bucket = bucket;
+        this.totalBuckets = totalBuckets;
         this.file = file;
     }
 
+    public FileKind kind() {
+        return kind;
+    }
+
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    public int bucket() {
+        return bucket;
+    }
+
+    public int totalBuckets() {
+        return totalBuckets;
+    }
+
     public DataFileMeta file() {
         return file;
     }
 
+    public Identifier identifier() {
+        return new Identifier(partition, bucket, file.level(), file.fileName());
+    }
+
     public static RowType schema() {
         List<DataField> fields = new ArrayList<>();
         fields.add(new DataField(0, "_KIND", new TinyIntType(false)));
@@ -81,6 +112,43 @@ public class ManifestEntry extends AbstractManifestEntry {
         return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, totalBuckets, file);
     }
 
+    public static Collection<ManifestEntry> mergeEntries(Iterable<ManifestEntry> entries) {
+        LinkedHashMap<Identifier, ManifestEntry> map = new LinkedHashMap<>();
+        mergeEntries(entries, map);
+        return map.values();
+    }
+
+    public static void mergeEntries(
+            Iterable<ManifestEntry> entries, Map<Identifier, ManifestEntry> map) {
+        for (ManifestEntry entry : entries) {
+            ManifestEntry.Identifier identifier = entry.identifier();
+            switch (entry.kind()) {
+                case ADD:
+                    Preconditions.checkState(
+                            !map.containsKey(identifier),
+                            "Trying to add file %s which is already added. Manifest might be corrupted.",
+                            identifier);
+                    map.put(identifier, entry);
+                    break;
+                case DELETE:
+                    // each dataFile will only be added once and deleted once,
+                    // if we know that it is added before then both add and delete entry can be
+                    // removed because there won't be further operations on this file,
+                    // otherwise we have to keep the delete entry because the add entry must be
+                    // in the previous manifest files
+                    if (map.containsKey(identifier)) {
+                        map.remove(identifier);
+                    } else {
+                        map.put(identifier, entry);
+                    }
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown value kind " + entry.kind().name());
+            }
+        }
+    }
+
     public static void assertNoDelete(Collection<ManifestEntry> entries) {
         for (ManifestEntry entry : entries) {
             Preconditions.checkState(
@@ -89,4 +157,54 @@ public class ManifestEntry extends AbstractManifestEntry {
                     entry.file().fileName());
         }
     }
+
+    /**
+     * The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data
+     * file.
+     */
+    public static class Identifier {
+        public final BinaryRow partition;
+        public final int bucket;
+        public final int level;
+        public final String fileName;
+
+        private Identifier(BinaryRow partition, int bucket, int level, String fileName) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.level = level;
+            this.fileName = fileName;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof Identifier)) {
+                return false;
+            }
+            Identifier that = (Identifier) o;
+            return Objects.equals(partition, that.partition)
+                    && bucket == that.bucket
+                    && level == that.level
+                    && Objects.equals(fileName, that.fileName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(partition, bucket, level, fileName);
+        }
+
+        @Override
+        public String toString() {
+            return String.format("{%s, %d, %d, %s}", partition, bucket, level, fileName);
+        }
+
+        public String toString(FileStorePathFactory pathFactory) {
+            return pathFactory.getPartitionString(partition)
+                    + ", bucket "
+                    + bucket
+                    + ", level "
+                    + level
+                    + ", file "
+                    + fileName;
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index f81a5cdc5..4f3b3e3ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -21,7 +21,6 @@ package org.apache.paimon.operation;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.manifest.AbstractManifestEntry;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestEntrySerializer;
@@ -219,8 +218,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         };
     }
 
-    private <T extends AbstractManifestEntry> Pair<Long, List<T>> doPlan(
-            Function<ManifestFileMeta, List<T>> readManifest) {
+    private Pair<Long, List<ManifestEntry>> doPlan(
+            Function<ManifestFileMeta, List<ManifestEntry>> readManifest) {
         List<ManifestFileMeta> manifests = specifiedManifests;
         Long snapshotId = specifiedSnapshotId;
         if (manifests == null) {
@@ -237,7 +236,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
 
         final List<ManifestFileMeta> readManifests = manifests;
 
-        Iterable<T> entries =
+        Iterable<ManifestEntry> entries =
                 ParallellyExecuteUtils.parallelismBatchIterable(
                         files ->
                                 files.parallelStream()
@@ -248,8 +247,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
                         readManifests,
                         scanManifestParallelism);
 
-        List<T> files = new ArrayList<>();
-        for (T file : AbstractManifestEntry.mergeEntries(entries)) {
+        List<ManifestEntry> files = new ArrayList<>();
+        for (ManifestEntry file : ManifestEntry.mergeEntries(entries)) {
             if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
                 String partInfo =
                         partitionConverter.getArity() > 0
@@ -324,29 +323,19 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     }
 
     /** Note: Keep this thread-safe. */
-    private boolean filterByBucket(AbstractManifestEntry entry) {
+    private boolean filterByBucket(ManifestEntry entry) {
         return (specifiedBucket == null || entry.bucket() == specifiedBucket);
     }
 
     /** Note: Keep this thread-safe. */
-    private boolean filterByBucketSelector(AbstractManifestEntry entry) {
+    private boolean filterByBucketSelector(ManifestEntry entry) {
         return (bucketSelector == null
                 || bucketSelector.select(entry.bucket(), entry.totalBuckets()));
     }
 
     /** Note: Keep this thread-safe. */
-    private boolean filterByLevel(AbstractManifestEntry entry) {
-        return (levelFilter == null || levelFilter.test(entry.level()));
-    }
-
-    /** Note: Keep this thread-safe. */
-    private boolean filterByStats(AbstractManifestEntry entry) {
-        // filterByStats is an action that is completed as much as possible and does not have an
-        // impact if it is not done.
-        if (entry instanceof ManifestEntry) {
-            return filterByStats((ManifestEntry) entry);
-        }
-        return true;
+    private boolean filterByLevel(ManifestEntry entry) {
+        return (levelFilter == null || levelFilter.test(entry.file().level()));
     }
 
     /** Note: Keep this thread-safe. */