You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2019/09/16 18:12:57 UTC

[incubator-hudi] branch master updated: [HUDI-62] Index Lookup Timer added to HoodieWriteClient

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c0f42af  [HUDI-62] Index Lookup Timer added to HoodieWriteClient
c0f42af is described below

commit c0f42afa35768cf191dcc061feac2e7396457058
Author: Taher Koitawala <ta...@gmail.com>
AuthorDate: Tue Sep 3 20:35:42 2019 +0530

    [HUDI-62] Index Lookup Timer added to HoodieWriteClient
---
 .../java/org/apache/hudi/HoodieWriteClient.java     | 16 ++++++++++++++--
 .../java/org/apache/hudi/metrics/HoodieMetrics.java | 21 ++++++++++++++++++++-
 2 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
index e6b3152..4a84821 100644
--- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
@@ -101,6 +101,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   private final transient HoodieIndex<T> index;
   private transient Timer.Context writeContext = null;
   private transient Timer.Context compactionTimer;
+  private transient Timer.Context indexTimer = null;
 
   /**
    * @param jsc
@@ -152,8 +153,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.getHoodieTable(
         createMetaClient(true), config, jsc);
-
+    indexTimer = metrics.getIndexCtx();
     JavaRDD<HoodieRecord<T>> recordsWithLocation = index.tagLocation(hoodieRecords, jsc, table);
+    metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L :
+        indexTimer.stop()));
+    indexTimer = null;
     return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
   }
 
@@ -167,8 +171,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       JavaRDD<HoodieRecord<T>> dedupedRecords = combineOnCondition(
           config.shouldCombineBeforeUpsert(), records, config.getUpsertShuffleParallelism());
 
+      indexTimer = metrics.getIndexCtx();
       // perform index loop up to get existing location of records
       JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, jsc, table);
+      metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L :
+          indexTimer.stop()));
+      indexTimer = null;
       return upsertRecordsInternal(taggedRecords, commitTime, table, true);
     } catch (Throwable e) {
       if (e instanceof HoodieUpsertException) {
@@ -468,8 +476,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
     // RDD actions that are performed after updating the index.
     writeStatusRDD = writeStatusRDD.persist(config.getWriteStatusStorageLevel());
+    indexTimer = metrics.getIndexCtx();
     // Update the index back
     JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, jsc, table);
+    metrics.updateIndexMetrics("update", metrics.getDurationInMs(indexTimer == null ? 0L :
+        indexTimer.stop()));
+    indexTimer = null;
     // Trigger the insert and collect statuses
     commitOnAutoCommit(commitTime, statuses, table.getMetaClient().getCommitActionType());
     return statuses;
@@ -1410,4 +1422,4 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     }
   }
 
-}
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index a564634..c462e2b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -39,6 +39,7 @@ public class HoodieMetrics {
   public String deltaCommitTimerName = null;
   public String finalizeTimerName = null;
   public String compactionTimerName = null;
+  public String indexTimerName = null;
   private HoodieWriteConfig config = null;
   private String tableName = null;
   private Timer rollbackTimer = null;
@@ -47,6 +48,7 @@ public class HoodieMetrics {
   private Timer deltaCommitTimer = null;
   private Timer finalizeTimer = null;
   private Timer compactionTimer = null;
+  private Timer indexTimer = null;
 
   public HoodieMetrics(HoodieWriteConfig config, String tableName) {
     this.config = config;
@@ -59,6 +61,7 @@ public class HoodieMetrics {
       this.deltaCommitTimerName = getMetricsName("timer", HoodieTimeline.DELTA_COMMIT_ACTION);
       this.finalizeTimerName = getMetricsName("timer", "finalize");
       this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION);
+      this.indexTimerName = getMetricsName("timer", "index");
     }
   }
 
@@ -108,6 +111,13 @@ public class HoodieMetrics {
     return deltaCommitTimer == null ? null : deltaCommitTimer.time();
   }
 
+  public Timer.Context getIndexCtx() {
+    if (config.isMetricsOn() && indexTimer == null) {
+      indexTimer = createTimer(indexTimerName);
+    }
+    return indexTimer == null ? null : indexTimer.time();
+  }
+
   public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs,
       HoodieCommitMetadata metadata, String actionType) {
     if (config.isMetricsOn()) {
@@ -172,6 +182,15 @@ public class HoodieMetrics {
     }
   }
 
+  public void updateIndexMetrics(final String action,final long durationInMs) {
+    if (config.isMetricsOn()) {
+      logger.info(String
+          .format("Sending index metrics (%s.duration, %d)",action, durationInMs));
+      Metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)),
+          durationInMs);
+    }
+  }
+
   @VisibleForTesting
   String getMetricsName(String action, String metric) {
     return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
@@ -183,4 +202,4 @@ public class HoodieMetrics {
   public long getDurationInMs(long ctxDuration) {
     return ctxDuration / 1000000;
   }
-}
+}
\ No newline at end of file