You are viewing a plain-text version of this content. The link to the canonical (HTML) version was not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by st...@apache.org on 2023/02/09 15:56:07 UTC

[iceberg] branch master updated: Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and logging for IcebergFilesCommitter (#6764)

This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 99659a15c8 Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and logging for IcebergFilesCommitter (#6764)
99659a15c8 is described below

commit 99659a15c8b94a6066aaf7bb840feb987eafee76
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Thu Feb 9 07:56:00 2023 -0800

    Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and logging for IcebergFilesCommitter (#6764)
    
    * Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and logging for IcebergFilesCommitter
---
 .../iceberg/flink/sink/IcebergFilesCommitter.java  |  3 +++
 .../flink/sink/IcebergFilesCommitterMetrics.java   | 31 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index b686a76c98..d8a7bc5cf2 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -230,6 +230,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     // the files,
     // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
     if (checkpointId > maxCommittedCheckpointId) {
+      LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
       commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
       this.maxCommittedCheckpointId = checkpointId;
     } else {
@@ -285,6 +286,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
         commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
       }
       continuousEmptyCheckpoints = 0;
+    } else {
+      LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId);
     }
   }
 
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
index edcb3fb495..9de0d6aaa5 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
@@ -18,13 +18,16 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 
 class IcebergFilesCommitterMetrics {
   private final AtomicLong lastCheckpointDurationMs = new AtomicLong();
   private final AtomicLong lastCommitDurationMs = new AtomicLong();
+  private final ElapsedTimeGauge elapsedSecondsSinceLastSuccessfulCommit;
   private final Counter committedDataFilesCount;
   private final Counter committedDataFilesRecordCount;
   private final Counter committedDataFilesByteCount;
@@ -37,6 +40,9 @@ class IcebergFilesCommitterMetrics {
         metrics.addGroup("IcebergFilesCommitter").addGroup("table", fullTableName);
     committerMetrics.gauge("lastCheckpointDurationMs", lastCheckpointDurationMs::get);
     committerMetrics.gauge("lastCommitDurationMs", lastCommitDurationMs::get);
+    this.elapsedSecondsSinceLastSuccessfulCommit = new ElapsedTimeGauge(TimeUnit.SECONDS);
+    committerMetrics.gauge(
+        "elapsedSecondsSinceLastSuccessfulCommit", elapsedSecondsSinceLastSuccessfulCommit);
     this.committedDataFilesCount = committerMetrics.counter("committedDataFilesCount");
     this.committedDataFilesRecordCount = committerMetrics.counter("committedDataFilesRecordCount");
     this.committedDataFilesByteCount = committerMetrics.counter("committedDataFilesByteCount");
@@ -54,7 +60,9 @@ class IcebergFilesCommitterMetrics {
     lastCommitDurationMs.set(commitDurationMs);
   }
 
+  /** This is called upon a successful commit. */
   void updateCommitSummary(CommitSummary stats) {
+    elapsedSecondsSinceLastSuccessfulCommit.refreshLastRecordedTime();
     committedDataFilesCount.inc(stats.dataFilesCount());
     committedDataFilesRecordCount.inc(stats.dataFilesRecordCount());
     committedDataFilesByteCount.inc(stats.dataFilesByteCount());
@@ -62,4 +70,27 @@ class IcebergFilesCommitterMetrics {
     committedDeleteFilesRecordCount.inc(stats.deleteFilesRecordCount());
     committedDeleteFilesByteCount.inc(stats.deleteFilesByteCount());
   }
+
+  /**
+   * This gauge measures the elapsed time between now and last recorded time set by {@link
+   * ElapsedTimeGauge#refreshLastRecordedTime()}.
+   */
+  private static class ElapsedTimeGauge implements Gauge<Long> {
+    private final TimeUnit reportUnit;
+    private volatile long lastRecordedTimeNano;
+
+    ElapsedTimeGauge(TimeUnit timeUnit) {
+      this.reportUnit = timeUnit;
+      this.lastRecordedTimeNano = System.nanoTime();
+    }
+
+    void refreshLastRecordedTime() {
+      this.lastRecordedTimeNano = System.nanoTime();
+    }
+
+    @Override
+    public Long getValue() {
+      return reportUnit.convert(System.nanoTime() - lastRecordedTimeNano, TimeUnit.NANOSECONDS);
+    }
+  }
 }