You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/30 06:39:09 UTC
[hudi] 02/10: [HUDI-4722] Added locking metrics for Hudi (#6502)
This is an automated email from the ASF dual-hosted git repository.
yuzhaojing pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2e0e8ab7c0f86aadc69bcedbac81b919f9fbde00
Author: jsbali <js...@uber.com>
AuthorDate: Thu Sep 29 11:07:46 2022 +0530
[HUDI-4722] Added locking metrics for Hudi (#6502)
---
.../hudi/client/transaction/lock/LockManager.java | 16 ++-
.../lock/metrics/HoodieLockMetrics.java | 110 +++++++++++++++++++++
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +
.../hudi/config/metrics/HoodieMetricsConfig.java | 16 +++
.../org/apache/hudi/metrics/HoodieMetrics.java | 40 ++++++++
.../apache/hudi/client/SparkRDDWriteClient.java | 16 ++-
6 files changed, 197 insertions(+), 5 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
index 305b042bc2..2c5a884638 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
@@ -18,9 +18,7 @@
package org.apache.hudi.client.transaction.lock;
-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.lock.LockProvider;
@@ -28,9 +26,14 @@ import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
@@ -45,6 +48,7 @@ public class LockManager implements Serializable, AutoCloseable {
private final SerializableConfiguration hadoopConf;
private final int maxRetries;
private final long maxWaitTimeInMs;
+ private transient HoodieLockMetrics metrics;
private volatile LockProvider lockProvider;
public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
@@ -55,6 +59,7 @@ public class LockManager implements Serializable, AutoCloseable {
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
+ metrics = new HoodieLockMetrics(writeConfig);
}
public void lock() {
@@ -64,13 +69,17 @@ public class LockManager implements Serializable, AutoCloseable {
boolean acquired = false;
while (retryCount <= maxRetries) {
try {
+ metrics.startLockApiTimerContext();
acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
if (acquired) {
+ metrics.updateLockAcquiredMetric();
break;
}
+ metrics.updateLockNotAcquiredMetric();
LOG.info("Retrying to acquire lock...");
Thread.sleep(maxWaitTimeInMs);
} catch (HoodieLockException | InterruptedException e) {
+ metrics.updateLockNotAcquiredMetric();
if (retryCount >= maxRetries) {
throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock(), e);
}
@@ -96,6 +105,7 @@ public class LockManager implements Serializable, AutoCloseable {
public void unlock() { // releases the lock through the lock provider and then updates the lock-held metric
if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) { // nothing is done for concurrency modes without optimistic concurrency control
getLockProvider().unlock();
+ metrics.updateLockHeldTimerMetrics(); // runs after the provider unlock; NOTE(review): metrics is a transient field assigned only in the constructor - confirm it cannot be null here after deserialization
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
new file mode 100644
index 0000000000..6ea7a1ae14
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.metrics;
+
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.Metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Timer;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Counters and timers describing lock acquisition and lock-held time.
+ *
+ * <p>Metrics are registered in the shared {@link Metrics} registry only when
+ * {@link HoodieWriteConfig#isLockingMetricsEnabled()} is true; otherwise every public
+ * method of this class is a no-op.
+ */
+public class HoodieLockMetrics {
+
+  public static final String LOCK_ACQUIRE_ATTEMPTS_COUNTER_NAME = "lock.acquire.attempts";
+  public static final String LOCK_ACQUIRE_SUCCESS_COUNTER_NAME = "lock.acquire.success";
+  public static final String LOCK_ACQUIRE_FAILURES_COUNTER_NAME = "lock.acquire.failure";
+  public static final String LOCK_ACQUIRE_DURATION_TIMER_NAME = "lock.acquire.duration";
+  public static final String LOCK_REQUEST_LATENCY_TIMER_NAME = "lock.request.latency";
+  private final HoodieWriteConfig writeConfig;
+  private final boolean isMetricsEnabled;
+  // Size of the sliding-window reservoir backing each timer created by this class.
+  private final int keepLastNtimes = 100;
+  // Stopwatch started when a lock is acquired and ended when it is released.
+  private final transient HoodieTimer lockDurationTimer = HoodieTimer.create();
+  // Stopwatch wrapped around a single lock request.
+  private final transient HoodieTimer lockApiRequestDurationTimer = HoodieTimer.create();
+  private transient Counter lockAttempts;
+  private transient Counter successfulLockAttempts;
+  private transient Counter failedLockAttempts;
+  private transient Timer lockDuration;
+  private transient Timer lockApiRequestDuration;
+
+  /**
+   * Creates the metrics holder and, when lock metrics are enabled, registers (or looks up)
+   * the counters and timers in the shared registry.
+   *
+   * @param writeConfig write config supplying the enable flag and the metric name prefix
+   */
+  public HoodieLockMetrics(HoodieWriteConfig writeConfig) {
+    this.isMetricsEnabled = writeConfig.isLockingMetricsEnabled();
+    this.writeConfig = writeConfig;
+
+    if (isMetricsEnabled) {
+      MetricRegistry registry = Metrics.getInstance().getRegistry();
+
+      lockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_ATTEMPTS_COUNTER_NAME));
+      successfulLockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_SUCCESS_COUNTER_NAME));
+      failedLockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_FAILURES_COUNTER_NAME));
+
+      lockDuration = createTimerForMetrics(registry, LOCK_ACQUIRE_DURATION_TIMER_NAME);
+      lockApiRequestDuration = createTimerForMetrics(registry, LOCK_REQUEST_LATENCY_TIMER_NAME);
+    }
+  }
+
+  /**
+   * Builds the fully qualified metric name as {@code <prefix>.<metric>}.
+   *
+   * @param metric metric suffix, e.g. one of the {@code LOCK_*_NAME} constants
+   * @return the qualified name, or {@code null} when no write config is available
+   */
+  private String getMetricsName(String metric) {
+    return writeConfig == null ? null : String.format("%s.%s", writeConfig.getMetricReporterMetricsNamePrefix(), metric);
+  }
+
+  /**
+   * Returns the timer registered under the given metric, creating and registering a
+   * sliding-window timer when none exists yet.
+   *
+   * <p>The new timer is held in a local variable on purpose: writing it to the
+   * {@code lockDuration} field here would let the second call from the constructor
+   * (request latency) overwrite the lock-duration timer, so both fields would end up
+   * pointing at the same {@link Timer}.
+   *
+   * @param registry registry to look the timer up in / register it with
+   * @param metric metric suffix used to derive the registered name
+   * @return the existing or newly registered timer
+   */
+  private Timer createTimerForMetrics(MetricRegistry registry, String metric) {
+    String metricName = getMetricsName(metric);
+    if (registry.getMetrics().get(metricName) == null) {
+      Timer timer = new Timer(new SlidingWindowReservoir(keepLastNtimes));
+      registry.register(metricName, timer);
+      return timer;
+    }
+    return (Timer) registry.getMetrics().get(metricName);
+  }
+
+  /**
+   * Starts the stopwatch for a single lock request. Expected to be paired with either
+   * {@link #updateLockAcquiredMetric()} or {@link #updateLockNotAcquiredMetric()}.
+   */
+  public void startLockApiTimerContext() {
+    if (isMetricsEnabled) {
+      lockApiRequestDurationTimer.startTimer();
+    }
+  }
+
+  /**
+   * Records a successful lock request: stores the request latency, bumps the attempt and
+   * success counters, and starts the lock-held stopwatch.
+   */
+  public void updateLockAcquiredMetric() {
+    if (isMetricsEnabled) {
+      long durationMs = lockApiRequestDurationTimer.endTimer();
+      lockApiRequestDuration.update(durationMs, TimeUnit.MILLISECONDS);
+      // NOTE(review): the attempts counter is only incremented on this success path, so it
+      // always equals the success counter - confirm whether failures should count as attempts.
+      lockAttempts.inc();
+      successfulLockAttempts.inc();
+      lockDurationTimer.startTimer();
+    }
+  }
+
+  /**
+   * Records a failed lock request: stores the request latency and bumps the failure counter.
+   */
+  public void updateLockNotAcquiredMetric() {
+    if (isMetricsEnabled) {
+      long durationMs = lockApiRequestDurationTimer.endTimer();
+      lockApiRequestDuration.update(durationMs, TimeUnit.MILLISECONDS);
+      failedLockAttempts.inc();
+    }
+  }
+
+  /**
+   * Ends the lock-held stopwatch started by {@link #updateLockAcquiredMetric()} and records
+   * the elapsed time in the lock-duration timer.
+   */
+  public void updateLockHeldTimerMetrics() {
+    if (isMetricsEnabled && lockDurationTimer != null) {
+      long lockDurationInMs = lockDurationTimer.endTimer();
+      lockDuration.update(lockDurationInMs, TimeUnit.MILLISECONDS);
+    }
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 81e3bb134b..55979b481b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1749,6 +1749,10 @@ public class HoodieWriteConfig extends HoodieConfig {
getStringOrDefault(HoodieMetricsConfig.EXECUTOR_METRICS_ENABLE, "false"));
}
+  /**
+   * Reports whether lock metrics are switched on for this write config.
+   *
+   * @return the value stored under {@link HoodieMetricsConfig#LOCK_METRICS_ENABLE}
+   */
+  public boolean isLockingMetricsEnabled() {
+    final boolean lockMetricsOn = getBoolean(HoodieMetricsConfig.LOCK_METRICS_ENABLE);
+    return lockMetricsOn;
+  }
+
public MetricsReporterType getMetricsReporterType() {
return MetricsReporterType.valueOf(getString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE));
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index a515eb702b..957b439051 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -83,6 +83,17 @@ public class HoodieMetricsConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("");
+ public static final ConfigProperty<Boolean> LOCK_METRICS_ENABLE = ConfigProperty // switch read by HoodieWriteConfig#isLockingMetricsEnabled
+ .key(METRIC_PREFIX + ".lock.enable")
+ .defaultValue(false)
+ .withInferFunction(cfg -> { // when no value is set explicitly, derive one from the general metrics switch
+ if (cfg.contains(TURN_METRICS_ON)) {
+ return Option.of(cfg.getBoolean(TURN_METRICS_ON)); // follow whatever TURN_METRICS_ON is set to
+ }
+ return Option.empty(); // nothing to infer from
+ })
+ .withDocumentation("Enable metrics for locking infra. Useful when operating in multiwriter mode");
+
/**
* @deprecated Use {@link #TURN_METRICS_ON} and its methods instead
*/
@@ -163,6 +174,11 @@ public class HoodieMetricsConfig extends HoodieConfig {
return this;
}
+    /**
+     * Sets {@link #LOCK_METRICS_ENABLE} on the config under construction.
+     *
+     * @param enable whether lock metrics should be turned on
+     * @return this builder, for chaining
+     */
+    public Builder withLockingMetrics(boolean enable) {
+      final String flag = Boolean.toString(enable);
+      hoodieMetricsConfig.setValue(LOCK_METRICS_ENABLE, flag);
+      return this;
+    }
+
public HoodieMetricsConfig build() {
hoodieMetricsConfig.setDefaults(HoodieMetricsConfig.class.getName());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index d13110feef..69ef7917b2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -43,6 +44,9 @@ public class HoodieMetrics {
public String finalizeTimerName = null;
public String compactionTimerName = null;
public String indexTimerName = null;
+ private String conflictResolutionTimerName = null;
+ private String conflictResolutionSuccessCounterName = null;
+ private String conflictResolutionFailureCounterName = null;
private HoodieWriteConfig config;
private String tableName;
private Timer rollbackTimer = null;
@@ -53,6 +57,9 @@ public class HoodieMetrics {
private Timer compactionTimer = null;
private Timer clusteringTimer = null;
private Timer indexTimer = null;
+ private Timer conflictResolutionTimer = null;
+ private Counter conflictResolutionSuccessCounter = null;
+ private Counter conflictResolutionFailureCounter = null;
public HoodieMetrics(HoodieWriteConfig config) {
this.config = config;
@@ -67,6 +74,9 @@ public class HoodieMetrics {
this.finalizeTimerName = getMetricsName("timer", "finalize");
this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION);
this.indexTimerName = getMetricsName("timer", "index");
+ this.conflictResolutionTimerName = getMetricsName("timer", "conflict_resolution");
+ this.conflictResolutionSuccessCounterName = getMetricsName("counter", "conflict_resolution.success");
+ this.conflictResolutionFailureCounterName = getMetricsName("counter", "conflict_resolution.failure");
}
}
@@ -130,6 +140,13 @@ public class HoodieMetrics {
return indexTimer == null ? null : indexTimer.time();
}
+  /**
+   * Starts timing a conflict-resolution pass.
+   *
+   * <p>The backing timer is created on first use, and only when lock metrics are enabled.
+   *
+   * @return a running timer context, or {@code null} when no timer is available
+   */
+  public Timer.Context getConflictResolutionCtx() {
+    final boolean lockMetricsOn = config.isLockingMetricsEnabled();
+    if (lockMetricsOn && conflictResolutionTimer == null) {
+      conflictResolutionTimer = createTimer(conflictResolutionTimerName);
+    }
+    if (conflictResolutionTimer == null) {
+      return null;
+    }
+    return conflictResolutionTimer.time();
+  }
+
public void updateMetricsForEmptyData(String actionType) {
if (!config.isMetricsOn() || !config.getMetricsReporterType().equals(MetricsReporterType.PROMETHEUS_PUSHGATEWAY)) {
// No-op if metrics are not of type PROMETHEUS_PUSHGATEWAY.
@@ -244,4 +261,27 @@ public class HoodieMetrics {
public long getDurationInMs(long ctxDuration) {
return ctxDuration / 1000000;
}
+
+  /**
+   * Increments the conflict-resolution success counter; does nothing when lock metrics are disabled.
+   * The counter is obtained through {@code getCounter} and cached in a field.
+   */
+  public void emitConflictResolutionSuccessful() {
+    if (!config.isLockingMetricsEnabled()) {
+      return;
+    }
+    LOG.info("Sending conflict resolution success metric");
+    conflictResolutionSuccessCounter =
+        getCounter(conflictResolutionSuccessCounter, conflictResolutionSuccessCounterName);
+    conflictResolutionSuccessCounter.inc();
+  }
+
+  /**
+   * Increments the conflict-resolution failure counter; does nothing when lock metrics are disabled.
+   * The counter is obtained through {@code getCounter} and cached in a field.
+   */
+  public void emitConflictResolutionFailed() {
+    if (!config.isLockingMetricsEnabled()) {
+      return;
+    }
+    LOG.info("Sending conflict resolution failure metric");
+    conflictResolutionFailureCounter =
+        getCounter(conflictResolutionFailureCounter, conflictResolutionFailureCounterName);
+    conflictResolutionFailureCounter.inc();
+  }
+
+  /**
+   * Returns the given counter, or fetches one by name from the shared metrics registry
+   * when the given counter is {@code null}.
+   *
+   * @param counter an already resolved counter, possibly {@code null}
+   * @param name registry name used when {@code counter} is {@code null}
+   * @return {@code counter} itself when non-null, otherwise the registry's counter for {@code name}
+   */
+  private Counter getCounter(Counter counter, String name) {
+    return counter != null ? counter : Metrics.getInstance().getRegistry().counter(name);
+  }
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 5dfbdadca8..7110e26bb0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -45,6 +45,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -479,8 +480,19 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
// Create a Hoodie table after startTxn which encapsulated the commits and files visible.
// Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
HoodieTable table = createTable(config, hadoopConf);
- TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
- Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false, this.pendingInflightAndRequestedInstants);
+ Timer.Context conflictResolutionTimer = metrics.getConflictResolutionCtx();
+ try {
+ TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
+ Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false, this.pendingInflightAndRequestedInstants);
+ metrics.emitConflictResolutionSuccessful();
+ } catch (HoodieWriteConflictException e) {
+ metrics.emitConflictResolutionFailed();
+ throw e;
+ } finally {
+ if (conflictResolutionTimer != null) {
+ conflictResolutionTimer.stop();
+ }
+ }
}
@Override