You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/05/10 09:05:31 UTC
[incubator-paimon] branch master updated: [core] [revert] Revert abstract manifest entry (#1114)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 274688de0 [core] [revert] Revert abstract manifest entry (#1114)
274688de0 is described below
commit 274688de027ac04026d36665e0f869e92696b07c
Author: YeJunHao <41...@users.noreply.github.com>
AuthorDate: Wed May 10 17:05:27 2023 +0800
[core] [revert] Revert abstract manifest entry (#1114)
---
.../shortcodes/generated/core_configuration.html | 4 +-
.../main/java/org/apache/paimon/CoreOptions.java | 4 +-
.../paimon/manifest/AbstractManifestEntry.java | 189 ---------------------
.../org/apache/paimon/manifest/ManifestEntry.java | 122 ++++++++++++-
.../paimon/operation/AbstractFileStoreScan.java | 29 +---
5 files changed, 133 insertions(+), 215 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 418149ae3..65f084aa2 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -35,7 +35,7 @@
<tr>
<td><h5>changelog-producer.row-deduplicate</h5></td>
<td style="word-wrap: break-word;">false</td>
- <td><p>Boolean</p></td>
+ <td>Boolean</td>
<td>Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.</td>
</tr>
<tr>
@@ -300,7 +300,7 @@
<td><h5>scan.manifest.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
- <td>The parallelism of scanning manifest files, default value is the size of cpu processor.Note: Scale-up this parameter will increase memory usage while scanning manifest files.We can consider downsize it when we encounter an out of memory exception while scanning</td>
+ <td>The parallelism of scanning manifest files, default value is the size of cpu processor. Note: Scale-up this parameter will increase memory usage while scanning manifest files. We can consider downsize it when we encounter an out of memory exception while scanning</td>
</tr>
<tr>
<td><h5>scan.mode</h5></td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index ad7ccda11..c49833e0b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -399,8 +399,8 @@ public class CoreOptions implements Serializable {
.intType()
.noDefaultValue()
.withDescription(
- "The parallelism of scanning manifest files, default value is the size of cpu processor."
- + "Note: Scale-up this parameter will increase memory usage while scanning manifest files."
+ "The parallelism of scanning manifest files, default value is the size of cpu processor. "
+ + "Note: Scale-up this parameter will increase memory usage while scanning manifest files. "
+ "We can consider downsize it when we encounter an out of memory exception while scanning");
public static final ConfigOption<LogConsistency> LOG_CONSISTENCY =
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java
deleted file mode 100644
index cdd54cc0a..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.manifest;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.Preconditions;
-
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Objects;
-
-/** Abstract a simplest model of manifest file. */
-public abstract class AbstractManifestEntry {
- protected final FileKind kind;
- protected final String fileName;
- // for tables without partition this field should be a row with 0 columns (not null)
- protected final BinaryRow partition;
- protected final int bucket;
- protected final int totalBuckets;
- protected final int level;
-
- public AbstractManifestEntry(
- FileKind kind,
- String fileName,
- BinaryRow partition,
- int bucket,
- int totalBuckets,
- int level) {
- this.kind = kind;
- this.fileName = fileName;
- this.partition = partition;
- this.bucket = bucket;
- this.totalBuckets = totalBuckets;
- this.level = level;
- }
-
- public FileKind kind() {
- return kind;
- }
-
- public BinaryRow partition() {
- return partition;
- }
-
- public int bucket() {
- return bucket;
- }
-
- public int totalBuckets() {
- return totalBuckets;
- }
-
- public int level() {
- return level;
- }
-
- public Identifier identifier() {
- return new Identifier(partition, bucket, level, fileName);
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof AbstractManifestEntry)) {
- return false;
- }
- AbstractManifestEntry that = (AbstractManifestEntry) o;
- return Objects.equals(kind, that.kind)
- && Objects.equals(partition, that.partition)
- && bucket == that.bucket
- && level == that.level
- && Objects.equals(fileName, that.fileName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(kind, partition, bucket, level, fileName);
- }
-
- @Override
- public String toString() {
- return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, level, fileName);
- }
-
- public static <T extends AbstractManifestEntry> Collection<T> mergeEntries(
- Iterable<T> entries) {
- LinkedHashMap<Identifier, T> map = new LinkedHashMap<>();
- mergeEntries(entries, map);
- return map.values();
- }
-
- public static <T extends AbstractManifestEntry> void mergeEntries(
- Iterable<T> entries, Map<Identifier, T> map) {
- for (T entry : entries) {
- Identifier identifier = entry.identifier();
- switch (entry.kind()) {
- case ADD:
- Preconditions.checkState(
- !map.containsKey(identifier),
- "Trying to add file %s which is already added. Manifest might be corrupted.",
- identifier);
- map.put(identifier, entry);
- break;
- case DELETE:
- // each dataFile will only be added once and deleted once,
- // if we know that it is added before then both add and delete entry can be
- // removed because there won't be further operations on this file,
- // otherwise we have to keep the delete entry because the add entry must be
- // in the previous manifest files
- if (map.containsKey(identifier)) {
- map.remove(identifier);
- } else {
- map.put(identifier, entry);
- }
- break;
- default:
- throw new UnsupportedOperationException(
- "Unknown value kind " + entry.kind().name());
- }
- }
- }
-
- /**
- * The same {@link Identifier} indicates that the {@link AbstractManifestEntry} refers to the
- * same data file.
- */
- public static class Identifier {
- public final BinaryRow partition;
- public final int bucket;
- public final int level;
- public final String fileName;
-
- private Identifier(BinaryRow partition, int bucket, int level, String fileName) {
- this.partition = partition;
- this.bucket = bucket;
- this.level = level;
- this.fileName = fileName;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof Identifier)) {
- return false;
- }
- Identifier that = (Identifier) o;
- return Objects.equals(partition, that.partition)
- && bucket == that.bucket
- && level == that.level
- && Objects.equals(fileName, that.fileName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(partition, bucket, level, fileName);
- }
-
- @Override
- public String toString() {
- return String.format("{%s, %d, %d, %s}", partition, bucket, level, fileName);
- }
-
- public String toString(FileStorePathFactory pathFactory) {
- return pathFactory.getPartitionString(partition)
- + ", bucket "
- + bucket
- + ", level "
- + level
- + ", file "
- + fileName;
- }
- }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 5ad9f9972..1c175e589 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -24,30 +24,61 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import static org.apache.paimon.utils.SerializationUtils.newBytesType;
/** Entry of a manifest file, representing an addition / deletion of a data file. */
-public class ManifestEntry extends AbstractManifestEntry {
+public class ManifestEntry {
+ private final FileKind kind;
+ // for tables without partition this field should be a row with 0 columns (not null)
+ private final BinaryRow partition;
+ private final int bucket;
+ private final int totalBuckets;
private final DataFileMeta file;
public ManifestEntry(
FileKind kind, BinaryRow partition, int bucket, int totalBuckets, DataFileMeta file) {
- super(kind, file.fileName(), partition, bucket, totalBuckets, file.level());
+ this.kind = kind;
+ this.partition = partition;
+ this.bucket = bucket;
+ this.totalBuckets = totalBuckets;
this.file = file;
}
+ public FileKind kind() {
+ return kind;
+ }
+
+ public BinaryRow partition() {
+ return partition;
+ }
+
+ public int bucket() {
+ return bucket;
+ }
+
+ public int totalBuckets() {
+ return totalBuckets;
+ }
+
public DataFileMeta file() {
return file;
}
+ public Identifier identifier() {
+ return new Identifier(partition, bucket, file.level(), file.fileName());
+ }
+
public static RowType schema() {
List<DataField> fields = new ArrayList<>();
fields.add(new DataField(0, "_KIND", new TinyIntType(false)));
@@ -81,6 +112,43 @@ public class ManifestEntry extends AbstractManifestEntry {
return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, totalBuckets, file);
}
+ public static Collection<ManifestEntry> mergeEntries(Iterable<ManifestEntry> entries) {
+ LinkedHashMap<Identifier, ManifestEntry> map = new LinkedHashMap<>();
+ mergeEntries(entries, map);
+ return map.values();
+ }
+
+ public static void mergeEntries(
+ Iterable<ManifestEntry> entries, Map<Identifier, ManifestEntry> map) {
+ for (ManifestEntry entry : entries) {
+ ManifestEntry.Identifier identifier = entry.identifier();
+ switch (entry.kind()) {
+ case ADD:
+ Preconditions.checkState(
+ !map.containsKey(identifier),
+ "Trying to add file %s which is already added. Manifest might be corrupted.",
+ identifier);
+ map.put(identifier, entry);
+ break;
+ case DELETE:
+ // each dataFile will only be added once and deleted once,
+ // if we know that it is added before then both add and delete entry can be
+ // removed because there won't be further operations on this file,
+ // otherwise we have to keep the delete entry because the add entry must be
+ // in the previous manifest files
+ if (map.containsKey(identifier)) {
+ map.remove(identifier);
+ } else {
+ map.put(identifier, entry);
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown value kind " + entry.kind().name());
+ }
+ }
+ }
+
public static void assertNoDelete(Collection<ManifestEntry> entries) {
for (ManifestEntry entry : entries) {
Preconditions.checkState(
@@ -89,4 +157,54 @@ public class ManifestEntry extends AbstractManifestEntry {
entry.file().fileName());
}
}
+
+ /**
+ * The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data
+ * file.
+ */
+ public static class Identifier {
+ public final BinaryRow partition;
+ public final int bucket;
+ public final int level;
+ public final String fileName;
+
+ private Identifier(BinaryRow partition, int bucket, int level, String fileName) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.level = level;
+ this.fileName = fileName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof Identifier)) {
+ return false;
+ }
+ Identifier that = (Identifier) o;
+ return Objects.equals(partition, that.partition)
+ && bucket == that.bucket
+ && level == that.level
+ && Objects.equals(fileName, that.fileName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(partition, bucket, level, fileName);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("{%s, %d, %d, %s}", partition, bucket, level, fileName);
+ }
+
+ public String toString(FileStorePathFactory pathFactory) {
+ return pathFactory.getPartitionString(partition)
+ + ", bucket "
+ + bucket
+ + ", level "
+ + level
+ + ", file "
+ + fileName;
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index f81a5cdc5..4f3b3e3ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -21,7 +21,6 @@ package org.apache.paimon.operation;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.manifest.AbstractManifestEntry;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestEntrySerializer;
@@ -219,8 +218,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
};
}
- private <T extends AbstractManifestEntry> Pair<Long, List<T>> doPlan(
- Function<ManifestFileMeta, List<T>> readManifest) {
+ private Pair<Long, List<ManifestEntry>> doPlan(
+ Function<ManifestFileMeta, List<ManifestEntry>> readManifest) {
List<ManifestFileMeta> manifests = specifiedManifests;
Long snapshotId = specifiedSnapshotId;
if (manifests == null) {
@@ -237,7 +236,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
final List<ManifestFileMeta> readManifests = manifests;
- Iterable<T> entries =
+ Iterable<ManifestEntry> entries =
ParallellyExecuteUtils.parallelismBatchIterable(
files ->
files.parallelStream()
@@ -248,8 +247,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
readManifests,
scanManifestParallelism);
- List<T> files = new ArrayList<>();
- for (T file : AbstractManifestEntry.mergeEntries(entries)) {
+ List<ManifestEntry> files = new ArrayList<>();
+ for (ManifestEntry file : ManifestEntry.mergeEntries(entries)) {
if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
String partInfo =
partitionConverter.getArity() > 0
@@ -324,29 +323,19 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
}
/** Note: Keep this thread-safe. */
- private boolean filterByBucket(AbstractManifestEntry entry) {
+ private boolean filterByBucket(ManifestEntry entry) {
return (specifiedBucket == null || entry.bucket() == specifiedBucket);
}
/** Note: Keep this thread-safe. */
- private boolean filterByBucketSelector(AbstractManifestEntry entry) {
+ private boolean filterByBucketSelector(ManifestEntry entry) {
return (bucketSelector == null
|| bucketSelector.select(entry.bucket(), entry.totalBuckets()));
}
/** Note: Keep this thread-safe. */
- private boolean filterByLevel(AbstractManifestEntry entry) {
- return (levelFilter == null || levelFilter.test(entry.level()));
- }
-
- /** Note: Keep this thread-safe. */
- private boolean filterByStats(AbstractManifestEntry entry) {
- // filterByStats is an action that is completed as much as possible and does not have an
- // impact if it is not done.
- if (entry instanceof ManifestEntry) {
- return filterByStats((ManifestEntry) entry);
- }
- return true;
+ private boolean filterByLevel(ManifestEntry entry) {
+ return (levelFilter == null || levelFilter.test(entry.file().level()));
}
/** Note: Keep this thread-safe. */