You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2023/01/10 21:55:11 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6782: [HUDI-4911][HUDI-3301] Fixing `HoodieMetadataLogRecordReader` to avoid flushing cache for every lookup

alexeykudinkin commented on code in PR #6782:
URL: https://github.com/apache/hudi/pull/6782#discussion_r1066355578


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java:
##########
@@ -228,11 +228,11 @@ protected void cleanupSparkContexts() {
       LOG.info("Clearing sql context cache of spark-session used in previous test-case");
       sqlContext.clearCache();
       sqlContext = null;
+      sparkSession = null;

Review Comment:
   Had to fix the tests: `sparkSession` also needs to be reset to `null` here, alongside `sqlContext`.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -198,41 +190,42 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<Str
     this.path = basePath;
     this.useScanV2 = useScanV2;
 
-    // Key fields when populate meta fields is disabled (that is, virtual keys enabled)
-    if (!tableConfig.populateMetaFields()) {
-      this.populateMetaFields = false;
-      this.simpleKeyGenFields = Option.of(
-          Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()));
-    }
-    this.partitionName = partitionName;
-    this.recordType = recordMerger.getRecordType();
-  }
+    if (keyFieldOverride.isPresent()) {
+      // TODO elaborate
+      checkState(partitionNameOverride.isPresent());
 
-  protected String getKeyField() {
-    if (this.populateMetaFields) {
-      return HoodieRecord.RECORD_KEY_METADATA_FIELD;
+      this.populateMetaFields = false;
+      this.recordKeyField = keyFieldOverride.get();
+      this.partitionPathFieldOpt = Option.empty();
+    } else if (tableConfig.populateMetaFields()) {
+      this.populateMetaFields = true;
+      this.recordKeyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
+      this.partitionPathFieldOpt = Option.of(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+    } else {
+      this.populateMetaFields = false;

Review Comment:
   Added Javadoc for the first branch; let me know if that makes the remaining two clearer.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -255,38 +255,35 @@ public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecord
     return result;
   }
 
-  private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataMergedLogRecordReader logRecordScanner,
+  private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataLogRecordReader logRecordReader,
                                                                                   List<String> keys,
                                                                                   boolean fullKey,
                                                                                   List<Long> timings) {
     HoodieTimer timer = HoodieTimer.start();
 
-    if (logRecordScanner == null) {
+    if (logRecordReader == null) {
       timings.add(timer.endTimer());
       return Collections.emptyMap();
     }
 
-    String partitionName = logRecordScanner.getPartitionName().get();
+    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>(keys.size());
 
-    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
-    if (isFullScanAllowedForPartition(partitionName)) {

Review Comment:
   Whether to do a full scan should be an implementation detail of the Reader. After this patch, callers no longer need to know it in order to use the Reader: the APIs now provide the same incremental semantics (only keys that haven't been scanned before will be scanned).



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * A {@code HoodieMergedLogRecordScanner} implementation which only merged records matching providing keys. This is
+ * useful in limiting memory usage when only a small subset of updates records are to be read.
+ */
+@ThreadSafe
+public class HoodieMetadataLogRecordReader implements Closeable {
+
+  private final HoodieMergedLogRecordScanner logRecordScanner;
+
+  private HoodieMetadataLogRecordReader(HoodieMergedLogRecordScanner logRecordScanner) {
+    this.logRecordScanner = logRecordScanner;
+  }
+
+  /**
+   * Returns the builder for {@code HoodieMetadataMergedLogRecordScanner}.
+   */
+  public static HoodieMetadataLogRecordReader.Builder newBuilder() {
+    return new HoodieMetadataLogRecordReader.Builder();
+  }
+
+  @SuppressWarnings("unchecked")
+  public List<HoodieRecord<HoodieMetadataPayload>> getRecords() {
+    // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]]
+    //       materialized state, to make sure there's no concurrent access
+    synchronized (this) {
+      logRecordScanner.scan();

Review Comment:
   Great catch!
   
   All APIs already handle the full-scan case except for this one. Addressed.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * A {@code HoodieMergedLogRecordScanner} implementation which only merged records matching providing keys. This is
+ * useful in limiting memory usage when only a small subset of updates records are to be read.
+ */
+@ThreadSafe
+public class HoodieMetadataLogRecordReader implements Closeable {
+
+  private final HoodieMergedLogRecordScanner logRecordScanner;
+
+  private HoodieMetadataLogRecordReader(HoodieMergedLogRecordScanner logRecordScanner) {
+    this.logRecordScanner = logRecordScanner;
+  }
+
+  /**
+   * Returns the builder for {@code HoodieMetadataMergedLogRecordScanner}.
+   */
+  public static HoodieMetadataLogRecordReader.Builder newBuilder() {
+    return new HoodieMetadataLogRecordReader.Builder();
+  }
+
+  @SuppressWarnings("unchecked")
+  public List<HoodieRecord<HoodieMetadataPayload>> getRecords() {
+    // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]]
+    //       materialized state, to make sure there's no concurrent access
+    synchronized (this) {
+      logRecordScanner.scan();
+      return logRecordScanner.getRecords().values()
+          .stream()
+          .map(record -> (HoodieRecord<HoodieMetadataPayload>) record)
+          .collect(Collectors.toList());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public List<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes) {
+    if (keyPrefixes.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]]
+    //       materialized state, to make sure there's no concurrent access
+    synchronized (this) {
+      logRecordScanner.scanByKeyPrefixes(keyPrefixes);
+      Map<String, HoodieRecord> allRecords = logRecordScanner.getRecords();
+
+      Predicate<String> p = createPrefixMatchingPredicate(keyPrefixes);
+      return allRecords.entrySet()
+          .stream()
+          .filter(r -> r != null && p.test(r.getKey()))

Review Comment:
   Keep in mind that `getRecords` now returns a map of all merged records, not only the ones matching the predicates (this just preserves the historical behavior of this API).



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -255,38 +255,35 @@ public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecord
     return result;
   }
 
-  private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataMergedLogRecordReader logRecordScanner,
+  private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataLogRecordReader logRecordReader,
                                                                                   List<String> keys,
                                                                                   boolean fullKey,
                                                                                   List<Long> timings) {
     HoodieTimer timer = HoodieTimer.start();
 
-    if (logRecordScanner == null) {
+    if (logRecordReader == null) {
       timings.add(timer.endTimer());
       return Collections.emptyMap();
     }
 
-    String partitionName = logRecordScanner.getPartitionName().get();
+    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>(keys.size());
 
-    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
-    if (isFullScanAllowedForPartition(partitionName)) {
-      checkArgument(fullKey, "If full-scan is required, only full keys could be used!");
-      // Path which does full scan of log files
-      for (String key : keys) {
-        logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
-      }
-    } else {
-      // This path will do seeks pertaining to the keys passed in
-      List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList =
-          fullKey ? logRecordScanner.getRecordsByKeys(keys)
-              : logRecordScanner.getRecordsByKeyPrefixes(keys)
-                  .stream()
-                  .map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
-                  .collect(Collectors.toList());
-
-      for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
-        logRecords.put(entry.getKey(), entry.getValue());
-      }
+    // First, fetch the keys being looked up
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList =
+        (fullKey ? logRecordReader.getRecordsByKeys(keys) : logRecordReader.getRecordsByKeyPrefixes(keys))
+                .stream()
+                .map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
+                .collect(Collectors.toList());
+
+    // Second, back-fill keys not present in the log-blocks (such that map holds
+    // a record for every key being looked up)
+    List<String> missingKeys = CollectionUtils.diff(keys, logRecords.keySet());
+    for (String key : missingKeys) {
+      logRecords.put(key, Option.empty());
+    }
+
+    for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
+      logRecords.put(entry.getKey(), entry.getValue());

Review Comment:
   Responded



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -108,30 +112,85 @@ protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<Stri
     }
   }
 
-  protected void performScan() {
+  /**
+   * Scans delta-log files processing blocks
+   */
+  public final void scan() {
+    scan(false);
+  }
+
+  public final void scan(boolean skipProcessingBlocks) {
+    scanInternal(Option.empty(), skipProcessingBlocks);
+  }
+
+  /**
+   * Provides incremental scanning capability where only provided keys will be looked
+   * up in the delta-log files, scanned and subsequently materialized into the internal
+   * cache
+   *
+   * @param keys to be looked up
+   */
+  public void scanByFullKeys(List<String> keys) {

Review Comment:
   Yeah, good point. I kept it consistent with `scan`, because these methods have the same semantics and differ only in scope.
   
   I'm open to it, but I need to think more about whether renaming the `scan` methods is a good idea, since callers may rely on them (we don't annotate them as API, but they are effectively part of the Reader's API).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org