Posted to issues@iceberg.apache.org by "stevenzwu (via GitHub)" <gi...@apache.org> on 2023/02/07 17:31:26 UTC

[GitHub] [iceberg] stevenzwu opened a new pull request, #6764: Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and …

stevenzwu opened a new pull request, #6764:
URL: https://github.com/apache/iceberg/pull/6764

   …logging for IcebergFilesCommitter


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] mas-chen commented on a diff in pull request #6764: Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and …

Posted by "mas-chen (via GitHub)" <gi...@apache.org>.
mas-chen commented on code in PR #6764:
URL: https://github.com/apache/iceberg/pull/6764#discussion_r1099674674


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java:
##########
@@ -54,12 +60,34 @@ void commitDuration(long commitDurationMs) {
     lastCommitDurationMs.set(commitDurationMs);
   }
 
+  /** This is called upon a successful commit. */
   void updateCommitSummary(CommitSummary stats) {
+    elapsedSecondsSinceLastSuccessfulCommit.refreshLastSuccessfulCommitTime();
     committedDataFilesCount.inc(stats.dataFilesCount());
     committedDataFilesRecordCount.inc(stats.dataFilesRecordCount());
     committedDataFilesByteCount.inc(stats.dataFilesByteCount());
     committedDeleteFilesCount.inc(stats.deleteFilesCount());
     committedDeleteFilesRecordCount.inc(stats.deleteFilesRecordCount());
     committedDeleteFilesByteCount.inc(stats.deleteFilesByteCount());
   }
+
+  private static class ElapsedTimeGauge implements Gauge<Long> {
+    private final TimeUnit reportUnit;
+    private volatile long lastSuccessfulCommitTimeNano;
+
+    ElapsedTimeGauge(TimeUnit timeUnit) {
+      this.reportUnit = timeUnit;
+      this.lastSuccessfulCommitTimeNano = System.nanoTime();
+    }
+
+    void refreshLastSuccessfulCommitTime() {
+      this.lastSuccessfulCommitTimeNano = System.nanoTime();
+    }
+
+    @Override
+    public Long getValue() {
+      return reportUnit.convert(

Review Comment:
   No, I mean before the first checkpoint: `lastSuccessfulCommitTimeNano` would be 0, so the delta would be large.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6764: Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and …

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6764:
URL: https://github.com/apache/iceberg/pull/6764#discussion_r1100291622


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java:
##########
@@ -54,12 +60,38 @@ void commitDuration(long commitDurationMs) {
     lastCommitDurationMs.set(commitDurationMs);
   }
 
+  /** This is called upon a successful commit. */
   void updateCommitSummary(CommitSummary stats) {
+    elapsedSecondsSinceLastSuccessfulCommit.refreshLastRecordedTime();
     committedDataFilesCount.inc(stats.dataFilesCount());
     committedDataFilesRecordCount.inc(stats.dataFilesRecordCount());
     committedDataFilesByteCount.inc(stats.dataFilesByteCount());
     committedDeleteFilesCount.inc(stats.deleteFilesCount());
     committedDeleteFilesRecordCount.inc(stats.deleteFilesRecordCount());
     committedDeleteFilesByteCount.inc(stats.deleteFilesByteCount());
   }
+
+  /**
+   * This gauge measures the elapsed time between now and last recorded time
+   * set by {@link ElapsedTimeGauge#refreshLastRecordedTime()}.
+   */
+  private static class ElapsedTimeGauge implements Gauge<Long> {
+    private final TimeUnit reportUnit;
+    private volatile long lastRecordedTimeNano;
+
+    ElapsedTimeGauge(TimeUnit timeUnit) {
+      this.reportUnit = timeUnit;
+      this.lastRecordedTimeNano = System.nanoTime();
+    }
+
+    void refreshLastRecordedTime() {

Review Comment:
   @pvary, I renamed this method from `refreshLastSuccessfulCommitTime` to `refreshLastRecordedTime` because `ElapsedTimeGauge` is intended as a more general gauge.






[GitHub] [iceberg] stevenzwu merged pull request #6764: Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and …

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu merged PR #6764:
URL: https://github.com/apache/iceberg/pull/6764




[GitHub] [iceberg] mas-chen commented on a diff in pull request #6764: Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and …

Posted by "mas-chen (via GitHub)" <gi...@apache.org>.
mas-chen commented on code in PR #6764:
URL: https://github.com/apache/iceberg/pull/6764#discussion_r1099720211


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java:
##########
@@ -54,12 +60,34 @@ void commitDuration(long commitDurationMs) {
     lastCommitDurationMs.set(commitDurationMs);
   }
 
+  /** This is called upon a successful commit. */
   void updateCommitSummary(CommitSummary stats) {
+    elapsedSecondsSinceLastSuccessfulCommit.refreshLastSuccessfulCommitTime();
     committedDataFilesCount.inc(stats.dataFilesCount());
     committedDataFilesRecordCount.inc(stats.dataFilesRecordCount());
     committedDataFilesByteCount.inc(stats.dataFilesByteCount());
     committedDeleteFilesCount.inc(stats.deleteFilesCount());
     committedDeleteFilesRecordCount.inc(stats.deleteFilesRecordCount());
     committedDeleteFilesByteCount.inc(stats.deleteFilesByteCount());
   }
+
+  private static class ElapsedTimeGauge implements Gauge<Long> {
+    private final TimeUnit reportUnit;
+    private volatile long lastSuccessfulCommitTimeNano;
+
+    ElapsedTimeGauge(TimeUnit timeUnit) {
+      this.reportUnit = timeUnit;
+      this.lastSuccessfulCommitTimeNano = System.nanoTime();
+    }
+
+    void refreshLastSuccessfulCommitTime() {
+      this.lastSuccessfulCommitTimeNano = System.nanoTime();
+    }
+
+    @Override
+    public Long getValue() {
+      return reportUnit.convert(

Review Comment:
   I missed the constructor initialization of that variable. Never mind!





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6764: Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and …

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6764:
URL: https://github.com/apache/iceberg/pull/6764#discussion_r1099404793


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java:
##########
@@ -37,6 +40,9 @@ class IcebergFilesCommitterMetrics {
         metrics.addGroup("IcebergFilesCommitter").addGroup("table", fullTableName);
     committerMetrics.gauge("lastCheckpointDurationMs", lastCheckpointDurationMs::get);
     committerMetrics.gauge("lastCommitDurationMs", lastCommitDurationMs::get);
+    this.elapsedSecondsSinceLastSuccessfulCommit = new ElapsedTimeGauge(TimeUnit.SECONDS);

Review Comment:
   See PR #6765 for the doc update.





[GitHub] [iceberg] mas-chen commented on a diff in pull request #6764: Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and …

Posted by "mas-chen (via GitHub)" <gi...@apache.org>.
mas-chen commented on code in PR #6764:
URL: https://github.com/apache/iceberg/pull/6764#discussion_r1099564976


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java:
##########
@@ -54,12 +60,34 @@ void commitDuration(long commitDurationMs) {
     lastCommitDurationMs.set(commitDurationMs);
   }
 
+  /** This is called upon a successful commit. */
   void updateCommitSummary(CommitSummary stats) {
+    elapsedSecondsSinceLastSuccessfulCommit.refreshLastSuccessfulCommitTime();
     committedDataFilesCount.inc(stats.dataFilesCount());
     committedDataFilesRecordCount.inc(stats.dataFilesRecordCount());
     committedDataFilesByteCount.inc(stats.dataFilesByteCount());
     committedDeleteFilesCount.inc(stats.deleteFilesCount());
     committedDeleteFilesRecordCount.inc(stats.deleteFilesRecordCount());
     committedDeleteFilesByteCount.inc(stats.deleteFilesByteCount());
   }
+
+  private static class ElapsedTimeGauge implements Gauge<Long> {
+    private final TimeUnit reportUnit;
+    private volatile long lastSuccessfulCommitTimeNano;
+
+    ElapsedTimeGauge(TimeUnit timeUnit) {
+      this.reportUnit = timeUnit;
+      this.lastSuccessfulCommitTimeNano = System.nanoTime();
+    }
+
+    void refreshLastSuccessfulCommitTime() {
+      this.lastSuccessfulCommitTimeNano = System.nanoTime();
+    }
+
+    @Override
+    public Long getValue() {
+      return reportUnit.convert(

Review Comment:
   It's possible that this metric may be polled by the metric reporter before a single checkpoint has completed. Can that edge case be handled? Reporting a huge number is going to be confusing and cause flaky alerts.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6764: Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and …

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6764:
URL: https://github.com/apache/iceberg/pull/6764#discussion_r1099586805


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java:
##########
@@ -54,12 +60,34 @@ void commitDuration(long commitDurationMs) {
     lastCommitDurationMs.set(commitDurationMs);
   }
 
+  /** This is called upon a successful commit. */
   void updateCommitSummary(CommitSummary stats) {
+    elapsedSecondsSinceLastSuccessfulCommit.refreshLastSuccessfulCommitTime();
     committedDataFilesCount.inc(stats.dataFilesCount());
     committedDataFilesRecordCount.inc(stats.dataFilesRecordCount());
     committedDataFilesByteCount.inc(stats.dataFilesByteCount());
     committedDeleteFilesCount.inc(stats.deleteFilesCount());
     committedDeleteFilesRecordCount.inc(stats.deleteFilesRecordCount());
     committedDeleteFilesByteCount.inc(stats.deleteFilesByteCount());
   }
+
+  private static class ElapsedTimeGauge implements Gauge<Long> {
+    private final TimeUnit reportUnit;
+    private volatile long lastSuccessfulCommitTimeNano;
+
+    ElapsedTimeGauge(TimeUnit timeUnit) {
+      this.reportUnit = timeUnit;
+      this.lastSuccessfulCommitTimeNano = System.nanoTime();
+    }
+
+    void refreshLastSuccessfulCommitTime() {
+      this.lastSuccessfulCommitTimeNano = System.nanoTime();
+    }
+
+    @Override
+    public Long getValue() {
+      return reportUnit.convert(

Review Comment:
   > It's possible that this metric may be polled by the metric reporter before a single checkpoint has completed.
   
   @mas-chen, you were referring to the first checkpoint, right? This is not an issue. Note that the reported value is a delta, computed as `System.nanoTime() - lastSuccessfulCommitTimeNano`.
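The diff hunks in this thread are cut off at `return reportUnit.convert(`. Based on the delta described in the comment above, a minimal sketch of the whole gauge might look like the following. It is a hypothetical reconstruction, not the merged code: `Gauge` is a local stand-in for Flink's `org.apache.flink.metrics.Gauge`, and the injectable `LongSupplier` clock is an assumption added only so the behavior can be tested deterministically (the diff calls `System.nanoTime()` directly).

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Local stand-in for org.apache.flink.metrics.Gauge so the sketch runs without Flink. */
interface Gauge<T> {
  T getValue();
}

/** Sketch: reports the time elapsed since the last recorded event, in reportUnit. */
class ElapsedTimeGauge implements Gauge<Long> {
  private final TimeUnit reportUnit;
  // Assumption for testability: production code would simply use System.nanoTime().
  private final LongSupplier nanoClock;
  private volatile long lastRecordedTimeNano;

  ElapsedTimeGauge(TimeUnit reportUnit) {
    this(reportUnit, System::nanoTime);
  }

  ElapsedTimeGauge(TimeUnit reportUnit, LongSupplier nanoClock) {
    this.reportUnit = reportUnit;
    this.nanoClock = nanoClock;
    // Initialized at construction, so a poll before the first commit reports the time
    // since the gauge was created rather than a huge delta measured from 0.
    this.lastRecordedTimeNano = nanoClock.getAsLong();
  }

  /** Called after each successful commit in the committer use case. */
  void refreshLastRecordedTime() {
    this.lastRecordedTimeNano = nanoClock.getAsLong();
  }

  @Override
  public Long getValue() {
    // Elapsed nanoseconds converted to the report unit (TimeUnit.convert truncates).
    return reportUnit.convert(nanoClock.getAsLong() - lastRecordedTimeNano, TimeUnit.NANOSECONDS);
  }
}
```

The field stays `volatile` as in the diff, presumably because the metric reporter reads the gauge from a different thread than the one that refreshes it.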





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6764: Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and …

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6764:
URL: https://github.com/apache/iceberg/pull/6764#discussion_r1098992688


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java:
##########
@@ -37,6 +40,9 @@ class IcebergFilesCommitterMetrics {
         metrics.addGroup("IcebergFilesCommitter").addGroup("table", fullTableName);
     committerMetrics.gauge("lastCheckpointDurationMs", lastCheckpointDurationMs::get);
     committerMetrics.gauge("lastCommitDurationMs", lastCommitDurationMs::get);
+    this.elapsedSecondsSinceLastSuccessfulCommit = new ElapsedTimeGauge(TimeUnit.SECONDS);

Review Comment:
   An Iceberg commit happens after a successful checkpoint. Iceberg commits can fail (for whatever reason) even when Flink checkpoints succeed. This metric is very useful for setting up alerts that detect consecutive Iceberg commit failures.
   
   Will send a separate PR documenting the FlinkSink metrics.
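As a rough illustration of the alerting use case: if Flink checkpoints keep succeeding at a fixed interval and each one should be followed by an Iceberg commit, then an elapsed time of k checkpoint intervals since the last successful commit suggests roughly k consecutive commit failures. `CommitStalenessAlert`, the checkpoint interval, and the threshold are hypothetical; in practice such a rule would more likely live in the alerting system than in Java code.

```java
/** Hypothetical helper that turns elapsedSecondsSinceLastSuccessfulCommit into an alert decision. */
final class CommitStalenessAlert {
  private CommitStalenessAlert() {}

  /**
   * Approximate number of commits that should have happened since the last successful one,
   * assuming one commit attempt per checkpoint interval. Integer division rounds down.
   */
  static long estimatedMissedCommits(long elapsedSeconds, long checkpointIntervalSeconds) {
    if (checkpointIntervalSeconds <= 0) {
      throw new IllegalArgumentException("checkpoint interval must be positive");
    }
    return elapsedSeconds / checkpointIntervalSeconds;
  }

  /** True once the estimate reaches the tolerated number of consecutive failures. */
  static boolean shouldAlert(long elapsedSeconds, long checkpointIntervalSeconds, int maxConsecutiveFailures) {
    return estimatedMissedCommits(elapsedSeconds, checkpointIntervalSeconds) >= maxConsecutiveFailures;
  }
}
```

Because checkpoint and commit durations add jitter, a threshold of at least 2 intervals leaves some slack before an alert fires.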





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6764: Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and …

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6764:
URL: https://github.com/apache/iceberg/pull/6764#discussion_r1098987976


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -230,6 +230,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     // the files,
     // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
     if (checkpointId > maxCommittedCheckpointId) {
+      LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);

Review Comment:
   Added this INFO logging to make the code paths easier to follow.
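A simplified, hypothetical model of the code path this log line sits on: per the surrounding comment in the diff, the committer must keep the max committed checkpoint id increasing, so only a checkpoint id greater than it triggers a commit attempt. `CheckpointCommitGuard` and its initial id of `-1` are assumptions for illustration, and `java.util.logging` stands in for the SLF4J logger used in the diff.

```java
import java.util.logging.Logger;

/** Hypothetical model of the checkpoint-id guard in notifyCheckpointComplete. */
class CheckpointCommitGuard {
  private static final Logger LOG = Logger.getLogger(CheckpointCommitGuard.class.getName());

  // Assumption: -1 means "nothing committed yet".
  private long maxCommittedCheckpointId = -1L;

  /** Returns true if a commit was attempted for this checkpoint. */
  boolean notifyCheckpointComplete(long checkpointId) {
    if (checkpointId > maxCommittedCheckpointId) {
      // Mirrors the INFO line added in the PR.
      LOG.info(String.format("Checkpoint %d completed. Attempting commit.", checkpointId));
      // A real committer would commit the pending files here; this sketch only advances the id.
      maxCommittedCheckpointId = checkpointId;
      return true;
    }
    // Stale or duplicate notification: committing again would break the increasing-id invariant.
    return false;
  }

  long maxCommittedCheckpointId() {
    return maxCommittedCheckpointId;
  }
}
```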





[GitHub] [iceberg] pvary commented on a diff in pull request #6764: Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and …

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #6764:
URL: https://github.com/apache/iceberg/pull/6764#discussion_r1101006954


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java:
##########
@@ -54,12 +60,38 @@ void commitDuration(long commitDurationMs) {
     lastCommitDurationMs.set(commitDurationMs);
   }
 
+  /** This is called upon a successful commit. */
   void updateCommitSummary(CommitSummary stats) {
+    elapsedSecondsSinceLastSuccessfulCommit.refreshLastRecordedTime();
     committedDataFilesCount.inc(stats.dataFilesCount());
     committedDataFilesRecordCount.inc(stats.dataFilesRecordCount());
     committedDataFilesByteCount.inc(stats.dataFilesByteCount());
     committedDeleteFilesCount.inc(stats.deleteFilesCount());
     committedDeleteFilesRecordCount.inc(stats.deleteFilesRecordCount());
     committedDeleteFilesByteCount.inc(stats.deleteFilesByteCount());
   }
+
+  /**
+   * This gauge measures the elapsed time between now and last recorded time
+   * set by {@link ElapsedTimeGauge#refreshLastRecordedTime()}.
+   */
+  private static class ElapsedTimeGauge implements Gauge<Long> {
+    private final TimeUnit reportUnit;
+    private volatile long lastRecordedTimeNano;
+
+    ElapsedTimeGauge(TimeUnit timeUnit) {
+      this.reportUnit = timeUnit;
+      this.lastRecordedTimeNano = System.nanoTime();
+    }
+
+    void refreshLastRecordedTime() {

Review Comment:
   +1


