You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/30 06:39:09 UTC

[hudi] 02/10: [HUDI-4722] Added locking metrics for Hudi (#6502)

This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2e0e8ab7c0f86aadc69bcedbac81b919f9fbde00
Author: jsbali <js...@uber.com>
AuthorDate: Thu Sep 29 11:07:46 2022 +0530

    [HUDI-4722] Added locking metrics for Hudi (#6502)
---
 .../hudi/client/transaction/lock/LockManager.java  |  16 ++-
 .../lock/metrics/HoodieLockMetrics.java            | 110 +++++++++++++++++++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +
 .../hudi/config/metrics/HoodieMetricsConfig.java   |  16 +++
 .../org/apache/hudi/metrics/HoodieMetrics.java     |  40 ++++++++
 .../apache/hudi/client/SparkRDDWriteClient.java    |  16 ++-
 6 files changed, 197 insertions(+), 5 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
index 305b042bc2..2c5a884638 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
@@ -18,9 +18,7 @@
 
 package org.apache.hudi.client.transaction.lock;
 
-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.lock.LockProvider;
@@ -28,9 +26,14 @@ import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
 import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY;
 import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
 
@@ -45,6 +48,7 @@ public class LockManager implements Serializable, AutoCloseable {
   private final SerializableConfiguration hadoopConf;
   private final int maxRetries;
   private final long maxWaitTimeInMs;
+  private transient HoodieLockMetrics metrics;
   private volatile LockProvider lockProvider;
 
   public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
@@ -55,6 +59,7 @@ public class LockManager implements Serializable, AutoCloseable {
         Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
     maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
         Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
+    metrics = new HoodieLockMetrics(writeConfig);
   }
 
   public void lock() {
@@ -64,13 +69,17 @@ public class LockManager implements Serializable, AutoCloseable {
       boolean acquired = false;
       while (retryCount <= maxRetries) {
         try {
+          metrics.startLockApiTimerContext();
           acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
           if (acquired) {
+            metrics.updateLockAcquiredMetric();
             break;
           }
+          metrics.updateLockNotAcquiredMetric();
           LOG.info("Retrying to acquire lock...");
           Thread.sleep(maxWaitTimeInMs);
         } catch (HoodieLockException | InterruptedException e) {
+          metrics.updateLockNotAcquiredMetric();
           if (retryCount >= maxRetries) {
             throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock(), e);
           }
@@ -96,6 +105,7 @@ public class LockManager implements Serializable, AutoCloseable {
+  /**
+   * Releases the lock through the lock provider and then records the lock-held duration metric.
+   * Does nothing unless the configured write concurrency mode supports optimistic concurrency control.
+   */
   public void unlock() {
     if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
       getLockProvider().unlock();
+      // NOTE(review): metrics is a transient field assigned only in the constructor; confirm it
+      // cannot be null here (e.g. after this manager has been deserialized).
+      metrics.updateLockHeldTimerMetrics();
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
new file mode 100644
index 0000000000..6ea7a1ae14
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.metrics;
+
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.Metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Timer;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Publishes lock-related metrics to the shared metric registry: counters for lock acquisition
+ * outcomes and timers for the lock request latency and for how long a lock is held.
+ *
+ * <p>Every public method is a no-op unless {@link HoodieWriteConfig#isLockingMetricsEnabled()}
+ * returned {@code true} when this object was constructed.
+ */
+public class HoodieLockMetrics {
+
+  public static final String LOCK_ACQUIRE_ATTEMPTS_COUNTER_NAME = "lock.acquire.attempts";
+  public static final String LOCK_ACQUIRE_SUCCESS_COUNTER_NAME = "lock.acquire.success";
+  public static final String LOCK_ACQUIRE_FAILURES_COUNTER_NAME = "lock.acquire.failure";
+  public static final String LOCK_ACQUIRE_DURATION_TIMER_NAME = "lock.acquire.duration";
+  public static final String LOCK_REQUEST_LATENCY_TIMER_NAME = "lock.request.latency";
+  private final HoodieWriteConfig writeConfig;
+  private final boolean isMetricsEnabled;
+  // Size of the sliding window backing each timer's reservoir.
+  private final int keepLastNtimes = 100;
+  // Stopwatch for the time between a successful acquire and the matching release.
+  private final transient HoodieTimer lockDurationTimer = HoodieTimer.create();
+  // Stopwatch for a single lock request issued to the lock provider.
+  private final transient HoodieTimer lockApiRequestDurationTimer = HoodieTimer.create();
+  private transient Counter lockAttempts;
+  private transient Counter successfulLockAttempts;
+  private transient Counter failedLockAttempts;
+  private transient Timer lockDuration;
+  private transient Timer lockApiRequestDuration;
+
+  /**
+   * Creates the metrics holder and, when lock metrics are enabled, looks up or registers the
+   * counters and timers in the shared registry.
+   *
+   * @param writeConfig write config supplying the enable flag and the metric name prefix
+   */
+  public HoodieLockMetrics(HoodieWriteConfig writeConfig) {
+    this.isMetricsEnabled = writeConfig.isLockingMetricsEnabled();
+    this.writeConfig = writeConfig;
+
+    if (isMetricsEnabled) {
+      MetricRegistry registry = Metrics.getInstance().getRegistry();
+
+      lockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_ATTEMPTS_COUNTER_NAME));
+      successfulLockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_SUCCESS_COUNTER_NAME));
+      failedLockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_FAILURES_COUNTER_NAME));
+
+      lockDuration = createTimerForMetrics(registry, LOCK_ACQUIRE_DURATION_TIMER_NAME);
+      lockApiRequestDuration = createTimerForMetrics(registry, LOCK_REQUEST_LATENCY_TIMER_NAME);
+    }
+  }
+
+  /**
+   * Builds the fully qualified metric name as {@code <prefix>.<metric>}.
+   *
+   * @param metric metric suffix, one of the {@code *_NAME} constants of this class
+   * @return the prefixed name, or {@code null} when no write config is available
+   */
+  private String getMetricsName(String metric) {
+    return writeConfig == null ? null : String.format("%s.%s", writeConfig.getMetricReporterMetricsNamePrefix(), metric);
+  }
+
+  /**
+   * Returns the timer registered under the prefixed name of {@code metric}, registering a new
+   * sliding-window timer first when the registry has no entry for that name.
+   *
+   * @param registry registry to look the timer up in / register it with
+   * @param metric   metric suffix, one of the {@code *_TIMER_NAME} constants of this class
+   * @return the timer stored in the registry under the prefixed name
+   */
+  private Timer createTimerForMetrics(MetricRegistry registry, String metric) {
+    String metricName = getMetricsName(metric);
+    Timer existing = (Timer) registry.getMetrics().get(metricName);
+    if (existing != null) {
+      return existing;
+    }
+    // Build the timer in a local variable. Writing it into the lockDuration field here would let
+    // the call made for the request-latency timer overwrite the lock-held timer, so that both
+    // fields end up pointing at the same Timer instance.
+    Timer timer = new Timer(new SlidingWindowReservoir(keepLastNtimes));
+    registry.register(metricName, timer);
+    return timer;
+  }
+
+  /**
+   * Starts the stopwatch for a lock request. Callers are expected to follow up with either
+   * {@link #updateLockAcquiredMetric()} or {@link #updateLockNotAcquiredMetric()}.
+   */
+  public void startLockApiTimerContext() {
+    if (isMetricsEnabled) {
+      lockApiRequestDurationTimer.startTimer();
+    }
+  }
+
+  /**
+   * Records a successful lock request: stops the request stopwatch, reports its value to the
+   * request-latency timer, increments the attempt and success counters and starts the
+   * lock-held stopwatch.
+   */
+  public void updateLockAcquiredMetric() {
+    if (isMetricsEnabled) {
+      long durationMs = lockApiRequestDurationTimer.endTimer();
+      lockApiRequestDuration.update(durationMs, TimeUnit.MILLISECONDS);
+      lockAttempts.inc();
+      successfulLockAttempts.inc();
+      lockDurationTimer.startTimer();
+    }
+  }
+
+  /**
+   * Records an unsuccessful lock request: stops the request stopwatch, reports its value to the
+   * request-latency timer and increments the failure counter.
+   */
+  public void updateLockNotAcquiredMetric() {
+    if (isMetricsEnabled) {
+      long durationMs = lockApiRequestDurationTimer.endTimer();
+      lockApiRequestDuration.update(durationMs, TimeUnit.MILLISECONDS);
+      // NOTE(review): the attempts counter is not incremented on this path, so it tracks the
+      // success counter only; confirm whether failed requests should count as attempts too.
+      failedLockAttempts.inc();
+    }
+  }
+
+  /**
+   * Stops the lock-held stopwatch started by {@link #updateLockAcquiredMetric()} and reports its
+   * value to the lock-held timer.
+   */
+  public void updateLockHeldTimerMetrics() {
+    if (isMetricsEnabled && lockDurationTimer != null) {
+      long lockDurationInMs = lockDurationTimer.endTimer();
+      lockDuration.update(lockDurationInMs, TimeUnit.MILLISECONDS);
+    }
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 81e3bb134b..55979b481b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1749,6 +1749,10 @@ public class HoodieWriteConfig extends HoodieConfig {
         getStringOrDefault(HoodieMetricsConfig.EXECUTOR_METRICS_ENABLE, "false"));
   }
 
+  /**
+   * Reports whether lock metrics are switched on for this write config.
+   *
+   * @return the boolean value resolved for {@link HoodieMetricsConfig#LOCK_METRICS_ENABLE}
+   */
+  public boolean isLockingMetricsEnabled() {
+    final boolean lockMetricsOn = getBoolean(HoodieMetricsConfig.LOCK_METRICS_ENABLE);
+    return lockMetricsOn;
+  }
+
   public MetricsReporterType getMetricsReporterType() {
     return MetricsReporterType.valueOf(getString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE));
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index a515eb702b..957b439051 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -83,6 +83,17 @@ public class HoodieMetricsConfig extends HoodieConfig {
       .sinceVersion("0.7.0")
       .withDocumentation("");
 
+  // Toggle for lock metrics; defaults to false. The infer function yields the boolean value of
+  // TURN_METRICS_ON when that key is present in the config and an empty Option otherwise.
+  public static final ConfigProperty<Boolean> LOCK_METRICS_ENABLE = ConfigProperty
+      .key(METRIC_PREFIX + ".lock.enable")
+      .defaultValue(false)
+      .withInferFunction(cfg -> cfg.contains(TURN_METRICS_ON)
+          ? Option.of(cfg.getBoolean(TURN_METRICS_ON))
+          : Option.empty())
+      .withDocumentation("Enable metrics for locking infra. Useful when operating in multiwriter mode");
+
   /**
    * @deprecated Use {@link #TURN_METRICS_ON} and its methods instead
    */
@@ -163,6 +174,11 @@ public class HoodieMetricsConfig extends HoodieConfig {
       return this;
     }
 
+    /**
+     * Stores the given flag, as a string, under {@link HoodieMetricsConfig#LOCK_METRICS_ENABLE}
+     * on the config being built.
+     *
+     * @param enable value to store for the lock metrics toggle
+     * @return this builder, for chaining
+     */
+    public Builder withLockingMetrics(boolean enable) {
+      final String flag = Boolean.toString(enable);
+      hoodieMetricsConfig.setValue(LOCK_METRICS_ENABLE, flag);
+      return this;
+    }
+
     public HoodieMetricsConfig build() {
 
       hoodieMetricsConfig.setDefaults(HoodieMetricsConfig.class.getName());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index d13110feef..69ef7917b2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 
+import com.codahale.metrics.Counter;
 import com.codahale.metrics.Timer;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -43,6 +44,9 @@ public class HoodieMetrics {
   public String finalizeTimerName = null;
   public String compactionTimerName = null;
   public String indexTimerName = null;
+  private String conflictResolutionTimerName = null;
+  private String conflictResolutionSuccessCounterName = null;
+  private String conflictResolutionFailureCounterName = null;
   private HoodieWriteConfig config;
   private String tableName;
   private Timer rollbackTimer = null;
@@ -53,6 +57,9 @@ public class HoodieMetrics {
   private Timer compactionTimer = null;
   private Timer clusteringTimer = null;
   private Timer indexTimer = null;
+  private Timer conflictResolutionTimer = null;
+  private Counter conflictResolutionSuccessCounter = null;
+  private Counter conflictResolutionFailureCounter = null;
 
   public HoodieMetrics(HoodieWriteConfig config) {
     this.config = config;
@@ -67,6 +74,9 @@ public class HoodieMetrics {
       this.finalizeTimerName = getMetricsName("timer", "finalize");
       this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION);
       this.indexTimerName = getMetricsName("timer", "index");
+      this.conflictResolutionTimerName = getMetricsName("timer", "conflict_resolution");
+      this.conflictResolutionSuccessCounterName = getMetricsName("counter", "conflict_resolution.success");
+      this.conflictResolutionFailureCounterName = getMetricsName("counter", "conflict_resolution.failure");
     }
   }
 
@@ -130,6 +140,13 @@ public class HoodieMetrics {
     return indexTimer == null ? null : indexTimer.time();
   }
 
+  /**
+   * Starts timing a conflict-resolution pass.
+   *
+   * <p>The backing timer is created on first use, and only when lock metrics are enabled.
+   *
+   * @return a running timer context, or {@code null} when no timer is available
+   */
+  public Timer.Context getConflictResolutionCtx() {
+    final boolean needsTimer = config.isLockingMetricsEnabled() && conflictResolutionTimer == null;
+    if (needsTimer) {
+      conflictResolutionTimer = createTimer(conflictResolutionTimerName);
+    }
+    if (conflictResolutionTimer == null) {
+      return null;
+    }
+    return conflictResolutionTimer.time();
+  }
+
   public void updateMetricsForEmptyData(String actionType) {
     if (!config.isMetricsOn() || !config.getMetricsReporterType().equals(MetricsReporterType.PROMETHEUS_PUSHGATEWAY)) {
       // No-op if metrics are not of type PROMETHEUS_PUSHGATEWAY.
@@ -244,4 +261,27 @@ public class HoodieMetrics {
   public long getDurationInMs(long ctxDuration) {
     return ctxDuration / 1000000;
   }
+
+  /**
+   * Increments the conflict-resolution success counter, looking the counter up in the metric
+   * registry on first use. Does nothing when lock metrics are disabled or when no counter name
+   * has been assigned.
+   */
+  public void emitConflictResolutionSuccessful() {
+    // The counter name starts out null and is only assigned inside a nested block of the
+    // constructor, so guard against handing a null name to the registry lookup.
+    if (config.isLockingMetricsEnabled() && conflictResolutionSuccessCounterName != null) {
+      LOG.info("Sending conflict resolution success metric");
+      conflictResolutionSuccessCounter = getCounter(conflictResolutionSuccessCounter, conflictResolutionSuccessCounterName);
+      conflictResolutionSuccessCounter.inc();
+    }
+  }
+
+  /**
+   * Increments the conflict-resolution failure counter, looking the counter up in the metric
+   * registry on first use. Does nothing when lock metrics are disabled or when no counter name
+   * has been assigned.
+   */
+  public void emitConflictResolutionFailed() {
+    // The counter name starts out null and is only assigned inside a nested block of the
+    // constructor, so guard against handing a null name to the registry lookup.
+    if (config.isLockingMetricsEnabled() && conflictResolutionFailureCounterName != null) {
+      LOG.info("Sending conflict resolution failure metric");
+      conflictResolutionFailureCounter = getCounter(conflictResolutionFailureCounter, conflictResolutionFailureCounterName);
+      conflictResolutionFailureCounter.inc();
+    }
+  }
+
+  /**
+   * Returns {@code counter} when it is already set; otherwise fetches the counter registered
+   * under {@code name} from the shared metric registry.
+   *
+   * @param counter previously resolved counter, may be {@code null}
+   * @param name    registry name used when {@code counter} is {@code null}
+   * @return the counter to increment
+   */
+  private Counter getCounter(Counter counter, String name) {
+    return counter != null ? counter : Metrics.getInstance().getRegistry().counter(name);
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 5dfbdadca8..7110e26bb0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -45,6 +45,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -479,8 +480,19 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
     // Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
     HoodieTable table = createTable(config, hadoopConf);
-    TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
-        Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false, this.pendingInflightAndRequestedInstants);
+    Timer.Context conflictResolutionTimer = metrics.getConflictResolutionCtx();
+    try {
+      TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
+          Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false, this.pendingInflightAndRequestedInstants);
+      metrics.emitConflictResolutionSuccessful();
+    } catch (HoodieWriteConflictException e) {
+      metrics.emitConflictResolutionFailed();
+      throw e;
+    } finally {
+      if (conflictResolutionTimer != null) {
+        conflictResolutionTimer.stop();
+      }
+    }
   }
 
   @Override