Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2023/01/07 04:46:43 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #6782: [HUDI-4911][HUDI-3301] Fixing `HoodieMetadataLogRecordReader` to avoid flushing cache for every lookup

nsivabalan commented on code in PR #6782:
URL: https://github.com/apache/hudi/pull/6782#discussion_r1063950675


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -108,30 +112,85 @@ protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<Stri
     }
   }
 
-  protected void performScan() {
+  /**
+   * Scans delta-log files, processing blocks
+   */
+  public final void scan() {
+    scan(false);
+  }
+
+  public final void scan(boolean skipProcessingBlocks) {
+    scanInternal(Option.empty(), skipProcessingBlocks);
+  }
+
+  /**
+   * Provides incremental scanning capability, where only the provided keys will be
+   * looked up in the delta-log files, scanned, and subsequently materialized into
+   * the internal cache.
+   *
+   * @param keys to be looked up
+   */
+  public void scanByFullKeys(List<String> keys) {
+    if (forceFullScan) {
+      return; // no-op
+    }
+
+    List<String> missingKeys = keys.stream()
+        .filter(key -> !records.containsKey(key))
+        .collect(Collectors.toList());
+
+    if (missingKeys.isEmpty()) {
+      // All the required records are already fetched, no-op
+      return;
+    }
+
+    scanInternal(Option.of(KeySpec.fullKeySpec(missingKeys)), false);

Review Comment:
   this is a nice optimization. good one.
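
   For context, a minimal sketch of the caching pattern this diff introduces (a hedged illustration; the class scaffolding and the `Object` value type here are stand-ins, not the real scanner):

      import java.util.List;
      import java.util.Map;
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.stream.Collectors;

      class IncrementalKeyLookupSketch {
        private final Map<String, Object> records = new ConcurrentHashMap<>();

        // Only keys absent from the materialized cache trigger a delta-log scan.
        void scanByFullKeys(List<String> keys) {
          List<String> missingKeys = keys.stream()
              .filter(key -> !records.containsKey(key))
              .collect(Collectors.toList());
          if (missingKeys.isEmpty()) {
            return; // everything already cached; no I/O needed
          }
          scanInternal(missingKeys);
        }

        private void scanInternal(List<String> missingKeys) {
          // stand-in for the real delta-log scan that materializes the records
          missingKeys.forEach(key -> records.put(key, new Object()));
        }
      }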



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * A {@code HoodieMergedLogRecordScanner} implementation which only merges records matching the provided keys. This is

Review Comment:
   fix class name
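
   A possible wording for the fix (a hedged suggestion; presumably the doc should describe the reader class itself rather than claim it is the scanner it wraps):

      /**
       * A reader that delegates to a {@code HoodieMergedLogRecordScanner} and only merges
       * records matching the provided keys. This is useful in limiting memory usage when
       * only a small subset of update records is to be read.
       */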



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -255,38 +255,35 @@ public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecord
     return result;
   }
 
-  private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataMergedLogRecordReader logRecordScanner,
+  private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataLogRecordReader logRecordReader,
                                                                                   List<String> keys,
                                                                                   boolean fullKey,
                                                                                   List<Long> timings) {
     HoodieTimer timer = HoodieTimer.start();
 
-    if (logRecordScanner == null) {
+    if (logRecordReader == null) {
       timings.add(timer.endTimer());
       return Collections.emptyMap();
     }
 
-    String partitionName = logRecordScanner.getPartitionName().get();
+    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>(keys.size());
 
-    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
-    if (isFullScanAllowedForPartition(partitionName)) {
-      checkArgument(fullKey, "If full-scan is required, only full keys could be used!");
-      // Path which does full scan of log files
-      for (String key : keys) {
-        logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
-      }
-    } else {
-      // This path will do seeks pertaining to the keys passed in
-      List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList =
-          fullKey ? logRecordScanner.getRecordsByKeys(keys)
-              : logRecordScanner.getRecordsByKeyPrefixes(keys)
-                  .stream()
-                  .map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
-                  .collect(Collectors.toList());
-
-      for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
-        logRecords.put(entry.getKey(), entry.getValue());
-      }
+    // First, fetch the keys being looked up
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList =
+        (fullKey ? logRecordReader.getRecordsByKeys(keys) : logRecordReader.getRecordsByKeyPrefixes(keys))
+                .stream()
+                .map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
+                .collect(Collectors.toList());
+
+    // Second, back-fill keys not present in the log-blocks (such that map holds
+    // a record for every key being looked up)
+    List<String> missingKeys = CollectionUtils.diff(keys, logRecords.keySet());
+    for (String key : missingKeys) {
+      logRecords.put(key, Option.empty());
+    }
+
+    for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
+      logRecords.put(entry.getKey(), entry.getValue());

Review Comment:
   will review L269 to 286 once I get the answer from you for my previous comment.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * A {@code HoodieMergedLogRecordScanner} implementation which only merges records matching the provided keys. This is
+ * useful in limiting memory usage when only a small subset of update records is to be read.
+ */
+@ThreadSafe
+public class HoodieMetadataLogRecordReader implements Closeable {
+
+  private final HoodieMergedLogRecordScanner logRecordScanner;
+
+  private HoodieMetadataLogRecordReader(HoodieMergedLogRecordScanner logRecordScanner) {
+    this.logRecordScanner = logRecordScanner;
+  }
+
+  /**
+   * Returns the builder for {@code HoodieMetadataLogRecordReader}.
+   */
+  public static HoodieMetadataLogRecordReader.Builder newBuilder() {
+    return new HoodieMetadataLogRecordReader.Builder();
+  }
+
+  @SuppressWarnings("unchecked")
+  public List<HoodieRecord<HoodieMetadataPayload>> getRecords() {
+    // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]]
+    //       materialized state, to make sure there's no concurrent access
+    synchronized (this) {
+      logRecordScanner.scan();

Review Comment:
   won't this trigger scanning twice? Instantiating logRecordScanner, i.e. HoodieMergedLogRecordScanner, would have triggered scan() already, right?
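
   If double scanning is indeed the concern, one common guard is to make the call idempotent; a minimal sketch (the guard flag is hypothetical, not part of this patch):

      import java.util.concurrent.atomic.AtomicBoolean;

      abstract class ScanOnceGuard {
        private final AtomicBoolean scanned = new AtomicBoolean(false);

        // Only the first caller pays the scan cost; subsequent calls are no-ops.
        public final void scan() {
          if (scanned.compareAndSet(false, true)) {
            performScan();
          }
        }

        protected abstract void performScan();
      }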



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -108,30 +112,85 @@ protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<Stri
     }
   }
 
-  protected void performScan() {
+  /**
+   * Scans delta-log files, processing blocks
+   */
+  public final void scan() {
+    scan(false);
+  }
+
+  public final void scan(boolean skipProcessingBlocks) {
+    scanInternal(Option.empty(), skipProcessingBlocks);
+  }
+
+  /**
+   * Provides incremental scanning capability, where only the provided keys will be
+   * looked up in the delta-log files, scanned, and subsequently materialized into
+   * the internal cache.
+   *
+   * @param keys to be looked up
+   */
+  public void scanByFullKeys(List<String> keys) {

Review Comment:
   this method naming is confusing w/ scan() and scanInternal(). Maybe we should rename it to scanAndGetRecords(), because for both scanByFullKeys and scanByKeyPrefixes, the caller immediately calls logRecordScanner.getRecords().
   
   This will differentiate it from the one-time preprocessing done via scan() or scanInternal().
   I am even open to renaming scan() to something like load() or fetchRecordsFromStorage().
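
   A sketch of the suggested shape (method names are the reviewer's proposals, not the merged API):

      import java.util.List;

      interface RenamedScannerApi<R> {
        // one-time preprocessing, i.e. what scan()/scanInternal() do today
        void fetchRecordsFromStorage();

        // lookup + materialize + return in one call, replacing the current
        // scanByFullKeys(keys) followed by getRecords()
        List<R> scanAndGetRecords(List<String> keys);
      }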



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -198,41 +190,42 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<Str
     this.path = basePath;
     this.useScanV2 = useScanV2;
 
-    // Key fields when populate meta fields is disabled (that is, virtual keys enabled)
-    if (!tableConfig.populateMetaFields()) {
-      this.populateMetaFields = false;
-      this.simpleKeyGenFields = Option.of(
-          Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()));
-    }
-    this.partitionName = partitionName;
-    this.recordType = recordMerger.getRecordType();
-  }
+    if (keyFieldOverride.isPresent()) {
+      // TODO elaborate
+      checkState(partitionNameOverride.isPresent());
 
-  protected String getKeyField() {
-    if (this.populateMetaFields) {
-      return HoodieRecord.RECORD_KEY_METADATA_FIELD;
+      this.populateMetaFields = false;
+      this.recordKeyField = keyFieldOverride.get();
+      this.partitionPathFieldOpt = Option.empty();
+    } else if (tableConfig.populateMetaFields()) {
+      this.populateMetaFields = true;
+      this.recordKeyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
+      this.partitionPathFieldOpt = Option.of(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+    } else {
+      this.populateMetaFields = false;

Review Comment:
   can you add javadocs for all 3 cases explaining what each one is meant for?
   It might get confusing, since we have virtual keys for the data table, virtual keys for the metadata table, and a keyField override that applies only to the metadata table.
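
   A possible shape for those docs, inferred from the three branches in this diff (the comment wording is a best guess, not authoritative; case 3's tail is cut off in the diff, so only the shown assignment is repeated):

      if (keyFieldOverride.isPresent()) {
        // Case 1: explicit key-field override (used for metadata-table reads): record
        // keys come from the supplied field, so a partition-name override must also
        // be supplied.
        this.populateMetaFields = false;
        this.recordKeyField = keyFieldOverride.get();
        this.partitionPathFieldOpt = Option.empty();
      } else if (tableConfig.populateMetaFields()) {
        // Case 2: meta fields populated: record key and partition path are read from
        // the _hoodie_record_key / _hoodie_partition_path meta columns.
        this.populateMetaFields = true;
        this.recordKeyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
        this.partitionPathFieldOpt = Option.of(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
      } else {
        // Case 3: virtual keys (meta fields disabled): record key and partition path
        // are derived from the key/partition fields configured in the table config.
        this.populateMetaFields = false;
      }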



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -255,38 +255,35 @@ public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecord
     return result;
   }
 
-  private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataMergedLogRecordReader logRecordScanner,
+  private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataLogRecordReader logRecordReader,
                                                                                   List<String> keys,
                                                                                   boolean fullKey,
                                                                                   List<Long> timings) {
     HoodieTimer timer = HoodieTimer.start();
 
-    if (logRecordScanner == null) {
+    if (logRecordReader == null) {
       timings.add(timer.endTimer());
       return Collections.emptyMap();
     }
 
-    String partitionName = logRecordScanner.getPartitionName().get();
+    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>(keys.size());
 
-    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
-    if (isFullScanAllowedForPartition(partitionName)) {

Review Comment:
   In case of MDT, we enable full scan only for the FILES partition; for other partitions we do on-demand calls. So, we can't really rely on the forceFullScan instance var value in LogRecordReader. Not sure how this pans out in lines 273 to 276 after this fix, because I don't see isFullScanAllowedForPartition() being used after this patch. Am I missing something?
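
   For reference, a sketch of the per-partition gating the removed code relied on (the FILES comparison here is an assumption based on this comment, not the actual pre-patch body):

      private boolean isFullScanAllowedForPartition(String partitionName) {
        // Full scan is only worthwhile for eagerly-consumed partitions such as FILES;
        // other MDT partitions are served by on-demand key lookups instead.
        return MetadataPartitionType.FILES.getPartitionPath().equals(partitionName);
      }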



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * A {@code HoodieMergedLogRecordScanner} implementation which only merges records matching the provided keys. This is
+ * useful in limiting memory usage when only a small subset of update records is to be read.
+ */
+@ThreadSafe
+public class HoodieMetadataLogRecordReader implements Closeable {
+
+  private final HoodieMergedLogRecordScanner logRecordScanner;
+
+  private HoodieMetadataLogRecordReader(HoodieMergedLogRecordScanner logRecordScanner) {
+    this.logRecordScanner = logRecordScanner;
+  }
+
+  /**
+   * Returns the builder for {@code HoodieMetadataLogRecordReader}.
+   */
+  public static HoodieMetadataLogRecordReader.Builder newBuilder() {
+    return new HoodieMetadataLogRecordReader.Builder();
+  }
+
+  @SuppressWarnings("unchecked")
+  public List<HoodieRecord<HoodieMetadataPayload>> getRecords() {
+    // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]]
+    //       materialized state, to make sure there's no concurrent access
+    synchronized (this) {
+      logRecordScanner.scan();
+      return logRecordScanner.getRecords().values()
+          .stream()
+          .map(record -> (HoodieRecord<HoodieMetadataPayload>) record)
+          .collect(Collectors.toList());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public List<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes) {
+    if (keyPrefixes.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]]
+    //       materialized state, to make sure there's no concurrent access
+    synchronized (this) {
+      logRecordScanner.scanByKeyPrefixes(keyPrefixes);
+      Map<String, HoodieRecord> allRecords = logRecordScanner.getRecords();
+
+      Predicate<String> p = createPrefixMatchingPredicate(keyPrefixes);
+      return allRecords.entrySet()
+          .stream()
+          .filter(r -> r != null && p.test(r.getKey()))

Review Comment:
   do we need this additional predicate filtering? If the HFile lookup is going to return non-matching records, we have some other underlying problem, right? We are banking on the HFile data block to return only matching records for keyPrefixes.
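
   For reference, a plausible shape for createPrefixMatchingPredicate, which is not shown in this diff (an assumption, sketched only to clarify the discussion):

      import java.util.List;
      import java.util.function.Predicate;

      final class PrefixPredicates {
        // A key matches if it starts with any of the requested prefixes.
        static Predicate<String> createPrefixMatchingPredicate(List<String> keyPrefixes) {
          return key -> keyPrefixes.stream().anyMatch(key::startsWith);
        }
      }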



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org