You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:14 UTC
[03/50] [abbrv] flink git commit: [FLINK-6170] [metrics] Don't rely on stats snapshot for checkpoint metrics
[FLINK-6170] [metrics] Don't rely on stats snapshot for checkpoint metrics
This closes #3597.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0695c05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0695c05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0695c05
Branch: refs/heads/table-retraction
Commit: d0695c054737f18ade0d5c5d95e56202d041fc60
Parents: 64c7b11
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Mar 22 18:08:13 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Mar 22 21:20:56 2017 +0100
----------------------------------------------------------------------
.../checkpoint/CheckpointStatsTracker.java | 30 +--
.../checkpoint/CheckpointStatsTrackerTest.java | 232 +++++++++++++++++--
2 files changed, 235 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d0695c05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index d324c25..c7efb7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -18,6 +18,13 @@
package org.apache.flink.runtime.checkpoint;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
@@ -26,14 +33,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-import javax.annotation.Nullable;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* Tracker for checkpoint statistics.
*
@@ -96,6 +95,10 @@ public class CheckpointStatsTracker {
*/
private volatile boolean dirty;
+ /** The latest completed checkpoint. Used by the latest completed checkpoint metrics. */
+ @Nullable
+ private volatile transient CompletedCheckpointStats latestCompletedCheckpoint;
+
/**
* Creates a new checkpoint stats tracker.
*
@@ -241,6 +244,8 @@ public class CheckpointStatsTracker {
private void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
statsReadWriteLock.lock();
try {
+ latestCompletedCheckpoint = completed;
+
counts.incrementCompletedCheckpoints();
history.replacePendingCheckpointById(completed);
@@ -400,7 +405,7 @@ public class CheckpointStatsTracker {
private class LatestCompletedCheckpointSizeGauge implements Gauge<Long> {
@Override
public Long getValue() {
- CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+ CompletedCheckpointStats completed = latestCompletedCheckpoint;
if (completed != null) {
return completed.getStateSize();
} else {
@@ -412,7 +417,7 @@ public class CheckpointStatsTracker {
private class LatestCompletedCheckpointDurationGauge implements Gauge<Long> {
@Override
public Long getValue() {
- CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+ CompletedCheckpointStats completed = latestCompletedCheckpoint;
if (completed != null) {
return completed.getEndToEndDuration();
} else {
@@ -421,11 +426,10 @@ public class CheckpointStatsTracker {
}
}
-
private class LatestCompletedCheckpointAlignmentBufferedGauge implements Gauge<Long> {
@Override
public Long getValue() {
- CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+ CompletedCheckpointStats completed = latestCompletedCheckpoint;;
if (completed != null) {
return completed.getAlignmentBuffered();
} else {
@@ -437,7 +441,7 @@ public class CheckpointStatsTracker {
private class LatestCompletedCheckpointExternalPathGauge implements Gauge<String> {
@Override
public String getValue() {
- CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+ CompletedCheckpointStats completed = latestCompletedCheckpoint;
if (completed != null) {
return completed.getExternalPath();
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/d0695c05/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 7ab71cb..aaf1774 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -18,18 +18,6 @@
package org.apache.flink.runtime.checkpoint;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.Iterator;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -42,6 +30,23 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.junit.Test;
+
public class CheckpointStatsTrackerTest {
/**
@@ -275,10 +280,10 @@ public class CheckpointStatsTrackerTest {
}
/**
- * Tests the registered metrics.
+ * Tests the registration of the checkpoint metrics.
*/
@Test
- public void testMetrics() throws Exception {
+ public void testMetricsRegistration() throws Exception {
MetricGroup metricGroup = mock(MetricGroup.class);
ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
@@ -305,6 +310,205 @@ public class CheckpointStatsTrackerTest {
verify(metricGroup, times(9)).gauge(any(String.class), any(Gauge.class));
}
+ /**
+ * Tests that the metrics are updated properly. We had a bug that required new stats
+ * snapshots in order to update the metrics.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testMetricsAreUpdated() throws Exception {
+ final Map<String, Gauge<?>> registeredGauges = new HashMap<>();
+
+ MetricGroup metricGroup = new MetricGroup() {
+ @Override
+ public Counter counter(int name) {
+ throw new UnsupportedOperationException("Not expected in this test");
+ }
+
+ @Override
+ public Counter counter(String name) {
+ throw new UnsupportedOperationException("Not expected in this test");
+ }
+
+ @Override
+ public <C extends Counter> C counter(int name, C counter) {
+ throw new UnsupportedOperationException("Not expected in this test");
+ }
+
+ @Override
+ public <C extends Counter> C counter(String name, C counter) {
+ throw new UnsupportedOperationException("Not expected in this test");
+ }
+
+ @Override
+ public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
+ throw new UnsupportedOperationException("Not expected in this test");
+ }
+
+ @Override
+ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
+ registeredGauges.put(name, gauge);
+ return gauge;
+ }
+
+ @Override
+ public <H extends Histogram> H histogram(String name, H histogram) {
+ throw new UnsupportedOperationException("Not expected in this test");
+ }
+
+ @Override
+ public <H extends Histogram> H histogram(int name, H histogram) {
+ throw new UnsupportedOperationException("Not expected in this test");
+ }
+
+ @Override
+ public <M extends Meter> M meter(String name, M meter) {
+ throw new UnsupportedOperationException("Not expected in this test");
+ }
+
+ @Override
+ public <M extends Meter> M meter(int name, M meter) {
+ throw new UnsupportedOperationException("Not expected in this test");
+ }
+
+ @Override
+ public MetricGroup addGroup(int name) {
+ throw new UnsupportedOperationException("Not expected in this test");
+ }
+
+ @Override
+ public MetricGroup addGroup(String name) {
+ throw new UnsupportedOperationException("Not expected in this test");
+ }
+
+ @Override
+ public String[] getScopeComponents() {
+ throw new UnsupportedOperationException("Not expected in this test");
+ }
+
+ @Override
+ public Map<String, String> getAllVariables() {
+ throw new UnsupportedOperationException("Not expected in this test");
+ }
+
+ @Override
+ public String getMetricIdentifier(String metricName) {
+ throw new UnsupportedOperationException("Not expected in this test");
+ }
+
+ @Override
+ public String getMetricIdentifier(String metricName, CharacterFilter filter) {
+ throw new UnsupportedOperationException("Not expected in this test");
+ }
+ };
+
+ ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+ when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID());
+ when(jobVertex.getParallelism()).thenReturn(1);
+
+ CheckpointStatsTracker stats = new CheckpointStatsTracker(
+ 0,
+ Collections.singletonList(jobVertex),
+ mock(JobSnapshottingSettings.class),
+ metricGroup);
+
+ // Make sure to adjust this test if metrics are added/removed
+ assertEquals(9, registeredGauges.size());
+
+ // Check initial values
+ Gauge<Long> numCheckpoints = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_CHECKPOINTS_METRIC);
+ Gauge<Integer> numInProgressCheckpoints = (Gauge<Integer>) registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_IN_PROGRESS_CHECKPOINTS_METRIC);
+ Gauge<Long> numCompletedCheckpoints = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC);
+ Gauge<Long> numFailedCheckpoints = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_FAILED_CHECKPOINTS_METRIC);
+ Gauge<Long> latestRestoreTimestamp = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC);
+ Gauge<Long> latestCompletedSize = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC);
+ Gauge<Long> latestCompletedDuration = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC);
+ Gauge<Long> latestCompletedAlignmentBuffered = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_ALIGNMENT_BUFFERED_METRIC);
+ Gauge<String> latestCompletedExternalPath = (Gauge<String>) registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC);
+
+ assertEquals(Long.valueOf(0), numCheckpoints.getValue());
+ assertEquals(Integer.valueOf(0), numInProgressCheckpoints.getValue());
+ assertEquals(Long.valueOf(0), numCompletedCheckpoints.getValue());
+ assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue());
+ assertEquals(Long.valueOf(-1), latestRestoreTimestamp.getValue());
+ assertEquals(Long.valueOf(-1), latestCompletedSize.getValue());
+ assertEquals(Long.valueOf(-1), latestCompletedDuration.getValue());
+ assertEquals(Long.valueOf(-1), latestCompletedAlignmentBuffered.getValue());
+ assertEquals("n/a", latestCompletedExternalPath.getValue());
+
+ PendingCheckpointStats pending = stats.reportPendingCheckpoint(
+ 0,
+ 0,
+ CheckpointProperties.forStandardCheckpoint());
+
+ // Check counts
+ assertEquals(Long.valueOf(1), numCheckpoints.getValue());
+ assertEquals(Integer.valueOf(1), numInProgressCheckpoints.getValue());
+ assertEquals(Long.valueOf(0), numCompletedCheckpoints.getValue());
+ assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue());
+
+ long ackTimestamp = 11231230L;
+ long stateSize = 12381238L;
+ long ignored = 0;
+ long alignmenetBuffered = 182812L;
+ String externalPath = "myexternalpath";
+
+ SubtaskStateStats subtaskStats = new SubtaskStateStats(
+ 0,
+ ackTimestamp,
+ stateSize,
+ ignored,
+ ignored,
+ alignmenetBuffered,
+ ignored);
+
+ assertTrue(pending.reportSubtaskStats(jobVertex.getJobVertexId(), subtaskStats));
+
+ pending.reportCompletedCheckpoint(externalPath);
+
+ // Verify completed checkpoint updated
+ assertEquals(Long.valueOf(1), numCheckpoints.getValue());
+ assertEquals(Integer.valueOf(0), numInProgressCheckpoints.getValue());
+ assertEquals(Long.valueOf(1), numCompletedCheckpoints.getValue());
+ assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue());
+ assertEquals(Long.valueOf(-1), latestRestoreTimestamp.getValue());
+ assertEquals(Long.valueOf(stateSize), latestCompletedSize.getValue());
+ assertEquals(Long.valueOf(ackTimestamp), latestCompletedDuration.getValue());
+ assertEquals(Long.valueOf(alignmenetBuffered), latestCompletedAlignmentBuffered.getValue());
+ assertEquals(externalPath, latestCompletedExternalPath.getValue());
+
+ // Check failed
+ PendingCheckpointStats nextPending = stats.reportPendingCheckpoint(
+ 1,
+ 11,
+ CheckpointProperties.forStandardCheckpoint());
+
+ long failureTimestamp = 1230123L;
+ nextPending.reportFailedCheckpoint(failureTimestamp, null);
+
+ // Verify updated
+ assertEquals(Long.valueOf(2), numCheckpoints.getValue());
+ assertEquals(Integer.valueOf(0), numInProgressCheckpoints.getValue());
+ assertEquals(Long.valueOf(1), numCompletedCheckpoints.getValue());
+ assertEquals(Long.valueOf(1), numFailedCheckpoints.getValue()); // one failed now
+
+ // Check restore
+ long restoreTimestamp = 183419283L;
+ RestoredCheckpointStats restored = new RestoredCheckpointStats(
+ 1,
+ CheckpointProperties.forStandardCheckpoint(),
+ restoreTimestamp,
+ null);
+ stats.reportRestoredCheckpoint(restored);
+
+ assertEquals(Long.valueOf(2), numCheckpoints.getValue());
+ assertEquals(Integer.valueOf(0), numInProgressCheckpoints.getValue());
+ assertEquals(Long.valueOf(1), numCompletedCheckpoints.getValue());
+ assertEquals(Long.valueOf(1), numFailedCheckpoints.getValue());
+
+ assertEquals(Long.valueOf(restoreTimestamp), latestRestoreTimestamp.getValue());
+ }
+
// ------------------------------------------------------------------------
/**