You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") on the original page and is not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by st...@apache.org on 2023/02/09 15:56:07 UTC
[iceberg] branch master updated: Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and logging for IcebergFilesCommitter (#6764)
This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 99659a15c8 Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and logging for IcebergFilesCommitter (#6764)
99659a15c8 is described below
commit 99659a15c8b94a6066aaf7bb840feb987eafee76
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Thu Feb 9 07:56:00 2023 -0800
Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and logging for IcebergFilesCommitter (#6764)
* Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and logging for IcebergFilesCommitter
---
.../iceberg/flink/sink/IcebergFilesCommitter.java | 3 +++
.../flink/sink/IcebergFilesCommitterMetrics.java | 31 ++++++++++++++++++++++
2 files changed, 34 insertions(+)
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index b686a76c98..d8a7bc5cf2 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -230,6 +230,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// the files,
// Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
if (checkpointId > maxCommittedCheckpointId) {
+ LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
this.maxCommittedCheckpointId = checkpointId;
} else {
@@ -285,6 +286,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
}
continuousEmptyCheckpoints = 0;
+ } else {
+ LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId);
}
}
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
index edcb3fb495..9de0d6aaa5 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
@@ -18,13 +18,16 @@
*/
package org.apache.iceberg.flink.sink;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
class IcebergFilesCommitterMetrics {
private final AtomicLong lastCheckpointDurationMs = new AtomicLong();
private final AtomicLong lastCommitDurationMs = new AtomicLong();
+ private final ElapsedTimeGauge elapsedSecondsSinceLastSuccessfulCommit;
private final Counter committedDataFilesCount;
private final Counter committedDataFilesRecordCount;
private final Counter committedDataFilesByteCount;
@@ -37,6 +40,9 @@ class IcebergFilesCommitterMetrics {
metrics.addGroup("IcebergFilesCommitter").addGroup("table", fullTableName);
committerMetrics.gauge("lastCheckpointDurationMs", lastCheckpointDurationMs::get);
committerMetrics.gauge("lastCommitDurationMs", lastCommitDurationMs::get);
+ this.elapsedSecondsSinceLastSuccessfulCommit = new ElapsedTimeGauge(TimeUnit.SECONDS);
+ committerMetrics.gauge(
+ "elapsedSecondsSinceLastSuccessfulCommit", elapsedSecondsSinceLastSuccessfulCommit);
this.committedDataFilesCount = committerMetrics.counter("committedDataFilesCount");
this.committedDataFilesRecordCount = committerMetrics.counter("committedDataFilesRecordCount");
this.committedDataFilesByteCount = committerMetrics.counter("committedDataFilesByteCount");
@@ -54,7 +60,9 @@ class IcebergFilesCommitterMetrics {
lastCommitDurationMs.set(commitDurationMs);
}
+ /** This is called upon a successful commit. */
void updateCommitSummary(CommitSummary stats) {
+ elapsedSecondsSinceLastSuccessfulCommit.refreshLastRecordedTime();
committedDataFilesCount.inc(stats.dataFilesCount());
committedDataFilesRecordCount.inc(stats.dataFilesRecordCount());
committedDataFilesByteCount.inc(stats.dataFilesByteCount());
@@ -62,4 +70,27 @@ class IcebergFilesCommitterMetrics {
committedDeleteFilesRecordCount.inc(stats.deleteFilesRecordCount());
committedDeleteFilesByteCount.inc(stats.deleteFilesByteCount());
}
+
+ /**
+ * This gauge measures the elapsed time between now and last recorded time set by {@link
+ * ElapsedTimeGauge#refreshLastRecordedTime()}.
+ */
+ private static class ElapsedTimeGauge implements Gauge<Long> {
+ private final TimeUnit reportUnit;
+ private volatile long lastRecordedTimeNano;
+
+ ElapsedTimeGauge(TimeUnit timeUnit) {
+ this.reportUnit = timeUnit;
+ this.lastRecordedTimeNano = System.nanoTime();
+ }
+
+ void refreshLastRecordedTime() {
+ this.lastRecordedTimeNano = System.nanoTime();
+ }
+
+ @Override
+ public Long getValue() {
+ return reportUnit.convert(System.nanoTime() - lastRecordedTimeNano, TimeUnit.NANOSECONDS);
+ }
+ }
}