Posted to commits@hudi.apache.org by vi...@apache.org on 2020/10/30 05:33:30 UTC

[hudi] branch rfc-15 updated (8bdbdf4 -> d305948)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a change to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git.


    from 8bdbdf4  [HUDI-842] Reader and Writer for Metadata.
     new 389be1b  [RFC-15] Add metrics to track the time for each file system call.
     new e8f1698  [RFC-15] Added a distributed metrics registry for spark which can be used to collect metrics from executors.
     new ce1c983  [RFC-15] Using distributed metrics registry in metadata code.
     new 7bea565  [MINOR] Reduced number of fs calls to check for table existence. Removed dead code.
     new a2dcacf  [HUDI-1346] Choose a new instant time when performing autoClean.
     new b16586d  [HUDI-1321] Created HoodieMetadataConfig to specify configuration for the metadata table.
     new 6ade850  [HUDI-1317] Fix initialization when Async jobs are scheduled.
     new 48d2f80  [RFC-15] Fixing checkstyle errors.
     new 86fdd65  [HUDI-1305] Added an API to shutdown and remove the metrics reporter. (#2132)
     new 9e8025d  [RFC-15] Fixing metrics printing to console which generates a lot of text.
     new 5e13b19  [RFC-15] Fixing checkstyle errors.
     new 34872bb  [RFC-15] Reverting change to AbstractHoodieLogRecordScanner as they lead to hung tests.
     new 4b7a4b3  [RFC-15] Fixing getFileSystemView() which requires getCompletedCommitsTimeline().
     new d305948  [RFC-15] Fixing code review comments

The 14 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/hudi/cli/commands/MetadataCommand.java  |  23 +-
 .../apache/hudi/client/AbstractHoodieClient.java   |  27 +-
 .../hudi/client/AbstractHoodieWriteClient.java     |   5 +
 .../apache/hudi/client/AsyncCleanerService.java    |   7 +-
 .../org/apache/hudi/client/HoodieWriteClient.java  |  48 ++--
 .../apache/hudi/config/HoodieMetadataConfig.java   | 151 +++++++++++
 .../apache/hudi/config/HoodieMetricsConfig.java    |   8 +
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  85 +++----
 .../apache/hudi/metadata/HoodieMetadataWriter.java | 122 ++++++---
 .../hudi/metrics/ConsoleMetricsReporter.java       |   3 +
 .../apache/hudi/metrics/DistributedRegistry.java   | 107 ++++++++
 .../main/java/org/apache/hudi/metrics/Metrics.java |  30 ++-
 .../java/org/apache/hudi/table/HoodieTable.java    |   9 +-
 .../apache/hudi/metadata/TestHoodieMetadata.java   |  99 ++++---
 .../hudi/metrics/TestHoodieConsoleMetrics.java     |   6 +
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  17 --
 .../hudi/common/fs/HoodieWrapperFileSystem.java    | 283 +++++++++++++--------
 .../common/fs/SizeAwareFSDataOutputStream.java     |  17 +-
 .../hudi/common/fs/TimedFSDataInputStream.java     |  79 ++++++
 .../metrics/{Registry.java => LocalRegistry.java}  |  62 +----
 .../org/apache/hudi/common/metrics/Registry.java   | 108 ++++----
 .../table/log/AbstractHoodieLogRecordScanner.java  |  14 +-
 .../hudi/common/table/log/HoodieLogFileReader.java |   5 +-
 .../hudi/exception/TableNotFoundException.java     |  14 +-
 .../apache/hudi/metadata/HoodieMetadataReader.java | 105 ++++----
 25 files changed, 971 insertions(+), 463 deletions(-)
 create mode 100644 hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
 create mode 100644 hudi-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
 create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSDataInputStream.java
 copy hudi-common/src/main/java/org/apache/hudi/common/metrics/{Registry.java => LocalRegistry.java} (58%)


[hudi] 02/14: [RFC-15] Added a distributed metrics registry for spark which can be used to collect metrics from executors.

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e8f16989b952588461bd38d1cd5a4679c29e259f
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Fri Oct 16 14:26:07 2020 -0700

    [RFC-15] Added a distributed metrics registry for spark which can be used to collect metrics from executors.
    
    This helps create a stats dashboard that shows the metadata table improvements in real time for production tables.
---
 .../apache/hudi/client/AbstractHoodieClient.java   |  24 ++++-
 .../hudi/client/AbstractHoodieWriteClient.java     |   5 +
 .../org/apache/hudi/client/HoodieWriteClient.java  |  27 +++--
 .../apache/hudi/config/HoodieMetricsConfig.java    |   8 ++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +
 .../apache/hudi/metrics/DistributedRegistry.java   | 107 ++++++++++++++++++++
 .../hudi/common/fs/HoodieWrapperFileSystem.java    |  25 +++--
 .../hudi/common/fs/SizeAwareFSDataInputStream.java |   8 --
 .../common/fs/SizeAwareFSDataOutputStream.java     |   7 --
 .../metrics/{Registry.java => LocalRegistry.java}  |  60 +++---------
 .../org/apache/hudi/common/metrics/Registry.java   | 109 ++++++++++++---------
 11 files changed, 250 insertions(+), 134 deletions(-)
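
For context, a minimal sketch of how a job might opt into the new executor metrics. Only withExecutorMetrics(...) is introduced by this commit; withPath(...), withMetricsConfig(...), and on(...) are assumptions based on the surrounding Hudi config builders:

    // Hedged sketch: builder method names other than withExecutorMetrics(...)
    // are assumptions, not confirmed by this diff.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withMetricsConfig(HoodieMetricsConfig.newBuilder()
            .on(true)                    // hoodie.metrics.on
            .withExecutorMetrics(true)   // hoodie.metrics.executor.enable (new)
            .build())
        .build();

When both flags are set, createMetaClient(...) in AbstractHoodieClient (first hunk below) registers DistributedRegistry accumulators with the SparkContext instead of using plain local registries.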

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
index cdd125e..fd02b6d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
@@ -22,10 +22,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.utils.ClientUtils;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-
+import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -122,6 +124,26 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
   }
 
   protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
+    if (config.isMetricsOn()) {
+      Registry registry;
+      Registry registryMeta;
+
+      if (config.isExecutorMetricsEnabled()) {
+        // Create a distributed registry for HoodieWrapperFileSystem
+        registry = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName(),
+            DistributedRegistry.class.getName());
+        ((DistributedRegistry)registry).register(jsc);
+        registryMeta = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + "MetaFolder",
+            DistributedRegistry.class.getName());
+        ((DistributedRegistry)registryMeta).register(jsc);
+      } else {
+        registry = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName());
+        registryMeta = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + "MetaFolder");
+      }
+
+      HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta);
+    }
+
     return ClientUtils.createMetaClient(hadoopConf, config, loadActiveTimelineOnLoad);
   }
 }
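
A note on the hunk above: two registries are kept so that file system calls against the table's metafolder can be tracked separately from data-path calls. An illustrative sketch of the routing (the paths are made up; METAFOLDER_NAME is ".hoodie"):

    // Paths under the metafolder are credited to the "...MetaFolder" registry,
    // everything else to the base registry (see getMetricRegistryForPath in
    // commit 01/14 later in this email).
    Path dataFile = new Path("/tbl/2020/10/30/data.parquet"); // -> HoodieWrapperFileSystem
    Path metaFile = new Path("/tbl/.hoodie/001.commit");      // -> HoodieWrapperFileSystemMetaFolder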
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 4566fe8..a4015a6 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -232,6 +232,11 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
     return table;
   }
 
+  protected HoodieTable<T> getTable() {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    return HoodieTable.create(metaClient, config, hadoopConf);
+  }
+
   /**
    * Sets write schema from last instant since deletes may not have schema set in the config.
    */
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 5fa72d4..a0019b0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -146,7 +146,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     Timer.Context indexTimer = metrics.getIndexCtx();
     JavaRDD<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(hoodieRecords, jsc, table);
     metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
@@ -169,7 +169,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   protected void rollBackInflightBootstrap() {
     LOG.info("Rolling back pending bootstrap if present");
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
     Option<String> instant = Option.fromJavaOptional(
         inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
@@ -438,7 +438,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @param comment - Comment for the savepoint
    */
   public void savepoint(String user, String comment) {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     if (table.getCompletedCommitsTimeline().empty()) {
       throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
     }
@@ -462,7 +462,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @param comment - Comment for the savepoint
    */
   public void savepoint(String instantTime, String user, String comment) {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     table.savepoint(jsc, instantTime, user, comment);
   }
 
@@ -474,7 +474,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @return true if the savepoint was deleted successfully
    */
   public void deleteSavepoint(String savepointTime) {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     SavepointHelpers.deleteSavepoint(table, savepointTime);
   }
 
@@ -489,7 +489,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @return true if the savepoint was restored to successfully
    */
   public void restoreToSavepoint(String savepointTime) {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     SavepointHelpers.validateSavepointPresence(table, savepointTime);
     restoreToInstant(savepointTime);
     SavepointHelpers.validateSavepointRestore(table, savepointTime);
@@ -506,7 +506,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     final String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime();
     final Timer.Context context = this.metrics.getRollbackCtx();
     try {
-      HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+      HoodieTable<T> table = getTable();
       Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
               .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
               .findFirst());
@@ -537,7 +537,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
     Timer.Context context = metrics.getRollbackCtx();
     try {
-      HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+      HoodieTable<T> table = getTable();
       HoodieRestoreMetadata restoreMetadata = table.restore(jsc, restoreInstantTime, instantTime);
       if (context != null) {
         final long durationInMs = metrics.getDurationInMs(context.stop());
@@ -571,7 +571,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
     LOG.info("Cleaner started");
     final Timer.Context context = metrics.getCleanCtx();
-    HoodieCleanMetadata metadata = HoodieTable.create(config, hadoopConf).clean(jsc, cleanInstantTime);
+    HoodieCleanMetadata metadata = getTable().clean(jsc, cleanInstantTime);
     if (context != null && metadata != null) {
       long durationMs = metrics.getDurationInMs(context.stop());
       metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
@@ -660,8 +660,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
     LOG.info("Scheduling compaction at instant time :" + instantTime);
-    Option<HoodieCompactionPlan> plan = HoodieTable.create(config, hadoopConf)
-        .scheduleCompaction(jsc, instantTime, extraMetadata);
+    Option<HoodieCompactionPlan> plan = getTable().scheduleCompaction(jsc, instantTime, extraMetadata);
     return plan.isPresent();
   }
 
@@ -684,7 +683,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses,
                                Option<Map<String, String>> extraMetadata) throws IOException {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     HoodieCommitMetadata metadata = CompactHelpers.createCompactionMetadata(
         table, compactionInstantTime, writeStatuses, config.getSchema());
     extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
@@ -733,7 +732,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * Cleanup all pending commits.
    */
   private void rollbackPendingCommits() {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
     List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
         .collect(Collectors.toList());
@@ -755,7 +754,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @return RDD of Write Status
    */
   private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
     HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
     if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
index 800c75f..b6cb6e5 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
@@ -62,6 +62,9 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
   public static final String METRICS_REPORTER_CLASS = METRIC_PREFIX + ".reporter.class";
   public static final String DEFAULT_METRICS_REPORTER_CLASS = "";
 
+  // Enable metrics collection from executors
+  public static final String ENABLE_EXECUTOR_METRICS = METRIC_PREFIX + ".executor.enable";
+
   private HoodieMetricsConfig(Properties props) {
     super(props);
   }
@@ -126,6 +129,11 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withExecutorMetrics(boolean enable) {
+      props.setProperty(ENABLE_EXECUTOR_METRICS, String.valueOf(enable));
+      return this;
+    }
+
     public HoodieMetricsConfig build() {
       HoodieMetricsConfig config = new HoodieMetricsConfig(props);
       setDefaultOnCondition(props, !props.containsKey(METRICS_ON), METRICS_ON, String.valueOf(DEFAULT_METRICS_ON));
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 527f18f..82199ff 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -686,6 +686,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Boolean.parseBoolean(props.getProperty(HoodieMetricsPrometheusConfig.PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX));
   }
 
+  public boolean isExecutorMetricsEnabled() {
+    return Boolean.parseBoolean(props.getProperty(HoodieMetricsConfig.ENABLE_EXECUTOR_METRICS, "false"));
+  }
+
   /**
    * memory configs.
    */
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java b/hudi-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
new file mode 100644
index 0000000..5197a22
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.AccumulatorV2;
+
+/**
+ * Lightweight Metrics Registry to track Hudi events.
+ */
+public class DistributedRegistry extends AccumulatorV2<Map<String, Long>, Map<String, Long>>
+    implements Registry, Serializable {
+  private String name;
+  ConcurrentHashMap<String, Long> counters = new ConcurrentHashMap<>();
+
+  public DistributedRegistry(String name) {
+    this.name = name;
+  }
+
+  public void register(JavaSparkContext jsc) {
+    if (!isRegistered()) {
+      jsc.sc().register(this);
+    }
+  }
+
+  @Override
+  public void clear() {
+    counters.clear();
+  }
+
+  @Override
+  public void increment(String name) {
+    counters.merge(name,  1L, (oldValue, newValue) -> oldValue + newValue);
+  }
+
+  @Override
+  public void add(String name, long value) {
+    counters.merge(name,  value, (oldValue, newValue) -> oldValue + newValue);
+  }
+
+  /**
+   * Get all Counter type metrics.
+   */
+  @Override
+  public Map<String, Long> getAllCounts(boolean prefixWithRegistryName) {
+    HashMap<String, Long> countersMap = new HashMap<>();
+    counters.forEach((k, v) -> {
+      String key = prefixWithRegistryName ? name + "." + k : k;
+      countersMap.put(key, v);
+    });
+    return countersMap;
+  }
+
+  @Override
+  public void add(Map<String, Long> arg) {
+    arg.forEach((key, value) -> add(key, value));
+  }
+
+  @Override
+  public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() {
+    DistributedRegistry registry = new DistributedRegistry(name);
+    counters.forEach((key, value) -> registry.add(key, value));
+    return registry;
+  }
+
+  @Override
+  public boolean isZero() {
+    return counters.isEmpty();
+  }
+
+  @Override
+  public void merge(AccumulatorV2<Map<String, Long>, Map<String, Long>> acc) {
+    acc.value().forEach((key, value) -> add(key, value));
+  }
+
+  @Override
+  public void reset() {
+    counters.clear();
+  }
+
+  @Override
+  public Map<String, Long> value() {
+    return counters;
+  }
+}
\ No newline at end of file
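
To make the accumulator semantics concrete, a self-contained sketch run against a local Spark context (the class DistributedRegistryExample and the counter name are illustrative; Registry and DistributedRegistry come from this commit):

    import java.util.Arrays;
    import java.util.Map;
    import org.apache.hudi.common.metrics.Registry;
    import org.apache.hudi.metrics.DistributedRegistry;
    import org.apache.spark.api.java.JavaSparkContext;

    public class DistributedRegistryExample {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local[2]", "registry-demo");

        // Driver: obtain the accumulator-backed registry and register it so
        // Spark can merge per-task copies back into this instance.
        DistributedRegistry registry = (DistributedRegistry) Registry.getRegistry(
            "HoodieWrapperFileSystem", DistributedRegistry.class.getName());
        registry.register(jsc);

        // Executors: the closure captures the accumulator; each task increments
        // its own copy, which Spark folds back via merge() on task completion.
        jsc.parallelize(Arrays.asList(1, 2, 3)).foreach(i -> registry.increment("listStatus"));

        // Driver again: totals now reflect all executors.
        Map<String, Long> totals = registry.getAllCounts(true);
        System.out.println(totals); // e.g. {HoodieWrapperFileSystem.listStatus=3}

        jsc.stop();
      }
    }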
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
index 9cea356..cdda082 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
@@ -74,9 +74,8 @@ public class HoodieWrapperFileSystem extends FileSystem {
   private FileSystem fileSystem;
   private URI uri;
   private ConsistencyGuard consistencyGuard = new NoOpConsistencyGuard();
-  private static Registry metricsRegistry = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName());
-  private static Registry metricsRegistryMetaFolder =
-      Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + "MetaFolder");
+  private static Registry metricsRegistry;
+  private static Registry metricsRegistryMetaFolder;
 
   @FunctionalInterface
   public interface CheckedFunction<R> {
@@ -89,12 +88,14 @@ public class HoodieWrapperFileSystem extends FileSystem {
   }
 
   protected static <R> R executeFuncWithTimeMetrics(String metricName, Path p, CheckedFunction<R> func) throws IOException {
-    Registry registry = getMetricRegistryForPath(p);
-
     long t1 = System.currentTimeMillis();
     R res = func.get();
-    registry.increment(metricName);
-    registry.add(metricName + ".totalDuration", System.currentTimeMillis() - t1);
+
+    Registry registry = getMetricRegistryForPath(p);
+    if (registry != null) {
+      registry.increment(metricName);
+      registry.add(metricName + ".totalDuration", System.currentTimeMillis() - t1);
+    }
 
     return res;
   }
@@ -102,10 +103,18 @@ public class HoodieWrapperFileSystem extends FileSystem {
   protected static <R> R executeFuncWithTimeAndByteMetrics(String metricName, Path p, long byteCount,
                                                            CheckedFunction<R> func) throws IOException {
     Registry registry = getMetricRegistryForPath(p);
-    registry.add(metricName + ".totalBytes", byteCount);
+    if (registry != null) {
+      registry.add(metricName + ".totalBytes", byteCount);
+    }
+
     return executeFuncWithTimeMetrics(metricName, p, func);
   }
 
+  public static void setMetricsRegistry(Registry registry, Registry registryMeta) {
+    metricsRegistry = registry;
+    metricsRegistryMetaFolder = registryMeta;
+  }
+
   public HoodieWrapperFileSystem() {}
 
   public HoodieWrapperFileSystem(FileSystem fileSystem, ConsistencyGuard consistencyGuard) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
index 684a625..f5adb0d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.fs;
 
-import org.apache.hudi.common.metrics.Registry;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.ReadOption;
@@ -35,8 +34,6 @@ public class SizeAwareFSDataInputStream extends FSDataInputStream {
 
   // Path
   private final Path path;
-  // Registry or read and write metrics
-  Registry metricsRegistry;
 
   public SizeAwareFSDataInputStream(Path path, FSDataInputStream in) throws IOException {
     super(in);
@@ -85,9 +82,4 @@ public class SizeAwareFSDataInputStream extends FSDataInputStream {
           return null;
         });
   }
-
-  public void setMetricRegistry(Registry metricsRegistry) {
-    this.metricsRegistry = metricsRegistry;
-  }
-
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
index e0607d0..ac07cd7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.fs;
 
-import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,8 +41,6 @@ public class SizeAwareFSDataOutputStream extends FSDataOutputStream {
   private final Path path;
   // Consistency guard
   private final ConsistencyGuard consistencyGuard;
-  // Registry or read and write metrics
-  Registry metricsRegistry;
 
   public SizeAwareFSDataOutputStream(Path path, FSDataOutputStream out, ConsistencyGuard consistencyGuard,
       Runnable closeCallback) throws IOException {
@@ -87,8 +84,4 @@ public class SizeAwareFSDataOutputStream extends FSDataOutputStream {
   public long getBytesWritten() {
     return bytesWritten.get();
   }
-
-  public void setMetricRegistry(Registry metricsRegistry) {
-    this.metricsRegistry = metricsRegistry;
-  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
similarity index 60%
copy from hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
copy to hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
index 169e8bc..36aeab9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
@@ -22,76 +22,36 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-
 /**
  * Lightweight Metrics Registry to track Hudi events.
  */
-public class Registry {
+public class LocalRegistry implements Registry {
   ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
-  final String name;
-
-  private static ConcurrentHashMap<String, Registry> registryMap = new ConcurrentHashMap<>();
+  private String name;
 
-  private Registry(String name) {
+  public LocalRegistry(String name) {
     this.name = name;
   }
 
-  /**
-   * Get (or create) the registry for a provided name.
-   */
-  public static synchronized Registry getRegistry(String registryName) {
-    if (!registryMap.containsKey(registryName)) {
-      registryMap.put(registryName, new Registry(registryName));
-    }
-    return registryMap.get(registryName);
-  }
-
-  /**
-   * Get all registered metrics.
-   * @param flush clean all metrics as part of this operation.
-   * @param prefixWithRegistryName prefix each metric name with the registry name.
-   * @return
-   */
-  public static synchronized Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) {
-    HashMap<String, Long> allMetrics = new HashMap<>();
-    registryMap.forEach((registryName, registry) -> {
-      allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName));
-      if (flush) {
-        registry.clear();
-      }
-    });
-    return allMetrics;
-  }
-
+  @Override
   public void clear() {
     counters.clear();
   }
 
+  @Override
   public void increment(String name) {
     getCounter(name).increment();
   }
 
+  @Override
   public void add(String name, long value) {
     getCounter(name).add(value);
   }
 
-  private synchronized Counter getCounter(String name) {
-    if (!counters.containsKey(name)) {
-      counters.put(name, new Counter());
-    }
-    return counters.get(name);
-  }
-
-  /**
-   * Get all Counter type metrics.
-   */
-  public Map<String, Long> getAllCounts() {
-    return getAllCounts(false);
-  }
-
   /**
    * Get all Counter type metrics.
    */
+  @Override
   public Map<String, Long> getAllCounts(boolean prefixWithRegistryName) {
     HashMap<String, Long> countersMap = new HashMap<>();
     counters.forEach((k, v) -> {
@@ -101,4 +61,10 @@ public class Registry {
     return countersMap;
   }
 
+  private synchronized Counter getCounter(String name) {
+    if (!counters.containsKey(name)) {
+      counters.put(name, new Counter());
+    }
+    return counters.get(name);
+  }
 }
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
index 169e8bc..0a56297 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
@@ -18,87 +18,98 @@
 
 package org.apache.hudi.common.metrics;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hudi.common.util.ReflectionUtils;
+
 
 /**
- * Lightweight Metrics Registry to track Hudi events.
+ * Interface which defines a lightweight Metrics Registry to track Hudi events.
  */
-public class Registry {
-  ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
-  final String name;
-
-  private static ConcurrentHashMap<String, Registry> registryMap = new ConcurrentHashMap<>();
+public interface Registry extends Serializable {
+  static ConcurrentHashMap<String, Registry> REGISTRYMAP = new ConcurrentHashMap<>();
 
-  private Registry(String name) {
-    this.name = name;
+  /**
+   * Get (or create) the registry for a provided name.
+   *
+   * This function creates a {@code LocalRegistry}.
+   *
+   * @param registryName Name of the registry
+   */
+  public static Registry getRegistry(String registryName) {
+    return getRegistry(registryName, LocalRegistry.class.getName());
   }
 
   /**
-   * Get (or create) the registry for a provided name.
+   * Get (or create) the registry for a provided name and given class.
+   *
+   * @param registryName Name of the registry.
+   * @param clazz The fully qualified name of the registry class to create.
    */
-  public static synchronized Registry getRegistry(String registryName) {
-    if (!registryMap.containsKey(registryName)) {
-      registryMap.put(registryName, new Registry(registryName));
+  public static Registry getRegistry(String registryName, String clazz) {
+    synchronized (Registry.class) {
+      if (!REGISTRYMAP.containsKey(registryName)) {
+        Registry registry = (Registry)ReflectionUtils.loadClass(clazz, registryName);
+        REGISTRYMAP.put(registryName, registry);
+      }
+      return REGISTRYMAP.get(registryName);
     }
-    return registryMap.get(registryName);
   }
 
   /**
    * Get all registered metrics.
-   * @param flush clean all metrics as part of this operation.
+   *
+   * @param flush clear all metrics after this operation.
    * @param prefixWithRegistryName prefix each metric name with the registry name.
    * @return
    */
-  public static synchronized Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) {
-    HashMap<String, Long> allMetrics = new HashMap<>();
-    registryMap.forEach((registryName, registry) -> {
-      allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName));
-      if (flush) {
-        registry.clear();
-      }
-    });
-    return allMetrics;
-  }
-
-  public void clear() {
-    counters.clear();
+  public static Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) {
+    synchronized (Registry.class) {
+      HashMap<String, Long> allMetrics = new HashMap<>();
+      REGISTRYMAP.forEach((registryName, registry) -> {
+        allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName));
+        if (flush) {
+          registry.clear();
+        }
+      });
+      return allMetrics;
+    }
   }
 
-  public void increment(String name) {
-    getCounter(name).increment();
-  }
+  /**
+   * Clear all metrics.
+   */
+  public void clear();
 
-  public void add(String name, long value) {
-    getCounter(name).add(value);
-  }
+  /**
+   * Increment the metric.
+   *
+   * @param name Name of the metric to increment.
+   */
+  public void increment(String name);
 
-  private synchronized Counter getCounter(String name) {
-    if (!counters.containsKey(name)) {
-      counters.put(name, new Counter());
-    }
-    return counters.get(name);
-  }
+  /**
+   * Add value to the metric.
+   *
+   * @param name Name of the metric.
+   * @param value The value to add to the metrics.
+   */
+  public void add(String name, long value);
 
   /**
    * Get all Counter type metrics.
    */
-  public Map<String, Long> getAllCounts() {
+  public default Map<String, Long> getAllCounts() {
     return getAllCounts(false);
   }
 
   /**
    * Get all Counter type metrics.
+   *
+   * @param prefixWithRegistryName If true, the names of all metrics are prefixed with name of this registry.
    */
-  public Map<String, Long> getAllCounts(boolean prefixWithRegistryName) {
-    HashMap<String, Long> countersMap = new HashMap<>();
-    counters.forEach((k, v) -> {
-      String key = prefixWithRegistryName ? name + "." + k : k;
-      countersMap.put(key, v.getValue());
-    });
-    return countersMap;
-  }
-
+  public abstract Map<String, Long> getAllCounts(boolean prefixWithRegistryName);
 }
\ No newline at end of file
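
The net effect of the Registry refactor for callers, as a short sketch (registry and metric names here are illustrative):

    // Existing call sites keep working: the one-argument overload now returns
    // a LocalRegistry, preserving the old in-process behavior.
    Registry local = Registry.getRegistry("MyMetrics");
    local.increment("commits");
    local.add("bytesWritten", 1024L);

    // New call sites can ask for any Registry implementation by class name,
    // loaded reflectively (this is how DistributedRegistry is created above).
    Registry dist = Registry.getRegistry("MyDistMetrics",
        "org.apache.hudi.metrics.DistributedRegistry");

    // Reporting is unchanged: flush and prefix each metric with its registry name.
    Map<String, Long> all = Registry.getAllMetrics(true, true);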


[hudi] 08/14: [RFC-15] Fixing checkstyle errors.

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 48d2f807c7382d57005e4a50c6ac10db2dae84bf
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Tue Oct 27 15:56:01 2020 -0700

    [RFC-15] Fixing checkstyle errors.
---
 .../java/org/apache/hudi/cli/commands/MetadataCommand.java   | 12 ++++++++----
 .../java/org/apache/hudi/metadata/HoodieMetadataWriter.java  |  2 --
 .../java/org/apache/hudi/metadata/TestHoodieMetadata.java    |  1 -
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index 4c31efd..4ecc6a9 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieMetadataConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieMetadataReader;
 import org.apache.hudi.metadata.HoodieMetadataWriter;
@@ -71,8 +72,7 @@ public class MetadataCommand implements CommandMarker {
     }
 
     long t1 = System.currentTimeMillis();
-    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath)
-        .withUseFileListingMetadata(true).build();
+    HoodieWriteConfig writeConfig = getWriteConfig();
     initJavaSparkContext();
     HoodieMetadataWriter.instance(HoodieCLI.conf, writeConfig).initialize(jsc);
     long t2 = System.currentTimeMillis();
@@ -113,8 +113,7 @@ public class MetadataCommand implements CommandMarker {
     if (readOnly) {
       //HoodieMetadata.init(HoodieCLI.conf, HoodieCLI.basePath);
     } else {
-      HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath)
-          .withUseFileListingMetadata(true).build();
+      HoodieWriteConfig writeConfig = getWriteConfig();
       initJavaSparkContext();
       HoodieMetadataWriter.instance(HoodieCLI.conf, writeConfig).initialize(jsc);
     }
@@ -195,6 +194,11 @@ public class MetadataCommand implements CommandMarker {
     return out.toString();
   }
 
+  private HoodieWriteConfig getWriteConfig() {
+    return HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
+  }
+
   private void initJavaSparkContext() {
     if (jsc == null) {
       jsc = SparkUtil.initJavaSparkConf("HoodieClI");
diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
index c7eb33f..655890b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
@@ -42,7 +42,6 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.ClientUtils;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
@@ -58,7 +57,6 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
index 36a8c13..e830054 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
@@ -54,7 +54,6 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;


[hudi] 11/14: [RFC-15] Fixing checkstyle errors.

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 5e13b19d6aecc4d92a6c54e002386b9250c14d8b
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Tue Oct 27 23:44:16 2020 -0700

    [RFC-15] Fixing checkstyle errors.
---
 .../src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java       | 1 -
 1 file changed, 1 deletion(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
index 3f8d4ca..98675e4 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
@@ -63,7 +63,6 @@ import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.log4j.LogManager;


[hudi] 01/14: [RFC-15] Add metrics to track the time for each file system call.

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 389be1be88aa897fbec3777a7def6abb26e11a35
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Thu Oct 8 22:26:07 2020 -0700

    [RFC-15] Add metrics to track the time for each file system call.
---
 .../hudi/common/fs/HoodieWrapperFileSystem.java    | 273 +++++++++++++--------
 .../hudi/common/fs/SizeAwareFSDataInputStream.java |  93 +++++++
 .../common/fs/SizeAwareFSDataOutputStream.java     |  22 +-
 3 files changed, 280 insertions(+), 108 deletions(-)
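
The mechanical shape of this change, lifted from the diff below: each FileSystem override that used to bump a counter inline is rewrapped in executeFuncWithTimeMetrics, which times the underlying call and records a <metric>.totalDuration alongside the count.

    // Before: count only.
    this.metricsRegistry.increment(MetricName.create.name());
    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f)));

    // After: the same call routed through the timing wrapper.
    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f)));
    });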

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
index c3f6189..9cea356 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.fs;
 
 import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -65,15 +66,45 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   public static final String HOODIE_SCHEME_PREFIX = "hoodie-";
 
-  private enum MetricName {
-    create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, listFiles
+  protected enum MetricName {
+    create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, listFiles, read, write
   }
 
   private ConcurrentMap<String, SizeAwareFSDataOutputStream> openStreams = new ConcurrentHashMap<>();
   private FileSystem fileSystem;
   private URI uri;
   private ConsistencyGuard consistencyGuard = new NoOpConsistencyGuard();
-  private Registry metricsRegistry = Registry.getRegistry(this.getClass().getSimpleName());
+  private static Registry metricsRegistry = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName());
+  private static Registry metricsRegistryMetaFolder =
+      Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + "MetaFolder");
+
+  @FunctionalInterface
+  public interface CheckedFunction<R> {
+    R get() throws IOException;
+  }
+
+  private static Registry getMetricRegistryForPath(Path p) {
+    return ((p != null) && (p.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)))
+        ? metricsRegistryMetaFolder : metricsRegistry;
+  }
+
+  protected static <R> R executeFuncWithTimeMetrics(String metricName, Path p, CheckedFunction<R> func) throws IOException {
+    Registry registry = getMetricRegistryForPath(p);
+
+    long t1 = System.currentTimeMillis();
+    R res = func.get();
+    registry.increment(metricName);
+    registry.add(metricName + ".totalDuration", System.currentTimeMillis() - t1);
+
+    return res;
+  }
+
+  protected static <R> R executeFuncWithTimeAndByteMetrics(String metricName, Path p, long byteCount,
+                                                           CheckedFunction<R> func) throws IOException {
+    Registry registry = getMetricRegistryForPath(p);
+    registry.add(metricName + ".totalBytes", byteCount);
+    return executeFuncWithTimeMetrics(metricName, p, func);
+  }
 
   public HoodieWrapperFileSystem() {}
 
@@ -140,16 +171,17 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-    return fileSystem.open(convertToDefaultPath(f), bufferSize);
+    return wrapInputStream(f, fileSystem.open(convertToDefaultPath(f), bufferSize));
   }
 
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
       short replication, long blockSize, Progressable progress) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    final Path translatedPath = convertToDefaultPath(f);
-    return wrapOutputStream(f,
-        fileSystem.create(translatedPath, permission, overwrite, bufferSize, replication, blockSize, progress));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      final Path translatedPath = convertToDefaultPath(f);
+      return wrapOutputStream(f,
+          fileSystem.create(translatedPath, permission, overwrite, bufferSize, replication, blockSize, progress));
+    });
   }
 
   private FSDataOutputStream wrapOutputStream(final Path path, FSDataOutputStream fsDataOutputStream)
@@ -164,79 +196,99 @@ public class HoodieWrapperFileSystem extends FileSystem {
     return os;
   }
 
+  private FSDataInputStream wrapInputStream(final Path path, FSDataInputStream fsDataInputStream) throws IOException {
+    if (fsDataInputStream instanceof SizeAwareFSDataInputStream) {
+      return fsDataInputStream;
+    }
+
+    SizeAwareFSDataInputStream os = new SizeAwareFSDataInputStream(path, fsDataInputStream);
+    return os;
+  }
+
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f)));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f)));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, Progressable progress) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), progress));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), progress));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, short replication) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), replication));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), replication));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, short replication, Progressable progress) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), replication, progress));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), replication, progress));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress)
       throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, progress));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, progress));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f,
-        fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize, progress));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f,
+          fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize, progress));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize,
       short replication, long blockSize, Progressable progress) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f,
-        fileSystem.create(convertToDefaultPath(f), permission, flags, bufferSize, replication, blockSize, progress));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f,
+          fileSystem.create(convertToDefaultPath(f), permission, flags, bufferSize, replication, blockSize, progress));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize,
       short replication, long blockSize, Progressable progress, Options.ChecksumOpt checksumOpt) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), permission, flags, bufferSize, replication,
-        blockSize, progress, checksumOpt));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), permission, flags, bufferSize, replication,
+          blockSize, progress, checksumOpt));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize)
       throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f,
-        fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f,
+          fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize));
+    });
   }
 
   @Override
@@ -246,50 +298,53 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
-    this.metricsRegistry.increment(MetricName.rename.name());
-    try {
-      consistencyGuard.waitTillFileAppears(convertToDefaultPath(src));
-    } catch (TimeoutException e) {
-      throw new HoodieException("Timed out waiting for " + src + " to appear", e);
-    }
-
-    boolean success = fileSystem.rename(convertToDefaultPath(src), convertToDefaultPath(dst));
-
-    if (success) {
+    return executeFuncWithTimeMetrics(MetricName.rename.name(), src, () -> {
       try {
-        consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+        consistencyGuard.waitTillFileAppears(convertToDefaultPath(src));
       } catch (TimeoutException e) {
-        throw new HoodieException("Timed out waiting for " + dst + " to appear", e);
+        throw new HoodieException("Timed out waiting for " + src + " to appear", e);
       }
 
-      try {
-        consistencyGuard.waitTillFileDisappears(convertToDefaultPath(src));
-      } catch (TimeoutException e) {
-        throw new HoodieException("Timed out waiting for " + src + " to disappear", e);
+      boolean success = fileSystem.rename(convertToDefaultPath(src), convertToDefaultPath(dst));
+
+      if (success) {
+        try {
+          consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+        } catch (TimeoutException e) {
+          throw new HoodieException("Timed out waiting for " + dst + " to appear", e);
+        }
+
+        try {
+          consistencyGuard.waitTillFileDisappears(convertToDefaultPath(src));
+        } catch (TimeoutException e) {
+          throw new HoodieException("Timed out waiting for " + src + " to disappear", e);
+        }
       }
-    }
-    return success;
+      return success;
+    });
   }
 
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
-    this.metricsRegistry.increment(MetricName.delete.name());
-    boolean success = fileSystem.delete(convertToDefaultPath(f), recursive);
-
-    if (success) {
-      try {
-        consistencyGuard.waitTillFileDisappears(f);
-      } catch (TimeoutException e) {
-        throw new HoodieException("Timed out waiting for " + f + " to disappear", e);
+    return executeFuncWithTimeMetrics(MetricName.delete.name(), f, () -> {
+      boolean success = fileSystem.delete(convertToDefaultPath(f), recursive);
+
+      if (success) {
+        try {
+          consistencyGuard.waitTillFileDisappears(f);
+        } catch (TimeoutException e) {
+          throw new HoodieException("Timed out waiting for " + f + " to disappear", e);
+        }
       }
-    }
-    return success;
+      return success;
+    });
   }
 
   @Override
   public FileStatus[] listStatus(Path f) throws IOException {
-    this.metricsRegistry.increment(MetricName.listStatus.name());
-    return fileSystem.listStatus(convertToDefaultPath(f));
+    return executeFuncWithTimeMetrics(MetricName.listStatus.name(), f, () -> {
+      return fileSystem.listStatus(convertToDefaultPath(f));
+    });
   }
 
   @Override
@@ -304,27 +359,29 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-    this.metricsRegistry.increment(MetricName.mkdirs.name());
-    boolean success = fileSystem.mkdirs(convertToDefaultPath(f), permission);
-    if (success) {
-      try {
-        consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
-      } catch (TimeoutException e) {
-        throw new HoodieException("Timed out waiting for directory " + f + " to appear", e);
+    return executeFuncWithTimeMetrics(MetricName.mkdirs.name(), f, () -> {
+      boolean success = fileSystem.mkdirs(convertToDefaultPath(f), permission);
+      if (success) {
+        try {
+          consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+        } catch (TimeoutException e) {
+          throw new HoodieException("Timed out waiting for directory " + f + " to appear", e);
+        }
       }
-    }
-    return success;
+      return success;
+    });
   }
 
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
-    this.metricsRegistry.increment(MetricName.getFileStatus.name());
-    try {
-      consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
-    } catch (TimeoutException e) {
-      // pass
-    }
-    return fileSystem.getFileStatus(convertToDefaultPath(f));
+    return executeFuncWithTimeMetrics(MetricName.getFileStatus.name(), f, () -> {
+      try {
+        consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+      } catch (TimeoutException e) {
+        // pass
+      }
+      return fileSystem.getFileStatus(convertToDefaultPath(f));
+    });
   }
 
   @Override
@@ -389,7 +446,7 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public FSDataInputStream open(Path f) throws IOException {
-    return fileSystem.open(convertToDefaultPath(f));
+    return wrapInputStream(f, fileSystem.open(convertToDefaultPath(f)));
   }
 
   @Override
@@ -462,8 +519,9 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean delete(Path f) throws IOException {
-    this.metricsRegistry.increment(MetricName.delete.name());
-    return delete(f, true);
+    return executeFuncWithTimeMetrics(MetricName.delete.name(), f, () -> {
+      return delete(f, true);
+    });
   }
 
   @Override
@@ -508,32 +566,37 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException {
-    this.metricsRegistry.increment(MetricName.listStatus.name());
-    return fileSystem.listStatus(convertToDefaultPath(f), filter);
+    return executeFuncWithTimeMetrics(MetricName.listStatus.name(), f, () -> {
+      return fileSystem.listStatus(convertToDefaultPath(f), filter);
+    });
   }
 
   @Override
   public FileStatus[] listStatus(Path[] files) throws IOException {
-    this.metricsRegistry.increment(MetricName.listStatus.name());
-    return fileSystem.listStatus(convertDefaults(files));
+    return executeFuncWithTimeMetrics(MetricName.listStatus.name(), files.length > 0 ? files[0] : null, () -> {
+      return fileSystem.listStatus(convertDefaults(files));
+    });
   }
 
   @Override
   public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException {
-    this.metricsRegistry.increment(MetricName.listStatus.name());
-    return fileSystem.listStatus(convertDefaults(files), filter);
+    return executeFuncWithTimeMetrics(MetricName.listStatus.name(), files.length > 0 ? files[0] : null, () -> {
+      return fileSystem.listStatus(convertDefaults(files), filter);
+    });
   }
 
   @Override
   public FileStatus[] globStatus(Path pathPattern) throws IOException {
-    this.metricsRegistry.increment(MetricName.globStatus.name());
-    return fileSystem.globStatus(convertToDefaultPath(pathPattern));
+    return executeFuncWithTimeMetrics(MetricName.globStatus.name(), pathPattern, () -> {
+      return fileSystem.globStatus(convertToDefaultPath(pathPattern));
+    });
   }
 
   @Override
   public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException {
-    this.metricsRegistry.increment(MetricName.globStatus.name());
-    return fileSystem.globStatus(convertToDefaultPath(pathPattern), filter);
+    return executeFuncWithTimeMetrics(MetricName.globStatus.name(), pathPattern, () -> {
+      return fileSystem.globStatus(convertToDefaultPath(pathPattern), filter);
+    });
   }
 
   @Override
@@ -543,8 +606,9 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws IOException {
-    this.metricsRegistry.increment(MetricName.listFiles.name());
-    return fileSystem.listFiles(convertToDefaultPath(f), recursive);
+    return executeFuncWithTimeMetrics(MetricName.listFiles.name(), f, () -> {
+      return fileSystem.listFiles(convertToDefaultPath(f), recursive);
+    });
   }
 
   @Override
@@ -554,16 +618,17 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean mkdirs(Path f) throws IOException {
-    this.metricsRegistry.increment(MetricName.mkdirs.name());
-    boolean success = fileSystem.mkdirs(convertToDefaultPath(f));
-    if (success) {
-      try {
-        consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
-      } catch (TimeoutException e) {
-        throw new HoodieException("Timed out waiting for directory " + f + " to appear", e);
+    return executeFuncWithTimeMetrics(MetricName.mkdirs.name(), f, () -> {
+      boolean success = fileSystem.mkdirs(convertToDefaultPath(f));
+      if (success) {
+        try {
+          consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+        } catch (TimeoutException e) {
+          throw new HoodieException("Timed out waiting for directory " + f + " to appear", e);
+        }
       }
-    }
-    return success;
+      return success;
+    });
   }
 
   @Override
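
The executeFuncWithTimeMetrics helper that the overrides above delegate to is
defined elsewhere in HoodieWrapperFileSystem and does not appear in this hunk.
What follows is a minimal sketch of the shape implied by the call sites,
assuming a checked-supplier functional interface and the count/totalDuration
key convention used by the metadata metrics; it is not the committed
implementation.

    @FunctionalInterface
    public interface CheckedFunction<R> {
      R get() throws IOException;
    }

    // Times the wrapped filesystem call and records one invocation plus the
    // summed duration under the given metric name (key names assumed).
    protected static <R> R executeFuncWithTimeMetrics(String metricName, Path p,
        CheckedFunction<R> func) throws IOException {
      long start = System.currentTimeMillis();
      try {
        return func.get();
      } finally {
        if (metricsRegistry != null) { // registry wiring assumed, as in the fields above
          metricsRegistry.add(metricName + ".count", 1);
          metricsRegistry.add(metricName + ".totalDuration", System.currentTimeMillis() - start);
        }
      }
    }

executeFuncWithTimeAndByteMetrics, used by the size-aware streams below, would
presumably record the byte count under a third key in the same manner.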
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
new file mode 100644
index 0000000..684a625
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.io.ByteBufferPool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+/**
+ * Wrapper over <code>FSDataInputStream</code> to keep track of the size of the bytes read.
+ */
+public class SizeAwareFSDataInputStream extends FSDataInputStream {
+
+  // Path
+  private final Path path;
+  // Registry for read and write metrics
+  Registry metricsRegistry;
+
+  public SizeAwareFSDataInputStream(Path path, FSDataInputStream in) throws IOException {
+    super(in);
+    this.path = path;
+  }
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    return HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
+        path, 0, () -> {
+            return super.read(buf);
+      });
+  }
+
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+    return HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
+        path, length, () -> {
+            return super.read(position, buffer, offset, length);
+      });
+  }
+
+  @Override
+  public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
+          throws IOException, UnsupportedOperationException {
+    return HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
+        path, maxLength, () -> {
+          return super.read(bufferPool, maxLength, opts);
+      });
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer) throws IOException {
+    HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
+        path, buffer.length, () -> {
+          super.readFully(position, buffer);
+          return null;
+        });
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
+        path, length, () -> {
+          super.readFully(position, buffer, offset, length);
+          return null;
+        });
+  }
+
+  public void setMetricRegistry(Registry metricsRegistry) {
+    this.metricsRegistry = metricsRegistry;
+  }
+
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
index 0b70bed..e0607d0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.fs;
 
+import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -41,6 +42,8 @@ public class SizeAwareFSDataOutputStream extends FSDataOutputStream {
   private final Path path;
   // Consistency guard
   private final ConsistencyGuard consistencyGuard;
+  // Registry for read and write metrics
+  Registry metricsRegistry;
 
   public SizeAwareFSDataOutputStream(Path path, FSDataOutputStream out, ConsistencyGuard consistencyGuard,
       Runnable closeCallback) throws IOException {
@@ -52,14 +55,22 @@ public class SizeAwareFSDataOutputStream extends FSDataOutputStream {
 
   @Override
   public synchronized void write(byte[] b, int off, int len) throws IOException {
-    bytesWritten.addAndGet(len);
-    super.write(b, off, len);
+    HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.write.name(), path,
+        len, () -> {
+            bytesWritten.addAndGet(len);
+            super.write(b, off, len);
+            return null;
+        });
   }
 
   @Override
   public void write(byte[] b) throws IOException {
-    bytesWritten.addAndGet(b.length);
-    super.write(b);
+    HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.write.name(), path,
+        b.length, () -> {
+            bytesWritten.addAndGet(b.length);
+            super.write(b);
+            return null;
+        });
   }
 
   @Override
@@ -77,4 +88,7 @@ public class SizeAwareFSDataOutputStream extends FSDataOutputStream {
     return bytesWritten.get();
   }
 
+  public void setMetricRegistry(Registry metricsRegistry) {
+    this.metricsRegistry = metricsRegistry;
+  }
 }
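
Taken together, every byte-level read on a Hudi log or base file now flows
through these wrappers. A usage sketch follows (the path is hypothetical, and
the exact metric keys the registry accumulates for "read" are an assumption):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
    import org.apache.hudi.common.fs.NoOpConsistencyGuard;

    public class WrappedReadExample { // illustrative class, not part of the patch
      public static void main(String[] args) throws Exception {
        FileSystem rawFs = FileSystem.get(new Configuration());
        HoodieWrapperFileSystem fs =
            new HoodieWrapperFileSystem(rawFs, new NoOpConsistencyGuard());
        // open() returns the wrapped stream, so this read is timed and its
        // byte count recorded under the "read" metric.
        try (FSDataInputStream in = fs.open(new Path("/tmp/example.log"))) {
          byte[] buf = new byte[4096];
          in.readFully(0, buf);
        }
      }
    }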


[hudi] 04/14: [MINOR] Reduced number of fs calls to check for table existence. Removed dead code.

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 7bea565b860908af0d314e203ad3b20307a62164
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Fri Oct 16 17:15:17 2020 -0700

    [MINOR] Reduced number of fs calls to check for table existence. Removed dead code.
---
 .../main/java/org/apache/hudi/common/fs/FSUtils.java    | 17 -----------------
 .../apache/hudi/exception/TableNotFoundException.java   | 14 ++++++++------
 2 files changed, 8 insertions(+), 23 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index c6725fc..c2338cf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.fs;
 
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
@@ -565,22 +564,6 @@ public class FSUtils {
   }
 
   /**
-   * Get the FS implementation for this table.
-   * @param path  Path String
-   * @param hadoopConf  Serializable Hadoop Configuration
-   * @param consistencyGuardConfig Consistency Guard Config
-   * @return HoodieWrapperFileSystem
-   */
-  public static HoodieWrapperFileSystem getFs(String path, SerializableConfiguration hadoopConf,
-      ConsistencyGuardConfig consistencyGuardConfig) {
-    FileSystem fileSystem = FSUtils.getFs(path, hadoopConf.newCopy());
-    return new HoodieWrapperFileSystem(fileSystem,
-        consistencyGuardConfig.isConsistencyCheckEnabled()
-            ? new FailSafeConsistencyGuard(fileSystem, consistencyGuardConfig)
-            : new NoOpConsistencyGuard());
-  }
-
-  /**
    * Helper to filter out paths under metadata folder when running fs.globStatus.
    * @param fs  File System
    * @param globPath Glob Path
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/TableNotFoundException.java b/hudi-common/src/main/java/org/apache/hudi/exception/TableNotFoundException.java
index 7666e90..ad34a8e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/exception/TableNotFoundException.java
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/TableNotFoundException.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.exception;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 
 /**
@@ -39,15 +41,15 @@ public class TableNotFoundException extends HoodieException {
   }
 
   public static void checkTableValidity(FileSystem fs, Path basePathDir, Path metaPathDir) {
-    // Check if the base path is found
+    // Check if the base and meta paths are found
     try {
-      if (!fs.exists(basePathDir) || !fs.isDirectory(basePathDir)) {
-        throw new TableNotFoundException(basePathDir.toString());
-      }
-      // Check if the meta path is found
-      if (!fs.exists(metaPathDir) || !fs.isDirectory(metaPathDir)) {
+      // Since metaPath is within the basePath, it is enough to check that the metaPath exists
+      FileStatus status = fs.getFileStatus(metaPathDir);
+      if (!status.isDirectory()) {
         throw new TableNotFoundException(metaPathDir.toString());
       }
+    } catch (FileNotFoundException e) {
+      throw new TableNotFoundException(metaPathDir.toString());
     } catch (IllegalArgumentException e) {
       // if the base path is file:///, then we have a IllegalArgumentException
       throw new TableNotFoundException(metaPathDir.toString());
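
Callers see no behavioral change: table validity is still verified before a
table is opened, now at the cost of a single getFileStatus() round-trip
instead of up to four exists()/isDirectory() calls. A brief sketch, with
hypothetical paths:

    FileSystem fs = FileSystem.get(new Configuration());
    Path basePath = new Path("/data/my_table");    // hypothetical table location
    Path metaPath = new Path(basePath, ".hoodie"); // Hudi's metadata folder
    // Throws TableNotFoundException if the metadata folder is missing or not a
    // directory; since metaPath lives under basePath, one check covers both.
    TableNotFoundException.checkTableValidity(fs, basePath, metaPath);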


[hudi] 03/14: [RFC-15] Using distributed metrics registry in metadata code.

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ce1c983e7a0b2fb34270a12d33a7c300433aabc3
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Fri Oct 16 17:12:09 2020 -0700

    [RFC-15] Using distributed metrics registry in metadata code.
---
 .../apache/hudi/metadata/HoodieMetadataWriter.java | 36 +++++-----
 .../apache/hudi/metadata/TestHoodieMetadata.java   |  2 +-
 .../hudi/common/table/log/HoodieLogFileReader.java |  5 +-
 .../apache/hudi/metadata/HoodieMetadataReader.java | 79 +++++++++++++---------
 4 files changed, 70 insertions(+), 52 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
index 295f15f..5491451 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
@@ -45,6 +45,7 @@ import org.apache.hudi.client.utils.ClientUtils;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -68,6 +69,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -95,12 +97,6 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
   private static Map<String, HoodieMetadataWriter> instances = new HashMap<>();
 
   public static HoodieMetadataWriter instance(Configuration conf, HoodieWriteConfig writeConfig) {
-    try {
-      return new HoodieMetadataWriter(conf, writeConfig);
-    } catch (IOException e) {
-      throw new HoodieMetadataException("Could not initialize HoodieMetadataWriter", e);
-    }
-    /*
     return instances.computeIfAbsent(writeConfig.getBasePath(), k -> {
       try {
         return new HoodieMetadataWriter(conf, writeConfig);
@@ -108,12 +104,11 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
         throw new HoodieMetadataException("Could not initialize HoodieMetadataWriter", e);
       }
     });
-    */
   }
 
   HoodieMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig) throws IOException {
     super(hadoopConf, writeConfig.getBasePath(), writeConfig.getSpillableMapBasePath(),
-        writeConfig.useFileListingMetadata(), writeConfig.getFileListingMetadataVerify());
+        writeConfig.useFileListingMetadata(), writeConfig.getFileListingMetadataVerify(), false);
 
     if (writeConfig.useFileListingMetadata()) {
       this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
@@ -126,6 +121,14 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
       // Metadata Table cannot have its metadata optimized
       ValidationUtils.checkArgument(this.config.shouldAutoCommit(), "Auto commit is required for Metadata Table");
       ValidationUtils.checkArgument(!this.config.useFileListingMetadata(), "File listing cannot be used for Metadata Table");
+
+      if (config.isMetricsOn()) {
+        if (config.isExecutorMetricsEnabled()) {
+          metricsRegistry = Registry.getRegistry("HoodieMetadata", DistributedRegistry.class.getName());
+        } else {
+          metricsRegistry = Registry.getRegistry("HoodieMetadata");
+        }
+      }
     } else {
       enabled = false;
     }
@@ -162,6 +165,7 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
     if (writeConfig.isMetricsOn()) {
       HoodieMetricsConfig.Builder metricsConfig = HoodieMetricsConfig.newBuilder()
           .withReporterType(writeConfig.getMetricsReporterType().toString())
+          .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
           .on(true);
       switch (writeConfig.getMetricsReporterType()) {
         case GRAPHITE:
@@ -215,6 +219,10 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
    */
   public void initialize(JavaSparkContext jsc) {
     try {
+      if (metricsRegistry instanceof DistributedRegistry) {
+        ((DistributedRegistry) metricsRegistry).register(jsc);
+      }
+
       if (enabled) {
         initializeAndSync(jsc);
       }
@@ -261,12 +269,6 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
     } else {
       updateMetrics(INITIALIZE_STR, durationInMs);
     }
-
-    // Total size of the metadata and count of base/log files
-    Map<String, String> stats = getStats(false);
-    updateMetrics(Long.valueOf(stats.get(STAT_TOTAL_BASE_FILE_SIZE)),
-        Long.valueOf(stats.get(STAT_TOTAL_LOG_FILE_SIZE)), Integer.valueOf(stats.get(STAT_COUNT_BASE_FILES)),
-        Integer.valueOf(stats.get(STAT_COUNT_LOG_FILES)));
   }
 
   /**
@@ -287,7 +289,7 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
         HoodieFileFormat.HFILE.toString());
 
     // List all partitions in the basePath of the containing dataset
-    FileSystem fs = FSUtils.getFs(datasetBasePath, hadoopConf.get());
+    FileSystem fs = datasetMetaClient.getFs();
     List<String> partitions = FSUtils.getAllPartitionPaths(fs, datasetBasePath, false);
     LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");
 
@@ -297,7 +299,7 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
     int parallelism =  Math.min(partitions.size(), jsc.defaultParallelism()) + 1; // +1 to prevent 0 parallelism
     JavaPairRDD<String, FileStatus[]> partitionFileListRDD = jsc.parallelize(partitions, parallelism)
         .mapToPair(partition -> {
-          FileSystem fsys = FSUtils.getFs(dbasePath, serializedConf.get());
+          FileSystem fsys = datasetMetaClient.getFs();
           FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new Path(dbasePath, partition));
           return new Tuple2<>(partition, statuses);
         });
@@ -473,7 +475,7 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
       return;
     }
 
-    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
     long cnt = timeline.filterCompletedInstants().getInstants().filter(i -> i.getTimestamp().equals(instantTime)).count();
     if (cnt == 1) {
       LOG.info("Ignoring update from cleaner plan for already completed instant " + instantTime);
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
index a901916..9395d5f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
@@ -762,6 +762,6 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
         .withUseFileListingMetadata(useFileListingMetadata)
         .withMetricsConfig(HoodieMetricsConfig.newBuilder().withReporterType("CONSOLE").on(enableMetrics)
-                           .usePrefix("unit-test").build());
+                           .withExecutorMetrics(true).usePrefix("unit-test").build());
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 5d2e185..d7ef2d3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.SizeAwareFSDataInputStream;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
@@ -72,8 +73,8 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
       boolean readBlockLazily, boolean reverseReader) throws IOException {
     FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
     if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
-      this.inputStream = new FSDataInputStream(
-          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize));
+      this.inputStream = new SizeAwareFSDataInputStream(logFile.getPath(), new FSDataInputStream(
+          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
     } else {
       // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
       // need to wrap in another BufferedFSInputStream the make bufferSize work?
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
index 6ca0831..fed5516 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
@@ -99,6 +99,7 @@ public class HoodieMetadataReader implements Serializable {
   public static final String VALIDATE_FILES_STR = "validate_files";
   public static final String VALIDATE_ERRORS_STR = "validate_errors";
   public static final String SCAN_STR = "scan";
+  public static final String BASEFILE_READ_STR = "basefile_read";
 
   // Stats names
   public static final String STAT_TOTAL_BASE_FILE_SIZE = "totalBaseFileSizeInBytes";
@@ -116,7 +117,7 @@ public class HoodieMetadataReader implements Serializable {
   protected final SerializableConfiguration hadoopConf;
   protected final String datasetBasePath;
   protected final String metadataBasePath;
-  protected transient Registry metricsRegistry;
+  protected Registry metricsRegistry;
   protected HoodieTableMetaClient metaClient;
   protected boolean enabled;
   private final boolean validateLookups;
@@ -138,6 +139,17 @@ public class HoodieMetadataReader implements Serializable {
    */
   public HoodieMetadataReader(Configuration conf, String datasetBasePath, String spillableMapDirectory,
                               boolean enabled, boolean validateLookups) {
+    this(conf, datasetBasePath, spillableMapDirectory, enabled, validateLookups, false);
+  }
+
+  /**
+   * Create a Metadata Table reader in read-only mode.
+   *
+   * @param conf {@code Configuration}
+   * @param datasetBasePath The basePath for the dataset
+   */
+  public HoodieMetadataReader(Configuration conf, String datasetBasePath, String spillableMapDirectory,
+                              boolean enabled, boolean validateLookups, boolean enableMetrics) {
     this.hadoopConf = new SerializableConfiguration(conf);
     this.datasetBasePath = datasetBasePath;
     this.metadataBasePath = getMetadataTableBasePath(datasetBasePath);
@@ -158,6 +170,10 @@ public class HoodieMetadataReader implements Serializable {
       LOG.info("Metadata table is disabled.");
     }
 
+    if (enableMetrics) {
+      metricsRegistry = Registry.getRegistry("HoodieMetadata");
+    }
+
     this.enabled = enabled;
   }
 
@@ -334,10 +350,12 @@ public class HoodieMetadataReader implements Serializable {
     // Retrieve record from base file
     HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
     if (basefileReader != null) {
+      long t1 = System.currentTimeMillis();
       Option<GenericRecord> baseRecord = basefileReader.getRecordByKey(key);
       if (baseRecord.isPresent()) {
         hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) baseRecord.get(),
             metaClient.getTableConfig().getPayloadClass());
+        updateMetrics(BASEFILE_READ_STR, System.currentTimeMillis() - t1);
       }
     }
 
@@ -406,7 +424,7 @@ public class HoodieMetadataReader implements Serializable {
 
     // TODO: The below code may open the metadata to include incomplete instants on the dataset
     logRecordScanner =
-        new HoodieMetadataMergedLogRecordScanner(FSUtils.getFs(datasetBasePath, hadoopConf.get()), metadataBasePath,
+        new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), metadataBasePath,
             logFilePaths, schema, latestMetaInstantTimestamp, maxMemorySizeInBytes, bufferSize,
             spillableMapDirectory, null);
 
@@ -512,25 +530,29 @@ public class HoodieMetadataReader implements Serializable {
   }
 
   public Map<String, String> getStats(boolean detailed) throws IOException {
+    metaClient.reloadActiveTimeline();
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+    return getStats(fsView, detailed);
+  }
+
+  private Map<String, String> getStats(HoodieTableFileSystemView fsView, boolean detailed) throws IOException {
     Map<String, String> stats = new HashMap<>();
-    FileSystem fs = FSUtils.getFs(metadataBasePath, hadoopConf.get());
 
     // Total size of the metadata and count of base/log files
     long totalBaseFileSizeInBytes = 0;
     long totalLogFileSizeInBytes = 0;
     int baseFileCount = 0;
     int logFileCount = 0;
-    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
     List<FileSlice> latestSlices = fsView.getLatestFileSlices(METADATA_PARTITION_NAME).collect(Collectors.toList());
 
     for (FileSlice slice : latestSlices) {
       if (slice.getBaseFile().isPresent()) {
-        totalBaseFileSizeInBytes += fs.getFileStatus(new Path(slice.getBaseFile().get().getPath())).getLen();
+        totalBaseFileSizeInBytes += slice.getBaseFile().get().getFileStatus().getLen();
         ++baseFileCount;
       }
       Iterator<HoodieLogFile> it = slice.getLogFiles().iterator();
       while (it.hasNext()) {
-        totalLogFileSizeInBytes += fs.getFileStatus(it.next().getPath()).getLen();
+        totalLogFileSizeInBytes += it.next().getFileStatus().getLen();
         ++logFileCount;
       }
     }
@@ -550,40 +572,33 @@ public class HoodieMetadataReader implements Serializable {
   }
 
   protected void updateMetrics(String action, long durationInMs) {
-    String countKey = action + ".count";
-    String durationKey = action + ".duration";
-    Registry registry = getMetricsRegistry();
-
-    // Update average for duration and total for count
-    long existingCount = registry.getAllCounts().getOrDefault(countKey, 0L);
-    long existingDuration = registry.getAllCounts().getOrDefault(durationKey, 0L);
-    long avgDuration = (long)Math.ceil((existingDuration * existingCount + durationInMs) / (existingCount + 1));
+    if (metricsRegistry == null) {
+      return;
+    }
 
-    registry.add(countKey, 1);
-    registry.add(durationKey, avgDuration - existingDuration);
+    // Update sum of duration and total for count
+    String countKey = action + ".count";
+    String durationKey = action + ".totalDuration";
+    metricsRegistry.add(countKey, 1);
+    metricsRegistry.add(durationKey, durationInMs);
 
-    LOG.info(String.format("Updating metadata metrics (%s=%dms, %s=%d)", durationKey, avgDuration, countKey,
-        existingCount + 1));
+    LOG.info(String.format("Updating metadata metrics (%s=%dms, %s=1)", durationKey, durationInMs, countKey));
   }
 
   protected void updateMetrics(long totalBaseFileSizeInBytes, long totalLogFileSizeInBytes, int baseFileCount,
-                            int logFileCount) {
-    LOG.info(String.format("Updating metadata size metrics (basefile.size=%d, logfile.size=%d, basefile.count=%d, "
-        + "logfile.count=%d)", totalBaseFileSizeInBytes, totalLogFileSizeInBytes, baseFileCount, logFileCount));
-
-    Registry registry = getMetricsRegistry();
-    registry.add("basefile.size", totalBaseFileSizeInBytes);
-    registry.add("logfile.size", totalLogFileSizeInBytes);
-    registry.add("basefile.count", baseFileCount);
-    registry.add("logfile.count", logFileCount);
-  }
-
-  private Registry getMetricsRegistry() {
+                               int logFileCount) {
     if (metricsRegistry == null) {
-      metricsRegistry = Registry.getRegistry("HoodieMetadata");
+      return;
     }
 
-    return metricsRegistry;
+    // Update sizes and count for metadata table's data files
+    metricsRegistry.add("basefile.size", totalBaseFileSizeInBytes);
+    metricsRegistry.add("logfile.size", totalLogFileSizeInBytes);
+    metricsRegistry.add("basefile.count", baseFileCount);
+    metricsRegistry.add("logfile.count", logFileCount);
+
+    LOG.info(String.format("Updating metadata size metrics (basefile.size=%d, logfile.size=%d, basefile.count=%d, "
+        + "logfile.count=%d)", totalBaseFileSizeInBytes, totalLogFileSizeInBytes, baseFileCount, logFileCount));
   }
 
   /**
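
The pieces above follow one pattern: choose a registry implementation once,
register it with the Spark context when it is distributed so that executor-side
counters can be aggregated on the driver, and record each action as a count
plus a summed duration. A condensed sketch using only the calls visible in this
commit (the "scan" action is one of the metric names defined above; the
surrounding method being measured is hypothetical):

    Registry registry = config.isExecutorMetricsEnabled()
        ? Registry.getRegistry("HoodieMetadata", DistributedRegistry.class.getName())
        : Registry.getRegistry("HoodieMetadata");
    if (registry instanceof DistributedRegistry) {
      ((DistributedRegistry) registry).register(jsc); // collect executor counters
    }

    long start = System.currentTimeMillis();
    scanLogRecords(); // hypothetical metadata action being measured
    registry.add(SCAN_STR + ".count", 1);
    registry.add(SCAN_STR + ".totalDuration", System.currentTimeMillis() - start);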


[hudi] 13/14: [RFC-15] Fixing getFileSystemView() which requires getCompletedCommitsTimeline().

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4b7a4b3f867a6ddfe7e290d9a68e073388872a2e
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Wed Oct 28 02:33:34 2020 -0700

    [RFC-15] Fixing getFileSystemView() which requires getCompletedCommitsTimeline().
---
 hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 38988a4..286e6db 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -49,6 +49,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
@@ -238,7 +239,11 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
    * Get the view of the file system for this table.
    */
   public TableFileSystemView getFileSystemView() {
-    return getFileSystemViewInternal(getCompletedCommitsTimeline());
+    if (config.useFileListingMetadata()) {
+      return getFileSystemViewInternal(getCompletedCommitsTimeline());
+    } else {
+      return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
+    }
   }
 
   /**


[hudi] 06/14: [HUDI-1321] Created HoodieMetadataConfig to specify configuration for the metadata table.

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b16586d503c34b79d80f5b806b401de189f6db3f
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Tue Oct 20 01:47:49 2020 -0700

    [HUDI-1321] Created HoodieMetadataConfig to specify configuration for the metadata table.
    
    This is safer than exposing full-fledged properties for the metadata table (like HoodieWriteConfig does), which would make it burdensome to tune the metadata table. With a limited set of configurations, we can control the performance of the metadata table more closely.
---
 .../apache/hudi/config/HoodieMetadataConfig.java   | 152 +++++++++++++++++++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  79 ++++-------
 .../apache/hudi/metadata/HoodieMetadataWriter.java |  37 ++++-
 .../apache/hudi/metadata/TestHoodieMetadata.java   |  31 +++--
 4 files changed, 228 insertions(+), 71 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
new file mode 100644
index 0000000..ca9c723
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.config.HoodieCompactionConfig.Builder;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Configurations used by the HUDI Metadata Table.
+ */
+@Immutable
+public class HoodieMetadataConfig extends DefaultHoodieConfig {
+
+  public static final String METADATA_PREFIX = "hoodie.metadata";
+
+  // Enable the internal Metadata Table which saves file listings
+  public static final String METADATA_ENABLE = METADATA_PREFIX + ".enable";
+  public static final boolean DEFAULT_METADATA_ENABLE = false;
+
+  // Validate contents of Metadata Table on each access against the actual filesystem
+  public static final String METADATA_VALIDATE = METADATA_PREFIX + ".validate";
+  public static final boolean DEFAULT_METADATA_VALIDATE = false;
+
+  // Parallelism for inserts
+  public static final String INSERT_PARALLELISM = METADATA_PREFIX + ".insert.parallelism";
+  public static final int DEFAULT_INSERT_PARALLELISM = 1;
+
+  // Async clean
+  public static final String ASYNC_CLEAN = METADATA_PREFIX + ".clean.async";
+  public static final boolean DEFAULT_ASYNC_CLEAN = false;
+
+  // Maximum delta commits before compaction occurs
+  public static final String COMPACT_NUM_DELTA_COMMITS = METADATA_PREFIX + ".compact.max.delta.commits";
+  public static final int DEFAULT_COMPACT_NUM_DELTA_COMMITS = 24;
+
+  // Archival settings
+  public static final String MIN_COMMITS_TO_KEEP = METADATA_PREFIX + ".keep.min.commits";
+  public static final int DEFAULT_MIN_COMMITS_TO_KEEP = 20;
+  public static final String MAX_COMMITS_TO_KEEP = METADATA_PREFIX + ".keep.max.commits";
+  public static final int DEFAULT_MAX_COMMITS_TO_KEEP = 30;
+
+  // Cleaner commits retained
+  public static final String CLEANER_COMMITS_RETAINED = METADATA_PREFIX + ".cleaner.commits.retained";
+  public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3;
+
+  private HoodieMetadataConfig(Properties props) {
+    super(props);
+  }
+
+  public static HoodieMetadataConfig.Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+
+    private final Properties props = new Properties();
+
+    public Builder fromFile(File propertiesFile) throws IOException {
+      try (FileReader reader = new FileReader(propertiesFile)) {
+        this.props.load(reader);
+        return this;
+      }
+    }
+
+    public Builder fromProperties(Properties props) {
+      this.props.putAll(props);
+      return this;
+    }
+
+    public Builder enable(boolean enable) {
+      props.setProperty(METADATA_ENABLE, String.valueOf(enable));
+      return this;
+    }
+
+    public Builder validate(boolean validate) {
+      props.setProperty(METADATA_VALIDATE, String.valueOf(validate));
+      return this;
+    }
+
+    public Builder withInsertParallelism(int parallelism) {
+      props.setProperty(INSERT_PARALLELISM, String.valueOf(parallelism));
+      return this;
+    }
+
+    public Builder withAsyncClean(boolean asyncClean) {
+      props.setProperty(ASYNC_CLEAN, String.valueOf(asyncClean));
+      return this;
+    }
+
+    public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBeforeCompaction) {
+      props.setProperty(COMPACT_NUM_DELTA_COMMITS, String.valueOf(maxNumDeltaCommitsBeforeCompaction));
+      return this;
+    }
+
+    public Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
+      props.setProperty(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
+      props.setProperty(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));
+      return this;
+    }
+
+    public Builder retainCommits(int commitsRetained) {
+      props.setProperty(CLEANER_COMMITS_RETAINED, String.valueOf(commitsRetained));
+      return this;
+    }
+
+    public HoodieMetadataConfig build() {
+      HoodieMetadataConfig config = new HoodieMetadataConfig(props);
+      setDefaultOnCondition(props, !props.containsKey(METADATA_ENABLE), METADATA_ENABLE,
+          String.valueOf(DEFAULT_METADATA_ENABLE));
+      setDefaultOnCondition(props, !props.containsKey(METADATA_VALIDATE), METADATA_VALIDATE,
+          String.valueOf(DEFAULT_METADATA_VALIDATE));
+      setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM,
+          String.valueOf(DEFAULT_INSERT_PARALLELISM));
+      setDefaultOnCondition(props, !props.containsKey(ASYNC_CLEAN), ASYNC_CLEAN,
+          String.valueOf(DEFAULT_ASYNC_CLEAN));
+      setDefaultOnCondition(props, !props.containsKey(COMPACT_NUM_DELTA_COMMITS),
+          COMPACT_NUM_DELTA_COMMITS, String.valueOf(DEFAULT_COMPACT_NUM_DELTA_COMMITS));
+      setDefaultOnCondition(props, !props.containsKey(CLEANER_COMMITS_RETAINED), CLEANER_COMMITS_RETAINED,
+          String.valueOf(DEFAULT_CLEANER_COMMITS_RETAINED));
+      setDefaultOnCondition(props, !props.containsKey(MAX_COMMITS_TO_KEEP), MAX_COMMITS_TO_KEEP,
+          String.valueOf(DEFAULT_MAX_COMMITS_TO_KEEP));
+      setDefaultOnCondition(props, !props.containsKey(MIN_COMMITS_TO_KEEP), MIN_COMMITS_TO_KEEP,
+          String.valueOf(DEFAULT_MIN_COMMITS_TO_KEEP));
+
+      return config;
+    }
+  }
+
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 82199ff..d8d732f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -118,17 +118,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
   public static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
 
-  // Enable the internal Metadata Table which saves file listings
-  private static final String USE_FILE_LISTING_METADATA = "hoodie.metadata.file.listings.enable";
-  private static final String DEFAULT_USE_FILE_LISTING_METADATA = "false";
-
-  // Validate contents of Metadata Table on each access against the actual filesystem
-  private static final String FILE_LISTING_METADATA_VERIFY = "hoodie.metadata.file.listings.verify";
-  private static final String DEFAULT_FILE_LISTING_METADATA_VERIFY = "false";
-
-  // Serialized compaction config to be used for Metadata Table
-  public static final String HOODIE_METADATA_COMPACTION_CONFIG = "hoodie.metadata.compaction.config";
-
   /**
    * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
    * multiple write operations (upsert/bulk-insert/...) to be executed within a single commit.
@@ -776,30 +765,35 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
    * File listing metadata configs.
    */
   public boolean useFileListingMetadata() {
-    return Boolean.parseBoolean(props.getProperty(USE_FILE_LISTING_METADATA));
+    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ENABLE));
   }
 
   public boolean getFileListingMetadataVerify() {
-    return Boolean.parseBoolean(props.getProperty(FILE_LISTING_METADATA_VERIFY));
+    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_VALIDATE));
   }
 
-  public HoodieCompactionConfig getMetadataCompactionConfig() throws IOException {
-    String serializedCompactionConfig = props.getProperty(HOODIE_METADATA_COMPACTION_CONFIG);
-    if (serializedCompactionConfig != null) {
-      StringReader reader = new StringReader(serializedCompactionConfig);
-      Properties loadedProps = new Properties();
-      loadedProps.load(reader);
-      return HoodieCompactionConfig.newBuilder().fromProperties(loadedProps).build();
-    }
+  public int getMetadataInsertParallelism() {
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.INSERT_PARALLELISM));
+  }
+
+  public int getMetadataCompactDeltaCommitMax() {
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS));
+  }
+
+  public boolean isMetadataAsyncClean() {
+    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.ASYNC_CLEAN));
+  }
+
+  public int getMetadataMaxCommitsToKeep() {
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP));
+  }
+
+  public int getMetadataMinCommitsToKeep() {
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP));
+  }
 
-    // Default config for compacting metadata tables
-    return HoodieCompactionConfig.newBuilder()
-        .withAutoClean(true)
-        .withInlineCompaction(true)
-        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
-        .archiveCommitsWith(24, 30)
-        .withMaxNumDeltaCommitsBeforeCompaction(24)
-        .build();
+  public int getMetadataCleanerCommitsRetained() {
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED));
   }
 
   public static class Builder {
@@ -814,6 +808,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     private boolean isViewConfigSet = false;
     private boolean isConsistencyGuardSet = false;
     private boolean isCallbackConfigSet = false;
+    private boolean isMetadataConfigSet = false;
 
     public Builder fromFile(File propertiesFile) throws IOException {
       try (FileReader reader = new FileReader(propertiesFile)) {
@@ -931,12 +926,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
-    public Builder withMetadataCompactionConfig(HoodieCompactionConfig compactionConfig) throws IOException {
-      // Since the property names from HoodieCompactionConfig are already used in withCompactionConfig,
-      // metadata compaction config can only be saved serialized.
-      StringWriter writer = new StringWriter();
-      compactionConfig.getProps().store(writer, "metadata compaction config");
-      props.setProperty(HOODIE_METADATA_COMPACTION_CONFIG, writer.toString());
+    public Builder withMetadataConfig(HoodieMetadataConfig metadataConfig) {
+      props.putAll(metadataConfig.getProps());
+      isMetadataConfigSet = true;
       return this;
     }
 
@@ -1026,16 +1018,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
-    public Builder withUseFileListingMetadata(boolean enable) {
-      props.setProperty(USE_FILE_LISTING_METADATA, String.valueOf(enable));
-      return this;
-    }
-
-    public Builder withFileListingMetadataVerify(boolean enable) {
-      props.setProperty(FILE_LISTING_METADATA_VERIFY, String.valueOf(enable));
-      return this;
-    }
-
     protected void setDefaults() {
       // Check for mandatory properties
       setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, DEFAULT_PARALLELISM);
@@ -1081,11 +1063,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       setDefaultOnCondition(props, !props.containsKey(BULKINSERT_SORT_MODE),
           BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
 
-      setDefaultOnCondition(props, !props.containsKey(USE_FILE_LISTING_METADATA), USE_FILE_LISTING_METADATA,
-          DEFAULT_USE_FILE_LISTING_METADATA);
-      setDefaultOnCondition(props, !props.containsKey(FILE_LISTING_METADATA_VERIFY), FILE_LISTING_METADATA_VERIFY,
-          DEFAULT_FILE_LISTING_METADATA_VERIFY);
-
       // Make sure the props is propagated
       setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build());
       setDefaultOnCondition(props, !isStorageConfigSet, HoodieStorageConfig.newBuilder().fromProperties(props).build());
@@ -1101,6 +1078,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           ConsistencyGuardConfig.newBuilder().fromProperties(props).build());
       setDefaultOnCondition(props, !isCallbackConfigSet,
           HoodieWriteCommitCallbackConfig.newBuilder().fromProperties(props).build());
+      setDefaultOnCondition(props, !isMetadataConfigSet,
+          HoodieMetadataConfig.newBuilder().fromProperties(props).build());
 
       setDefaultOnCondition(props, !props.containsKey(EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION),
           EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION, DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION);
diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
index 5491451..936d294 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
@@ -47,6 +47,7 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -59,11 +60,15 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineLayout;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieMetadataConfig;
 import org.apache.hudi.config.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -97,7 +102,15 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
   private static Map<String, HoodieMetadataWriter> instances = new HashMap<>();
 
   public static HoodieMetadataWriter instance(Configuration conf, HoodieWriteConfig writeConfig) {
-    return instances.computeIfAbsent(writeConfig.getBasePath(), k -> {
+    String key = writeConfig.getBasePath();
+    if (instances.containsKey(key)) {
+      if (instances.get(key).enabled() != writeConfig.useFileListingMetadata()) {
+        // Enabled state has changed. Remove so it is recreated.
+        instances.remove(key);
+      }
+    }
+
+    return instances.computeIfAbsent(key, k -> {
       try {
         return new HoodieMetadataWriter(conf, writeConfig);
       } catch (IOException e) {
@@ -141,17 +154,18 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
    * @param schemaStr Metadata Table schema
    */
   private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfig) throws IOException {
+    int parallelism = writeConfig.getMetadataInsertParallelism();
+
     // Create the write config for the metadata table by borrowing options from the main write config.
     HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
-        .withTimelineLayoutVersion(writeConfig.getTimelineLayoutVersion())
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
             .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
             .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs())
             .withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs())
             .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
             .build())
-        .withUseFileListingMetadata(false)
-        .withFileListingMetadataVerify(false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
         .withAutoCommit(true)
         .withAvroSchemaValidate(true)
         .withEmbeddedTimelineServerEnabled(false)
@@ -159,8 +173,19 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
         .withPath(metadataBasePath)
         .withSchema(HoodieMetadataRecord.getClassSchema().toString())
         .forTable(tableName)
-        .withParallelism(1, 1).withDeleteParallelism(1).withRollbackParallelism(1).withFinalizeWriteParallelism(1)
-        .withCompactionConfig(writeConfig.getMetadataCompactionConfig());
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withAsyncClean(writeConfig.isMetadataAsyncClean())
+            .withAutoClean(true)
+            .withCleanerParallelism(parallelism)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
+            .archiveCommitsWith(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMetadataMaxCommitsToKeep())
+            .withInlineCompaction(true)
+            .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
+        .withParallelism(parallelism, parallelism)
+        .withDeleteParallelism(parallelism)
+        .withRollbackParallelism(parallelism)
+        .withFinalizeWriteParallelism(parallelism);
 
     if (writeConfig.isMetricsOn()) {
       HoodieMetricsConfig.Builder metricsConfig = HoodieMetricsConfig.newBuilder()
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
index 751e2ab..0c5839b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
@@ -56,6 +56,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieMetadataConfig;
 import org.apache.hudi.config.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -461,11 +462,9 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
     // Test autoClean and asyncClean based on this flag which is randomly chosen.
     boolean asyncClean = new Random().nextBoolean();
     HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
-        .withMetadataCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .archiveCommitsWith(2, 4).retainCommits(1).retainFileVersions(1).withAutoClean(true)
-            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
-            .withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction)
-            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
+            .archiveCommitsWith(2, 4).retainCommits(1)
+            .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3)
             .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(asyncClean).build())
         .build();
@@ -594,8 +593,7 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
   public void testMetadataMetrics() throws Exception {
     init();
 
-    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc,
-        getWriteConfigBuilder(true, true, true).withFileListingMetadataVerify(true).build())) {
+    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfigBuilder(true, true, true).build())) {
       // Write
       String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
@@ -606,8 +604,12 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
 
       Registry metricsRegistry = Registry.getRegistry("HoodieMetadata");
       assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataWriter.INITIALIZE_STR + ".count"));
-      assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataWriter.INITIALIZE_STR + ".duration"));
+      assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataWriter.INITIALIZE_STR + ".totalDuration"));
       assertEquals(metricsRegistry.getAllCounts().get(HoodieMetadataWriter.INITIALIZE_STR + ".count"), 1L);
+      assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.size"));
+      assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.size"));
+      assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.count"));
+      assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.count"));
     }
   }
 
@@ -617,11 +619,14 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
    * @throws IOException
    */
   private void validateMetadata(HoodieWriteClient client) throws IOException {
-    long t1 = System.currentTimeMillis();
-
     HoodieWriteConfig config = client.getConfig();
     HoodieMetadataWriter metadata = metadata(client);
     assertFalse(metadata == null, "MetadataWriter should have been initialized");
+    if (!config.useFileListingMetadata()) {
+      return;
+    }
+
+    long t1 = System.currentTimeMillis();
 
     // Validate write config for metadata table
     HoodieWriteConfig metadataWriteConfig = metadata.getWriteConfig();
@@ -742,10 +747,6 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
     }
   }
 
-  private HoodieWriteConfig getWriteConfig() {
-    return getWriteConfig(true, true);
-  }
-
   private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean useFileListingMetadata) {
     return getWriteConfigBuilder(autoCommit, useFileListingMetadata, false).build();
   }
@@ -763,7 +764,7 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
         .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
             .withEnableBackupForRemoteFileSystemView(false).build())
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
-        .withUseFileListingMetadata(useFileListingMetadata)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
         .withMetricsConfig(HoodieMetricsConfig.newBuilder().withReporterType("CONSOLE").on(enableMetrics)
                            .withExecutorMetrics(true).usePrefix("unit-test").build());
   }
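
For reference, the new HoodieMetadataConfig is wired into a write config the same
way the test helper above does it. A minimal sketch, assuming an illustrative
base path (only enable() is required; the compaction setting simply mirrors the
default):

    import org.apache.hudi.config.HoodieMetadataConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    // Minimal sketch: opt a writer into the file-listing metadata table.
    // The base path below is illustrative, not taken from this patch.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample_table")
        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
            .enable(true)                               // hoodie.metadata.enable
            .withMaxNumDeltaCommitsBeforeCompaction(24) // compaction cadence
            .build())
        .build();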


[hudi] 14/14: [RFC-15] Fixing code review comments

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d305948cac63e2a88a8d2df5bdf64ffd6fc49f5a
Author: Vinoth Chandar <vi...@apache.org>
AuthorDate: Thu Oct 29 18:33:56 2020 -0700

    [RFC-15] Fixing code review comments
---
 .../apache/hudi/cli/commands/MetadataCommand.java  | 11 ++--
 .../apache/hudi/client/AbstractHoodieClient.java   |  5 +-
 .../org/apache/hudi/client/HoodieWriteClient.java  |  2 +-
 .../apache/hudi/config/HoodieMetadataConfig.java   | 61 +++++++++++-----------
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 16 +++---
 .../apache/hudi/metadata/HoodieMetadataWriter.java |  3 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |  2 +-
 .../apache/hudi/metadata/TestHoodieMetadata.java   | 16 +++---
 .../hudi/common/fs/HoodieWrapperFileSystem.java    | 29 +++++-----
 ...nputStream.java => TimedFSDataInputStream.java} | 18 +++----
 .../apache/hudi/common/metrics/LocalRegistry.java  |  4 +-
 .../org/apache/hudi/common/metrics/Registry.java   | 27 +++++-----
 .../hudi/common/table/log/HoodieLogFileReader.java |  4 +-
 .../apache/hudi/metadata/HoodieMetadataReader.java | 36 ++++++-------
 14 files changed, 113 insertions(+), 121 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index 4ecc6a9..a45e9b4 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.config.HoodieMetadataConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieMetadataReader;
@@ -71,13 +72,11 @@ public class MetadataCommand implements CommandMarker {
       HoodieCLI.fs.mkdirs(metadataPath);
     }
 
-    long t1 = System.currentTimeMillis();
+    HoodieTimer timer = new HoodieTimer().startTimer();
     HoodieWriteConfig writeConfig = getWriteConfig();
     initJavaSparkContext();
-    HoodieMetadataWriter.instance(HoodieCLI.conf, writeConfig).initialize(jsc);
-    long t2 = System.currentTimeMillis();
-
-    return String.format("Created Metadata Table in %s (duration=%.2fsec)", metadataPath, (t2 - t1) / 1000.0);
+    HoodieMetadataWriter.create(HoodieCLI.conf, writeConfig).initialize(jsc);
+    return String.format("Created Metadata Table in %s (duration=%.2f secs)", metadataPath, timer.endTimer() / 1000.0);
   }
 
   @CliCommand(value = "metadata delete", help = "Remove the Metadata Table")
@@ -115,7 +114,7 @@ public class MetadataCommand implements CommandMarker {
     } else {
       HoodieWriteConfig writeConfig = getWriteConfig();
       initJavaSparkContext();
-      HoodieMetadataWriter.instance(HoodieCLI.conf, writeConfig).initialize(jsc);
+      HoodieMetadataWriter.create(HoodieCLI.conf, writeConfig).initialize(jsc);
     }
     long t2 = System.currentTimeMillis();
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
index fd02b6d..f1abe7c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
@@ -72,6 +72,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
     this.timelineServer = timelineServer;
     shouldStopTimelineServer = !timelineServer.isPresent();
     startEmbeddedServerView();
+    initWrapperFSMetrics();
   }
 
   /**
@@ -123,7 +124,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
     return config;
   }
 
-  protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
+  private void initWrapperFSMetrics() {
     if (config.isMetricsOn()) {
       Registry registry;
       Registry registryMeta;
@@ -143,7 +144,9 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
 
       HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta);
     }
+  }
 
+  protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
     return ClientUtils.createMetaClient(hadoopConf, config, loadActiveTimelineOnLoad);
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 614f0dc..c180a88 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -124,7 +124,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     this.rollbackPending = rollbackPending;
 
     // Initialize Metadata Table
-    HoodieMetadataWriter.instance(hadoopConf, writeConfig).initialize(jsc);
+    HoodieMetadataWriter.create(hadoopConf, writeConfig).initialize(jsc);
   }
 
   /**
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
index ca9c723..53142b1 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.config;
 
 import org.apache.hudi.common.config.DefaultHoodieConfig;
-import org.apache.hudi.config.HoodieCompactionConfig.Builder;
 
 import javax.annotation.concurrent.Immutable;
 
@@ -37,33 +36,33 @@ public class HoodieMetadataConfig extends DefaultHoodieConfig {
   public static final String METADATA_PREFIX = "hoodie.metadata";
 
   // Enable the internal Metadata Table which saves file listings
-  public static final String METADATA_ENABLE = METADATA_PREFIX + ".enable";
+  public static final String METADATA_ENABLE_PROP = METADATA_PREFIX + ".enable";
   public static final boolean DEFAULT_METADATA_ENABLE = false;
 
   // Validate contents of Metadata Table on each access against the actual filesystem
-  public static final String METADATA_VALIDATE = METADATA_PREFIX + ".validate";
+  public static final String METADATA_VALIDATE_PROP = METADATA_PREFIX + ".validate";
   public static final boolean DEFAULT_METADATA_VALIDATE = false;
 
   // Parallelism for inserts
-  public static final String INSERT_PARALLELISM = METADATA_PREFIX + ".insert.parallelism";
-  public static final int DEFAULT_INSERT_PARALLELISM = 1;
+  public static final String METADATA_INSERT_PARALLELISM_PROP = METADATA_PREFIX + ".insert.parallelism";
+  public static final int DEFAULT_METADATA_INSERT_PARALLELISM = 1;
 
   // Async clean
-  public static final String ASYNC_CLEAN = METADATA_PREFIX + ".clean.async";
-  public static final boolean DEFAULT_ASYNC_CLEAN = false;
+  public static final String METADATA_ASYNC_CLEAN_PROP = METADATA_PREFIX + ".clean.async";
+  public static final boolean DEFAULT_METADATA_ASYNC_CLEAN = false;
 
   // Maximum delta commits before compaction occurs
-  public static final String COMPACT_NUM_DELTA_COMMITS = METADATA_PREFIX + ".compact.max.delta.commits";
-  public static final int DEFAULT_COMPACT_NUM_DELTA_COMMITS = 24;
+  public static final String METADATA_COMPACT_NUM_DELTA_COMMITS_PROP = METADATA_PREFIX + ".compact.max.delta.commits";
+  public static final int DEFAULT_METADATA_COMPACT_NUM_DELTA_COMMITS = 24;
 
   // Archival settings
-  public static final String MIN_COMMITS_TO_KEEP = METADATA_PREFIX + ".keep.min.commits";
+  public static final String MIN_COMMITS_TO_KEEP_PROP = METADATA_PREFIX + ".keep.min.commits";
   public static final int DEFAULT_MIN_COMMITS_TO_KEEP = 20;
-  public static final String MAX_COMMITS_TO_KEEP = METADATA_PREFIX + ".keep.max.commits";
+  public static final String MAX_COMMITS_TO_KEEP_PROP = METADATA_PREFIX + ".keep.max.commits";
   public static final int DEFAULT_MAX_COMMITS_TO_KEEP = 30;
 
   // Cleaner commits retained
-  public static final String CLEANER_COMMITS_RETAINED = METADATA_PREFIX + ".cleaner.commits.retained";
+  public static final String CLEANER_COMMITS_RETAINED_PROP = METADATA_PREFIX + ".cleaner.commits.retained";
   public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3;
 
   private HoodieMetadataConfig(Properties props) {
@@ -91,58 +90,58 @@ public class HoodieMetadataConfig extends DefaultHoodieConfig {
     }
 
     public Builder enable(boolean enable) {
-      props.setProperty(METADATA_ENABLE, String.valueOf(enable));
+      props.setProperty(METADATA_ENABLE_PROP, String.valueOf(enable));
       return this;
     }
 
     public Builder validate(boolean validate) {
-      props.setProperty(METADATA_VALIDATE, String.valueOf(validate));
+      props.setProperty(METADATA_VALIDATE_PROP, String.valueOf(validate));
       return this;
     }
 
     public Builder withInsertParallelism(int parallelism) {
-      props.setProperty(INSERT_PARALLELISM, String.valueOf(parallelism));
+      props.setProperty(METADATA_INSERT_PARALLELISM_PROP, String.valueOf(parallelism));
       return this;
     }
 
     public Builder withAsyncClean(boolean asyncClean) {
-      props.setProperty(ASYNC_CLEAN, String.valueOf(asyncClean));
+      props.setProperty(METADATA_ASYNC_CLEAN_PROP, String.valueOf(asyncClean));
       return this;
     }
 
     public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBeforeCompaction) {
-      props.setProperty(COMPACT_NUM_DELTA_COMMITS, String.valueOf(maxNumDeltaCommitsBeforeCompaction));
+      props.setProperty(METADATA_COMPACT_NUM_DELTA_COMMITS_PROP, String.valueOf(maxNumDeltaCommitsBeforeCompaction));
       return this;
     }
 
     public Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
-      props.setProperty(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
-      props.setProperty(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));
+      props.setProperty(MIN_COMMITS_TO_KEEP_PROP, String.valueOf(minToKeep));
+      props.setProperty(MAX_COMMITS_TO_KEEP_PROP, String.valueOf(maxToKeep));
       return this;
     }
 
     public Builder retainCommits(int commitsRetained) {
-      props.setProperty(CLEANER_COMMITS_RETAINED, String.valueOf(commitsRetained));
+      props.setProperty(CLEANER_COMMITS_RETAINED_PROP, String.valueOf(commitsRetained));
       return this;
     }
 
     public HoodieMetadataConfig build() {
       HoodieMetadataConfig config = new HoodieMetadataConfig(props);
-      setDefaultOnCondition(props, !props.containsKey(METADATA_ENABLE), METADATA_ENABLE,
+      setDefaultOnCondition(props, !props.containsKey(METADATA_ENABLE_PROP), METADATA_ENABLE_PROP,
           String.valueOf(DEFAULT_METADATA_ENABLE));
-      setDefaultOnCondition(props, !props.containsKey(METADATA_VALIDATE), METADATA_VALIDATE,
+      setDefaultOnCondition(props, !props.containsKey(METADATA_VALIDATE_PROP), METADATA_VALIDATE_PROP,
           String.valueOf(DEFAULT_METADATA_VALIDATE));
-      setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM,
-          String.valueOf(DEFAULT_INSERT_PARALLELISM));
-      setDefaultOnCondition(props, !props.containsKey(ASYNC_CLEAN), ASYNC_CLEAN,
-          String.valueOf(DEFAULT_ASYNC_CLEAN));
-      setDefaultOnCondition(props, !props.containsKey(COMPACT_NUM_DELTA_COMMITS),
-          COMPACT_NUM_DELTA_COMMITS, String.valueOf(DEFAULT_COMPACT_NUM_DELTA_COMMITS));
-      setDefaultOnCondition(props, !props.containsKey(CLEANER_COMMITS_RETAINED), CLEANER_COMMITS_RETAINED,
+      setDefaultOnCondition(props, !props.containsKey(METADATA_INSERT_PARALLELISM_PROP), METADATA_INSERT_PARALLELISM_PROP,
+          String.valueOf(DEFAULT_METADATA_INSERT_PARALLELISM));
+      setDefaultOnCondition(props, !props.containsKey(METADATA_ASYNC_CLEAN_PROP), METADATA_ASYNC_CLEAN_PROP,
+          String.valueOf(DEFAULT_METADATA_ASYNC_CLEAN));
+      setDefaultOnCondition(props, !props.containsKey(METADATA_COMPACT_NUM_DELTA_COMMITS_PROP),
+          METADATA_COMPACT_NUM_DELTA_COMMITS_PROP, String.valueOf(DEFAULT_METADATA_COMPACT_NUM_DELTA_COMMITS));
+      setDefaultOnCondition(props, !props.containsKey(CLEANER_COMMITS_RETAINED_PROP), CLEANER_COMMITS_RETAINED_PROP,
           String.valueOf(DEFAULT_CLEANER_COMMITS_RETAINED));
-      setDefaultOnCondition(props, !props.containsKey(MAX_COMMITS_TO_KEEP), MAX_COMMITS_TO_KEEP,
+      setDefaultOnCondition(props, !props.containsKey(MAX_COMMITS_TO_KEEP_PROP), MAX_COMMITS_TO_KEEP_PROP,
           String.valueOf(DEFAULT_MAX_COMMITS_TO_KEEP));
-      setDefaultOnCondition(props, !props.containsKey(MIN_COMMITS_TO_KEEP), MIN_COMMITS_TO_KEEP,
+      setDefaultOnCondition(props, !props.containsKey(MIN_COMMITS_TO_KEEP_PROP), MIN_COMMITS_TO_KEEP_PROP,
           String.valueOf(DEFAULT_MIN_COMMITS_TO_KEEP));
 
       return config;
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index b499ac9..293c3c0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -763,35 +763,35 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
    * File listing metadata configs.
    */
   public boolean useFileListingMetadata() {
-    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ENABLE));
+    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ENABLE_PROP));
   }
 
   public boolean getFileListingMetadataVerify() {
-    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_VALIDATE));
+    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_VALIDATE_PROP));
   }
 
   public int getMetadataInsertParallelism() {
-    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.INSERT_PARALLELISM));
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.METADATA_INSERT_PARALLELISM_PROP));
   }
 
   public int getMetadataCompactDeltaCommitMax() {
-    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS));
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.METADATA_COMPACT_NUM_DELTA_COMMITS_PROP));
   }
 
   public boolean isMetadataAsyncClean() {
-    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.ASYNC_CLEAN));
+    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ASYNC_CLEAN_PROP));
   }
 
   public int getMetadataMaxCommitsToKeep() {
-    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP));
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP_PROP));
   }
 
   public int getMetadataMinCommitsToKeep() {
-    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP));
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP_PROP));
   }
 
   public int getMetadataCleanerCommitsRetained() {
-    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED));
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED_PROP));
   }
 
   public static class Builder {
diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
index 655890b..08a785d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
@@ -99,7 +99,7 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
   private String tableName;
   private static Map<String, HoodieMetadataWriter> instances = new HashMap<>();
 
-  public static HoodieMetadataWriter instance(Configuration conf, HoodieWriteConfig writeConfig) {
+  public static HoodieMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig) {
     String key = writeConfig.getBasePath();
     if (instances.containsKey(key)) {
       if (instances.get(key).enabled() != writeConfig.useFileListingMetadata()) {
@@ -314,7 +314,6 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
       }
     }
     String createInstantTime = latestInstant.isPresent() ? latestInstant.get().getTimestamp() : SOLO_COMMIT_TIMESTAMP;
-
     LOG.info("Creating a new metadata table in " + metadataBasePath + " at instant " + createInstantTime);
     metaClient = HoodieTableMetaClient.initTableType(hadoopConf.get(), metadataBasePath.toString(),
         HoodieTableType.MERGE_ON_READ, tableName, "archived", HoodieMetadataPayload.class.getName(),
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 286e6db..471fc8d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -634,7 +634,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
 
   public HoodieMetadataWriter metadata() {
     if (metadataWriter == null) {
-      metadataWriter = HoodieMetadataWriter.instance(hadoopConfiguration.get(), config);
+      metadataWriter = HoodieMetadataWriter.create(hadoopConfiguration.get(), config);
     }
 
     return metadataWriter;
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
index 98675e4..62962d5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
@@ -30,7 +30,6 @@ import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Random;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -54,6 +53,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -73,6 +73,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 public class TestHoodieMetadata extends HoodieClientTestHarness {
   private static final Logger LOG = LogManager.getLogger(TestHoodieMetadata.class);
@@ -473,13 +474,12 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
    * Instants on Metadata Table should be archived as per config.
    * Metadata Table should be automatically compacted as per config.
    */
-  @Test
-  public void testArchivingAndCompaction() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testArchivingAndCompaction(boolean asyncClean) throws Exception {
     init(HoodieTableType.COPY_ON_WRITE);
 
     final int maxDeltaCommitsBeforeCompaction = 6;
-    // Test autoClean and asyncClean based on this flag which is randomly chosen.
-    boolean asyncClean = new Random().nextBoolean();
     HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
             .archiveCommitsWith(2, 4).retainCommits(1)
@@ -645,7 +645,7 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
       return;
     }
 
-    long t1 = System.currentTimeMillis();
+    HoodieTimer timer = new HoodieTimer().startTimer();
 
     // Validate write config for metadata table
     HoodieWriteConfig metadataWriteConfig = metadata.getWriteConfig();
@@ -751,11 +751,11 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
           + numFileVersions + " but was " + fsView.getAllFileSlices(partition).count());
     });
 
-    LOG.info("Validation time=" + (System.currentTimeMillis() - t1));
+    LOG.info("Validation time=" + timer.endTimer());
   }
 
   private HoodieMetadataWriter metadata(HoodieWriteClient client) {
-    return HoodieMetadataWriter.instance(hadoopConf, client.getConfig());
+    return HoodieMetadataWriter.create(hadoopConf, client.getConfig());
   }
 
   // TODO: this can be moved to TestHarness after merge from master
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
index cdda082..f9c2296 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.fs;
 
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -70,12 +71,19 @@ public class HoodieWrapperFileSystem extends FileSystem {
     create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, listFiles, read, write
   }
 
+  private static Registry METRICS_REGISTRY_DATA;
+  private static Registry METRICS_REGISTRY_META;
+
+  public static void setMetricsRegistry(Registry registry, Registry registryMeta) {
+    METRICS_REGISTRY_DATA = registry;
+    METRICS_REGISTRY_META = registryMeta;
+  }
+
+
   private ConcurrentMap<String, SizeAwareFSDataOutputStream> openStreams = new ConcurrentHashMap<>();
   private FileSystem fileSystem;
   private URI uri;
   private ConsistencyGuard consistencyGuard = new NoOpConsistencyGuard();
-  private static Registry metricsRegistry;
-  private static Registry metricsRegistryMetaFolder;
 
   @FunctionalInterface
   public interface CheckedFunction<R> {
@@ -84,17 +92,17 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   private static Registry getMetricRegistryForPath(Path p) {
     return ((p != null) && (p.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)))
-        ? metricsRegistryMetaFolder : metricsRegistry;
+        ? METRICS_REGISTRY_META : METRICS_REGISTRY_DATA;
   }
 
   protected static <R> R executeFuncWithTimeMetrics(String metricName, Path p, CheckedFunction<R> func) throws IOException {
-    long t1 = System.currentTimeMillis();
+    HoodieTimer timer = new HoodieTimer().startTimer();
     R res = func.get();
 
     Registry registry = getMetricRegistryForPath(p);
     if (registry != null) {
       registry.increment(metricName);
-      registry.add(metricName + ".totalDuration", System.currentTimeMillis() - t1);
+      registry.add(metricName + ".totalDuration", timer.endTimer());
     }
 
     return res;
@@ -110,11 +118,6 @@ public class HoodieWrapperFileSystem extends FileSystem {
     return executeFuncWithTimeMetrics(metricName, p, func);
   }
 
-  public static void setMetricsRegistry(Registry registry, Registry registryMeta) {
-    metricsRegistry = registry;
-    metricsRegistryMetaFolder = registryMeta;
-  }
-
   public HoodieWrapperFileSystem() {}
 
   public HoodieWrapperFileSystem(FileSystem fileSystem, ConsistencyGuard consistencyGuard) {
@@ -206,12 +209,10 @@ public class HoodieWrapperFileSystem extends FileSystem {
   }
 
   private FSDataInputStream wrapInputStream(final Path path, FSDataInputStream fsDataInputStream) throws IOException {
-    if (fsDataInputStream instanceof SizeAwareFSDataInputStream) {
+    if (fsDataInputStream instanceof TimedFSDataInputStream) {
       return fsDataInputStream;
     }
-
-    SizeAwareFSDataInputStream os = new SizeAwareFSDataInputStream(path, fsDataInputStream);
-    return os;
+    return new TimedFSDataInputStream(path, fsDataInputStream);
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSDataInputStream.java
similarity index 82%
rename from hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
rename to hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSDataInputStream.java
index f5adb0d..c621111 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSDataInputStream.java
@@ -28,14 +28,14 @@ import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
 /**
- * Wrapper over <code>FSDataInputStream</code> to keep track of the size of the written bytes.
+ * Wrapper over <code>FSDataInputStream</code> that also times the operations.
  */
-public class SizeAwareFSDataInputStream extends FSDataInputStream {
+public class TimedFSDataInputStream extends FSDataInputStream {
 
   // Path
   private final Path path;
 
-  public SizeAwareFSDataInputStream(Path path, FSDataInputStream in) throws IOException {
+  public TimedFSDataInputStream(Path path, FSDataInputStream in) {
     super(in);
     this.path = path;
   }
@@ -43,26 +43,20 @@ public class SizeAwareFSDataInputStream extends FSDataInputStream {
   @Override
   public int read(ByteBuffer buf) throws IOException {
     return HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
-        path, 0, () -> {
-            return super.read(buf);
-      });
+        path, 0, () -> super.read(buf));
   }
 
   @Override
   public int read(long position, byte[] buffer, int offset, int length) throws IOException {
     return HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
-        path, length, () -> {
-            return super.read(position, buffer, offset, length);
-      });
+        path, length, () -> super.read(position, buffer, offset, length));
   }
 
   @Override
   public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
           throws IOException, UnsupportedOperationException {
     return HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
-        path, maxLength, () -> {
-          return super.read(bufferPool, maxLength, opts);
-      });
+        path, maxLength, () -> super.read(bufferPool, maxLength, opts));
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
index 36aeab9..9383223 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
@@ -23,11 +23,11 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Lightweight Metrics Registry to track Hudi events.
+ * Registry that tracks metrics local to a single JVM process.
  */
 public class LocalRegistry implements Registry {
   ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
-  private String name;
+  private final String name;
 
   public LocalRegistry(String name) {
     this.name = name;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
index 0a56297..19822fb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
@@ -30,7 +30,8 @@ import org.apache.hudi.common.util.ReflectionUtils;
  * Interface which defines a lightweight Metrics Registry to track Hudi events.
  */
 public interface Registry extends Serializable {
-  static ConcurrentHashMap<String, Registry> REGISTRYMAP = new ConcurrentHashMap<>();
+
+  ConcurrentHashMap<String, Registry> REGISTRY_MAP = new ConcurrentHashMap<>();
 
   /**
    * Get (or create) the registry for a provided name.
@@ -39,7 +40,7 @@ public interface Registry extends Serializable {
    *
    * @param registryName Name of the registry
    */
-  public static Registry getRegistry(String registryName) {
+  static Registry getRegistry(String registryName) {
     return getRegistry(registryName, LocalRegistry.class.getName());
   }
 
@@ -49,13 +50,13 @@ public interface Registry extends Serializable {
    * @param registryName Name of the registry.
    * @param clazz The fully qualified name of the registry class to create.
    */
-  public static Registry getRegistry(String registryName, String clazz) {
+  static Registry getRegistry(String registryName, String clazz) {
     synchronized (Registry.class) {
-      if (!REGISTRYMAP.containsKey(registryName)) {
+      if (!REGISTRY_MAP.containsKey(registryName)) {
         Registry registry = (Registry)ReflectionUtils.loadClass(clazz, registryName);
-        REGISTRYMAP.put(registryName, registry);
+        REGISTRY_MAP.put(registryName, registry);
       }
-      return REGISTRYMAP.get(registryName);
+      return REGISTRY_MAP.get(registryName);
     }
   }
 
@@ -66,10 +67,10 @@ public interface Registry extends Serializable {
    * @param prefixWithRegistryName prefix each metric name with the registry name.
    * @return
    */
-  public static Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) {
+  static Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) {
     synchronized (Registry.class) {
       HashMap<String, Long> allMetrics = new HashMap<>();
-      REGISTRYMAP.forEach((registryName, registry) -> {
+      REGISTRY_MAP.forEach((registryName, registry) -> {
         allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName));
         if (flush) {
           registry.clear();
@@ -82,14 +83,14 @@ public interface Registry extends Serializable {
   /**
    * Clear all metrics.
    */
-  public void clear();
+  void clear();
 
   /**
    * Increment the metric.
    *
    * @param name Name of the metric to increment.
    */
-  public void increment(String name);
+  void increment(String name);
 
   /**
    * Add value to the metric.
@@ -97,12 +98,12 @@ public interface Registry extends Serializable {
    * @param name Name of the metric.
    * @param value The value to add to the metrics.
    */
-  public void add(String name, long value);
+  void add(String name, long value);
 
   /**
    * Get all Counter type metrics.
    */
-  public default Map<String, Long> getAllCounts() {
+  default Map<String, Long> getAllCounts() {
     return getAllCounts(false);
   }
 
@@ -111,5 +112,5 @@ public interface Registry extends Serializable {
    *
    * @param prefixWithRegistryName If true, the names of all metrics are prefixed with name of this registry.
    */
-  public abstract Map<String, Long> getAllCounts(boolean prefixWithRegistryName);
+  Map<String, Long> getAllCounts(boolean prefixWithRegistryName);
 }
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index d7ef2d3..7c96a0f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.fs.SizeAwareFSDataInputStream;
+import org.apache.hudi.common.fs.TimedFSDataInputStream;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
@@ -73,7 +73,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
       boolean readBlockLazily, boolean reverseReader) throws IOException {
     FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
     if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
-      this.inputStream = new SizeAwareFSDataInputStream(logFile.getPath(), new FSDataInputStream(
+      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
           new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
     } else {
      // fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
index fed5516..c14f402 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
@@ -52,6 +52,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -133,9 +134,6 @@ public class HoodieMetadataReader implements Serializable {
 
   /**
   * Create the Metadata Table in read-only mode.
-   *
-   * @param hadoopConf {@code Configuration}
-   * @param basePath The basePath for the dataset
    */
   public HoodieMetadataReader(Configuration conf, String datasetBasePath, String spillableMapDirectory,
                               boolean enabled, boolean validateLookups) {
@@ -144,9 +142,6 @@ public class HoodieMetadataReader implements Serializable {
 
   /**
   * Create the Metadata Table in read-only mode.
-   *
-   * @param hadoopConf {@code Configuration}
-   * @param basePath The basePath for the dataset
    */
   public HoodieMetadataReader(Configuration conf, String datasetBasePath, String spillableMapDirectory,
                               boolean enabled, boolean validateLookups, boolean enableMetrics) {
@@ -230,9 +225,9 @@ public class HoodieMetadataReader implements Serializable {
    * Returns a list of all partitions.
    */
   protected List<String> getAllPartitionPaths() throws IOException {
-    long t1 = System.currentTimeMillis();
+    HoodieTimer timer = new HoodieTimer().startTimer();
     Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(RECORDKEY_PARTITION_LIST);
-    updateMetrics(LOOKUP_PARTITIONS_STR, System.currentTimeMillis() - t1);
+    updateMetrics(LOOKUP_PARTITIONS_STR, timer.endTimer());
 
     List<String> partitions = Collections.emptyList();
     if (hoodieRecord.isPresent()) {
@@ -251,9 +246,9 @@ public class HoodieMetadataReader implements Serializable {
 
     if (validateLookups) {
       // Validate the Metadata Table data by listing the partitions from the file system
-      t1 = System.currentTimeMillis();
+      timer.startTimer();
       List<String> actualPartitions  = getAllPartitionPathsByListing(metaClient.getFs(), datasetBasePath, false);
-      updateMetrics(VALIDATE_PARTITIONS_STR, System.currentTimeMillis() - t1);
+      updateMetrics(VALIDATE_PARTITIONS_STR, timer.endTimer());
 
       Collections.sort(actualPartitions);
       Collections.sort(partitions);
@@ -284,9 +279,9 @@ public class HoodieMetadataReader implements Serializable {
       partitionName = NON_PARTITIONED_NAME;
     }
 
-    long t1 = System.currentTimeMillis();
+    HoodieTimer timer = new HoodieTimer().startTimer();
     Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(partitionName);
-    updateMetrics(LOOKUP_FILES_STR, System.currentTimeMillis() - t1);
+    updateMetrics(LOOKUP_FILES_STR, timer.endTimer());
 
     FileStatus[] statuses = {};
     if (hoodieRecord.isPresent()) {
@@ -299,20 +294,21 @@ public class HoodieMetadataReader implements Serializable {
 
     if (validateLookups) {
       // Validate the Metadata Table data by listing the partitions from the file system
-      t1 = System.currentTimeMillis();
+      timer.startTimer();
 
       // Ignore partition metadata file
       FileStatus[] directStatuses = metaClient.getFs().listStatus(partitionPath,
           p -> !p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-      updateMetrics(VALIDATE_FILES_STR, System.currentTimeMillis() - t1);
+      updateMetrics(VALIDATE_FILES_STR, timer.endTimer());
 
       List<String> directFilenames = Arrays.stream(directStatuses)
-          .map(s -> s.getPath().getName()).collect(Collectors.toList());
+          .map(s -> s.getPath().getName()).sorted()
+          .collect(Collectors.toList());
+
       List<String> metadataFilenames = Arrays.stream(statuses)
-          .map(s -> s.getPath().getName()).collect(Collectors.toList());
+          .map(s -> s.getPath().getName()).sorted()
+          .collect(Collectors.toList());
 
-      Collections.sort(metadataFilenames);
-      Collections.sort(directFilenames);
       if (!metadataFilenames.equals(directFilenames)) {
         LOG.error("Validation of metadata file listing for partition " + partitionName + " failed.");
         LOG.error("File list from metadata: " + Arrays.toString(metadataFilenames.toArray()));
@@ -350,12 +346,12 @@ public class HoodieMetadataReader implements Serializable {
     // Retrieve record from base file
     HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
     if (basefileReader != null) {
-      long t1 = System.currentTimeMillis();
+      HoodieTimer timer = new HoodieTimer().startTimer();
       Option<GenericRecord> baseRecord = basefileReader.getRecordByKey(key);
       if (baseRecord.isPresent()) {
         hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) baseRecord.get(),
             metaClient.getTableConfig().getPayloadClass());
-        updateMetrics(BASEFILE_READ_STR, System.currentTimeMillis() - t1);
+        updateMetrics(BASEFILE_READ_STR, timer.endTimer());
       }
     }
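
The recurring idiom in this commit replaces raw System.currentTimeMillis()
arithmetic with HoodieTimer, feeding the results into the lightweight Registry
counters. A minimal sketch of the idiom; the metric name is illustrative and
listPartitionFromMetadata() is a hypothetical workload, not a real API:

    import org.apache.hudi.common.metrics.Registry;
    import org.apache.hudi.common.util.HoodieTimer;

    // Sketch of the timing idiom used throughout this commit.
    Registry registry = Registry.getRegistry("HoodieMetadata");
    HoodieTimer timer = new HoodieTimer().startTimer();
    listPartitionFromMetadata();  // hypothetical call being measured
    registry.increment("lookup_files");                           // call count
    registry.add("lookup_files.totalDuration", timer.endTimer()); // total millis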
 


[hudi] 07/14: [HUDI-1317] Fix initialization when Async jobs are scheduled.

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6ade85078b50e3c9ffea0b6a79ebe8cd35747daf
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Tue Oct 20 11:22:13 2020 -0700

    [HUDI-1317] Fix initialization when Async jobs are scheduled.
    
    Logic is explained in the JIRA ticket.
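
    The crux, visible in the HoodieMetadataWriter hunk below: when seeding the
    metadata table, pick the latest completed instant that has no incomplete
    instants before it, so files written by still-running async jobs are not
    baked into the initial commit. A simplified sketch of that scan, using a
    hypothetical TimelineInstant stand-in for HoodieInstant:

        import java.util.List;
        import java.util.Optional;

        // Simplified sketch; TimelineInstant is a hypothetical stand-in type.
        interface TimelineInstant {
          boolean isCompleted();
        }

        static Optional<TimelineInstant> latestSafeInstant(List<TimelineInstant> timeline) {
          Optional<TimelineInstant> latest = Optional.empty();
          boolean foundNonComplete = false;
          for (TimelineInstant instant : timeline) { // in instant-time order
            if (!instant.isCompleted()) {
              foundNonComplete = true;               // later instants are unsafe to seed from
            } else if (!foundNonComplete) {
              latest = Optional.of(instant);
            }
          }
          return latest;
        }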
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  2 -
 .../apache/hudi/metadata/HoodieMetadataWriter.java | 50 ++++++++++++++--------
 .../apache/hudi/metadata/TestHoodieMetadata.java   | 50 +++++++++++++++-------
 3 files changed, 68 insertions(+), 34 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d8d732f..b499ac9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -42,8 +42,6 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.StringReader;
-import java.io.StringWriter;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
index 936d294..c7eb33f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
@@ -58,9 +58,9 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineLayout;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
@@ -305,7 +305,16 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
     ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");
 
     // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
-    Option<HoodieInstant> latestInstant = datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+    // Otherwise, we use the timestamp of the instant which does not have any non-completed instants before it.
+    Option<HoodieInstant> latestInstant = Option.empty();
+    boolean foundNonComplete = false;
+    for (HoodieInstant instant : datasetMetaClient.getActiveTimeline().getInstants().collect(Collectors.toList())) {
+      if (!instant.isCompleted()) {
+        foundNonComplete = true;
+      } else if (!foundNonComplete) {
+        latestInstant = Option.of(instant);
+      }
+    }
     String createInstantTime = latestInstant.isPresent() ? latestInstant.get().getTimestamp() : SOLO_COMMIT_TIMESTAMP;
 
     LOG.info("Creating a new metadata table in " + metadataBasePath + " at instant " + createInstantTime);
@@ -320,7 +329,6 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
 
     // List all partitions in parallel and collect the files in them
     final String dbasePath = datasetBasePath;
-    final SerializableConfiguration serializedConf = new SerializableConfiguration(hadoopConf);
     int parallelism =  Math.min(partitions.size(), jsc.defaultParallelism()) + 1; // +1 to prevent 0 parallelism
     JavaPairRDD<String, FileStatus[]> partitionFileListRDD = jsc.parallelize(partitions, parallelism)
         .mapToPair(partition -> {
@@ -335,6 +343,7 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
     // Create a HoodieCommitMetadata with writeStats for all discovered files
     int[] stats = {0};
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
     partitionFileList.forEach(t -> {
       final String partition = t._1;
       try {
@@ -345,25 +354,32 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
         throw new HoodieMetadataException("Failed to check partition " + partition, e);
       }
 
+      // Filter the statuses to only include files which were created before or on createInstantTime
+      Arrays.stream(t._2).filter(status -> {
+        String filename = status.getPath().getName();
+        if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+          return false;
+        }
+        if (HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
+            createInstantTime)) {
+          return false;
+        }
+        return true;
+      }).forEach(status -> {
+        HoodieWriteStat writeStat = new HoodieWriteStat();
+        writeStat.setPath(partition + Path.SEPARATOR + status.getPath().getName());
+        writeStat.setPartitionPath(partition);
+        writeStat.setTotalWriteBytes(status.getLen());
+        metadata.addWriteStat(partition, writeStat);
+        stats[0] += 1;
+      });
+
       // If the partition has no files then create a writeStat with no file path
-      if (t._2.length == 0) {
+      if (metadata.getWriteStats(partition) == null) {
         HoodieWriteStat writeStat = new HoodieWriteStat();
         writeStat.setPartitionPath(partition);
         metadata.addWriteStat(partition, writeStat);
-      } else {
-        Arrays.stream(t._2).forEach(status -> {
-          String filename = status.getPath().getName();
-          if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
-            return;
-          }
-          HoodieWriteStat writeStat = new HoodieWriteStat();
-          writeStat.setPath(partition + Path.SEPARATOR + filename);
-          writeStat.setPartitionPath(partition);
-          writeStat.setTotalWriteBytes(status.getLen());
-          metadata.addWriteStat(partition, writeStat);
-        });
       }
-      stats[0] += t._2.length;
     });
 
     LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
index 0c5839b..36a8c13 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
@@ -25,6 +25,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -38,7 +40,6 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.ClientUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
-import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieKey;
@@ -52,6 +53,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -82,16 +84,23 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
   private String metadataTableBasePath;
 
   public void init() throws IOException {
-    init(HoodieTableType.MERGE_ON_READ);
+    init(HoodieTableType.MERGE_ON_READ, true);
   }
 
   public void init(HoodieTableType tableType) throws IOException {
-    initDFS();
-    initSparkContexts("TestHoodieMetadata");
-    hadoopConf.addResource(dfs.getConf());
+    init(tableType, true);
+  }
+
+  public void init(HoodieTableType tableType, boolean useDFS) throws IOException {
     initPath();
-    dfs.mkdirs(new Path(basePath));
-    metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType);
+    initSparkContexts("TestHoodieMetadata");
+    initFileSystem();
+    if (useDFS) {
+      initDFS();
+      dfs.mkdirs(new Path(basePath));
+    }
+    initMetaClient();
+
     initTestDataGenerator();
 
     metadataTableBasePath = HoodieMetadataReader.getMetadataTableBasePath(basePath);
@@ -133,22 +142,33 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
    */
   @Test
   public void testOnlyValidPartitionsAdded() throws Exception {
-    init();
-
-    HoodieTestDataGenerator.writePartitionMetadata(dfs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
+    // This test requires local file system
+    init(HoodieTableType.MERGE_ON_READ, false);
 
     // Create an empty directory which is not a partition directory (lacks partition metadata)
     final String nonPartitionDirectory = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0] + "-nonpartition";
-    final Path nonPartitionPath = new Path(basePath, nonPartitionDirectory);
-    dfs.mkdirs(nonPartitionPath);
+    Files.createDirectories(Paths.get(basePath, nonPartitionDirectory));
+
+    // Create some commits
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    testTable.withPartitionMetaFiles("p1", "p2")
+             .addCommit("001").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10)
+             .addCommit("002").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10, 10)
+             .addInflightCommit("003").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10);
 
     try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, true))) {
-      client.startCommitWithTime("001");
-      validateMetadata(client);
+      client.startCommitWithTime("005");
 
-      List<String> partitions = metadata(client).getAllPartitionPaths(dfs, basePath, true);
+      List<String> partitions = metadata(client).getAllPartitionPaths(dfs, basePath, false);
       assertFalse(partitions.contains(nonPartitionDirectory),
           "Must not contain the non-partition " + nonPartitionDirectory);
+      assertTrue(partitions.contains("p1"), "Must contain partition p1");
+      assertTrue(partitions.contains("p2"), "Must contain partition p2");
+
+      FileStatus[] statuses = metadata(client).getAllFilesInPartition(hadoopConf, basePath, new Path(basePath, "p1"));
+      assertTrue(statuses.length == 2);
+      statuses = metadata(client).getAllFilesInPartition(hadoopConf, basePath, new Path(basePath, "p2"));
+      assertTrue(statuses.length == 5);
     }
   }
 


[hudi] 10/14: [RFC-15] Fixing metrics printing to console which generates a lot of text.

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 9e8025d98dcbbe756cb04a259d14b68f7368e7ee
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Tue Oct 27 22:53:26 2020 -0700

    [RFC-15] Fixing metrics printing to console which generates a lot of text.
    
    Changed to using in-memory metrics.
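
    For reference, the metrics builder after the one-line change below, written
    out as a standalone sketch (on(true) stands in for the test helper's
    enableMetrics flag):

        // Sketch: metrics enabled without an explicit reporter type.
        HoodieMetricsConfig metricsConfig = HoodieMetricsConfig.newBuilder()
            .on(true)
            .withExecutorMetrics(true)
            .usePrefix("unit-test")
            .build();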
---
 .../src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java     | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
index e830054..3f8d4ca 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
@@ -63,6 +63,7 @@ import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.log4j.LogManager;
@@ -784,7 +785,7 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
             .withEnableBackupForRemoteFileSystemView(false).build())
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
-        .withMetricsConfig(HoodieMetricsConfig.newBuilder().withReporterType("CONSOLE").on(enableMetrics)
+        .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
                            .withExecutorMetrics(true).usePrefix("unit-test").build());
   }
 }


[hudi] 09/14: [HUDI-1305] Added an API to shutdown and remove the metrics reporter. (#2132)

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 86fdd659d899b74dab3cc1eb0965d90fc78ea554
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Sun Oct 4 09:30:04 2020 -0700

    [HUDI-1305] Added an API to shutdown and remove the metrics reporter. (#2132)
    
    This helps in removing the reporter once the test has completed. It prevents log pollution from unnecessary metric logs.
    
    - Added an API to shutdown the metrics reporter after tests.
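
    Typical test-side usage of the new API, mirroring the
    TestHoodieConsoleMetrics change below:

        import org.apache.hudi.metrics.Metrics;
        import org.junit.jupiter.api.AfterEach;

        // Teardown mirroring the diff below: report once, close the reporter,
        // and reset the initialized flag so the next test starts fresh.
        @AfterEach
        public void stop() {
          Metrics.shutdown();
        }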
---
 .../hudi/metrics/ConsoleMetricsReporter.java       |  3 +++
 .../main/java/org/apache/hudi/metrics/Metrics.java | 30 +++++++++++++++-------
 .../hudi/metrics/TestHoodieConsoleMetrics.java     |  6 +++++
 3 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/ConsoleMetricsReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/ConsoleMetricsReporter.java
index ca96109..b65c4ad 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/ConsoleMetricsReporter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/ConsoleMetricsReporter.java
@@ -68,5 +68,8 @@ public class ConsoleMetricsReporter extends MetricsReporter {
 
   @Override
   public void stop() {
+    if (consoleReporter != null) {
+      consoleReporter.stop();
+    }
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
index a9cf14e..c4107ce 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -52,18 +52,22 @@ public class Metrics {
     reporter.start();
 
     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-      try {
-        registerHoodieCommonMetrics();
-        reporter.report();
-        if (getReporter() != null) {
-          getReporter().close();
-        }
-      } catch (Exception e) {
-        LOG.warn("Error while closing reporter", e);
-      }
+      reportAndCloseReporter();
     }));
   }
 
+  private void reportAndCloseReporter() {
+    try {
+      registerHoodieCommonMetrics();
+      reporter.report();
+      if (getReporter() != null) {
+        getReporter().close();
+      }
+    } catch (Exception e) {
+      LOG.warn("Error while closing reporter", e);
+    }
+  }
+
   private void registerHoodieCommonMetrics() {
     registerGauges(Registry.getAllMetrics(true, true), Option.empty());
   }
@@ -85,6 +89,14 @@ public class Metrics {
     initialized = true;
   }
 
+  public static synchronized void shutdown() {
+    if (!initialized) {
+      return;
+    }
+    metrics.reportAndCloseReporter();
+    initialized = false;
+  }
+
   public static void registerGauges(Map<String, Long> metricsMap, Option<String> prefix) {
     String metricPrefix = prefix.isPresent() ? prefix.get() + "." : "";
     metricsMap.forEach((k, v) -> registerGauge(metricPrefix + k, v));
diff --git a/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java b/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java
index e644730..7424d0b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.metrics;
 
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -38,6 +39,11 @@ public class TestHoodieConsoleMetrics {
     new HoodieMetrics(config, "raw_table");
   }
 
+  @AfterEach
+  public void stop() {
+    Metrics.shutdown();
+  }
+
   @Test
   public void testRegisterGauge() {
     registerGauge("metric1", 123L);
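
A minimal usage sketch of the new API (SomeMetricsTest is a hypothetical test class; Metrics.shutdown() is the method added in this commit, and the rest is standard JUnit 5):

    import org.apache.hudi.metrics.Metrics;
    import org.junit.jupiter.api.AfterEach;

    public class SomeMetricsTest {
      @AfterEach
      public void tearDown() {
        // Report pending metrics and close the reporter so later tests do not
        // inherit a live reporter and its log output.
        Metrics.shutdown();
      }
    }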


[hudi] 05/14: [HUDI-1346] Choose a new instant time when performing autoClean.

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit a2dcacf7b370096e5a65dcefdbe9dcc7cdd80078
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Fri Oct 16 18:15:31 2020 -0700

    [HUDI-1346] Choose a new instant time when performing autoClean.
---
 .../org/apache/hudi/client/AsyncCleanerService.java   |  7 ++++---
 .../org/apache/hudi/client/HoodieWriteClient.java     | 19 ++++++++++---------
 .../org/apache/hudi/metadata/TestHoodieMetadata.java  |  5 ++++-
 3 files changed, 18 insertions(+), 13 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java b/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
index 6367e79..331948d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.async.AbstractAsyncService;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.log4j.LogManager;
@@ -52,11 +53,11 @@ class AsyncCleanerService extends AbstractAsyncService {
     }), executor);
   }
 
-  public static AsyncCleanerService startAsyncCleaningIfEnabled(HoodieWriteClient writeClient,
-                                                                String instantTime) {
+  public static AsyncCleanerService startAsyncCleaningIfEnabled(HoodieWriteClient writeClient) {
     AsyncCleanerService asyncCleanerService = null;
     if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
-      LOG.info("Auto cleaning is enabled. Running cleaner async to write operation");
+      String instantTime = HoodieActiveTimeline.createNewInstantTime();
+      LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
       asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
       asyncCleanerService.start(null);
     } else {
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index a0019b0..614f0dc 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -193,7 +193,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
     table.validateUpsertSchema();
     setOperationType(WriteOperationType.UPSERT);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -214,7 +214,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
     table.validateUpsertSchema();
     setOperationType(WriteOperationType.UPSERT_PREPPED);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
   }
@@ -233,7 +233,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.insert(jsc,instantTime, records);
     return postWrite(result, instantTime, table);
   }
@@ -253,7 +253,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT_PREPPED);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
   }
@@ -293,7 +293,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.BULK_INSERT);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, userDefinedBulkInsertPartitioner);
     return postWrite(result, instantTime, table);
   }
@@ -319,7 +319,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.bulkInsertPrepped(jsc,instantTime, preppedRecords, bulkInsertPartitioner);
     return postWrite(result, instantTime, table);
   }
@@ -335,7 +335,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT_OVERWRITE);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.insertOverwrite(jsc, instantTime, records);
     return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
   }
@@ -425,8 +425,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
         AsyncCleanerService.waitForCompletion(asyncCleanerService);
         LOG.info("Cleaner has finished");
       } else {
+        // Do not reuse instantTime for clean as metadata table requires all changes to have unique instant timestamps.
         LOG.info("Auto cleaning is enabled. Running cleaner now");
-        clean(instantTime);
+        clean();
       }
     }
   }
@@ -569,7 +570,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * cleaned)
    */
   public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
-    LOG.info("Cleaner started");
+    LOG.info("Cleaner started for instant time " + cleanInstantTime);
     final Timer.Context context = metrics.getCleanCtx();
     HoodieCleanMetadata metadata = getTable().clean(jsc, cleanInstantTime);
     if (context != null && metadata != null) {
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
index 9395d5f..751e2ab 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -457,6 +458,8 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
     init(HoodieTableType.COPY_ON_WRITE);
 
     final int maxDeltaCommitsBeforeCompaction = 6;
+    // Test autoClean and asyncClean based on this flag which is randomly chosen.
+    boolean asyncClean = new Random().nextBoolean();
     HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
         .withMetadataCompactionConfig(HoodieCompactionConfig.newBuilder()
             .archiveCommitsWith(2, 4).retainCommits(1).retainFileVersions(1).withAutoClean(true)
@@ -464,7 +467,7 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
             .withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction)
             .build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3)
-            .retainCommits(1).retainFileVersions(1).withAutoClean(false).build())
+            .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(asyncClean).build())
         .build();
     List<HoodieRecord> records;
     HoodieTableMetaClient metaClient = ClientUtils.createMetaClient(jsc.hadoopConfiguration(), config, true);
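
Condensed, the pattern this commit adopts looks like the sketch below (writeClient is a hypothetical HoodieWriteClient instance; both calls appear in the diff above):

    // Never reuse the write operation's instantTime for cleaning: the metadata
    // table requires every change to carry a unique instant timestamp.
    String cleanInstantTime = HoodieActiveTimeline.createNewInstantTime();
    writeClient.clean(cleanInstantTime);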


[hudi] 12/14: [RFC-15] Reverting changes to AbstractHoodieLogRecordScanner as they led to hung tests.

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 34872bbfccf36814aaa03ca3aadee3b8d7f8893b
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Wed Oct 28 01:44:38 2020 -0700

    [RFC-15] Reverting changes to AbstractHoodieLogRecordScanner as they led to hung tests.
---
 .../common/table/log/AbstractHoodieLogRecordScanner.java   | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index ab60752..7c51524 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -208,13 +208,15 @@ public abstract class AbstractHoodieLogRecordScanner {
                     LOG.info("Rolling back the last log block read in " + logFile.getPath());
                     currentInstantLogBlocks.pop();
                     numBlocksRolledBack++;
+                  } else if (!targetInstantForCommandBlock
+                      .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) {
+                    // invalid or extra rollback block
+                    LOG.warn("TargetInstantTime " + targetInstantForCommandBlock
+                        + " invalid or extra rollback command block in " + logFile.getPath());
+                    break;
                   } else {
-                    if (numBlocksRolledBack == 0) {
-                      // no blocks rolled back. this was an invalid or extra rollback block
-                      LOG.warn("TargetInstantTime " + targetInstantForCommandBlock
-                          + " invalid or extra rollback command block in " + logFile.getPath());
-                      break;
-                    }
+                    // this should not happen ideally
+                    LOG.warn("Unable to apply rollback command block in " + logFile.getPath());
                   }
                 }