You are viewing a plain text version of this content. The canonical link to the original is not included in this plain text copy.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:14 UTC

[03/50] [abbrv] flink git commit: [FLINK-6170] [metrics] Don't rely on stats snapshot for checkpoint metrics

[FLINK-6170] [metrics] Don't rely on stats snapshot for checkpoint metrics

This closes #3597.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0695c05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0695c05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0695c05

Branch: refs/heads/table-retraction
Commit: d0695c054737f18ade0d5c5d95e56202d041fc60
Parents: 64c7b11
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Mar 22 18:08:13 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Mar 22 21:20:56 2017 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointStatsTracker.java      |  30 +--
 .../checkpoint/CheckpointStatsTrackerTest.java  | 232 +++++++++++++++++--
 2 files changed, 235 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0695c05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index d324c25..c7efb7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -18,6 +18,13 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
@@ -26,14 +33,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 
-import javax.annotation.Nullable;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Tracker for checkpoint statistics.
  *
@@ -96,6 +95,10 @@ public class CheckpointStatsTracker {
 	 */
 	private volatile boolean dirty;
 
+	/** The latest completed checkpoint. Used by the latest completed checkpoint metrics. */
+	@Nullable
+	private volatile transient CompletedCheckpointStats latestCompletedCheckpoint;
+
 	/**
 	 * Creates a new checkpoint stats tracker.
 	 *
@@ -241,6 +244,8 @@ public class CheckpointStatsTracker {
 	private void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
 		statsReadWriteLock.lock();
 		try {
+			latestCompletedCheckpoint = completed;
+
 			counts.incrementCompletedCheckpoints();
 			history.replacePendingCheckpointById(completed);
 
@@ -400,7 +405,7 @@ public class CheckpointStatsTracker {
 	private class LatestCompletedCheckpointSizeGauge implements Gauge<Long> {
 		@Override
 		public Long getValue() {
-			CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+			CompletedCheckpointStats completed = latestCompletedCheckpoint;
 			if (completed != null) {
 				return completed.getStateSize();
 			} else {
@@ -412,7 +417,7 @@ public class CheckpointStatsTracker {
 	private class LatestCompletedCheckpointDurationGauge implements Gauge<Long> {
 		@Override
 		public Long getValue() {
-			CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+			CompletedCheckpointStats completed = latestCompletedCheckpoint;
 			if (completed != null) {
 				return completed.getEndToEndDuration();
 			} else {
@@ -421,11 +426,10 @@ public class CheckpointStatsTracker {
 		}
 	}
 
-
 	private class LatestCompletedCheckpointAlignmentBufferedGauge implements Gauge<Long> {
 		@Override
 		public Long getValue() {
-			CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+			CompletedCheckpointStats completed = latestCompletedCheckpoint;;
 			if (completed != null) {
 				return completed.getAlignmentBuffered();
 			} else {
@@ -437,7 +441,7 @@ public class CheckpointStatsTracker {
 	private class LatestCompletedCheckpointExternalPathGauge implements Gauge<String> {
 		@Override
 		public String getValue() {
-			CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+			CompletedCheckpointStats completed = latestCompletedCheckpoint;
 			if (completed != null) {
 				return completed.getExternalPath();
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0695c05/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 7ab71cb..aaf1774 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -18,18 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.Iterator;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -42,6 +30,23 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.junit.Test;
+
 public class CheckpointStatsTrackerTest {
 
 	/**
@@ -275,10 +280,10 @@ public class CheckpointStatsTrackerTest {
 	}
 
 	/**
-	 * Tests the registered metrics.
+	 * Tests the registration of the checkpoint metrics.
 	 */
 	@Test
-	public void testMetrics() throws Exception {
+	public void testMetricsRegistration() throws Exception {
 		MetricGroup metricGroup = mock(MetricGroup.class);
 
 		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
@@ -305,6 +310,205 @@ public class CheckpointStatsTrackerTest {
 		verify(metricGroup, times(9)).gauge(any(String.class), any(Gauge.class));
 	}
 
+	/**
+	 * Tests that the metrics are updated properly. We had a bug that required new stats
+	 * snapshots in order to update the metrics.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testMetricsAreUpdated() throws Exception {
+		final Map<String, Gauge<?>> registeredGauges = new HashMap<>();
+
+		MetricGroup metricGroup = new MetricGroup() {
+			@Override
+			public Counter counter(int name) {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+
+			@Override
+			public Counter counter(String name) {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+
+			@Override
+			public <C extends Counter> C counter(int name, C counter) {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+
+			@Override
+			public <C extends Counter> C counter(String name, C counter) {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+
+			@Override
+			public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+
+			@Override
+			public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
+				registeredGauges.put(name, gauge);
+				return gauge;
+			}
+
+			@Override
+			public <H extends Histogram> H histogram(String name, H histogram) {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+
+			@Override
+			public <H extends Histogram> H histogram(int name, H histogram) {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+
+			@Override
+			public <M extends Meter> M meter(String name, M meter) {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+
+			@Override
+			public <M extends Meter> M meter(int name, M meter) {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+
+			@Override
+			public MetricGroup addGroup(int name) {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+
+			@Override
+			public MetricGroup addGroup(String name) {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+
+			@Override
+			public String[] getScopeComponents() {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+
+			@Override
+			public Map<String, String> getAllVariables() {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+
+			@Override
+			public String getMetricIdentifier(String metricName) {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+
+			@Override
+			public String getMetricIdentifier(String metricName, CharacterFilter filter) {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+		};
+
+		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+		when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID());
+		when(jobVertex.getParallelism()).thenReturn(1);
+
+		CheckpointStatsTracker stats = new CheckpointStatsTracker(
+			0,
+			Collections.singletonList(jobVertex),
+			mock(JobSnapshottingSettings.class),
+			metricGroup);
+
+		// Make sure to adjust this test if metrics are added/removed
+		assertEquals(9, registeredGauges.size());
+
+		// Check initial values
+		Gauge<Long> numCheckpoints = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_CHECKPOINTS_METRIC);
+		Gauge<Integer> numInProgressCheckpoints = (Gauge<Integer>) registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_IN_PROGRESS_CHECKPOINTS_METRIC);
+		Gauge<Long> numCompletedCheckpoints = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC);
+		Gauge<Long> numFailedCheckpoints = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_FAILED_CHECKPOINTS_METRIC);
+		Gauge<Long> latestRestoreTimestamp = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC);
+		Gauge<Long> latestCompletedSize = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC);
+		Gauge<Long> latestCompletedDuration = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC);
+		Gauge<Long> latestCompletedAlignmentBuffered = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_ALIGNMENT_BUFFERED_METRIC);
+		Gauge<String> latestCompletedExternalPath = (Gauge<String>) registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC);
+
+		assertEquals(Long.valueOf(0), numCheckpoints.getValue());
+		assertEquals(Integer.valueOf(0), numInProgressCheckpoints.getValue());
+		assertEquals(Long.valueOf(0), numCompletedCheckpoints.getValue());
+		assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue());
+		assertEquals(Long.valueOf(-1), latestRestoreTimestamp.getValue());
+		assertEquals(Long.valueOf(-1), latestCompletedSize.getValue());
+		assertEquals(Long.valueOf(-1), latestCompletedDuration.getValue());
+		assertEquals(Long.valueOf(-1), latestCompletedAlignmentBuffered.getValue());
+		assertEquals("n/a", latestCompletedExternalPath.getValue());
+
+		PendingCheckpointStats pending = stats.reportPendingCheckpoint(
+			0,
+			0,
+			CheckpointProperties.forStandardCheckpoint());
+
+		// Check counts
+		assertEquals(Long.valueOf(1), numCheckpoints.getValue());
+		assertEquals(Integer.valueOf(1), numInProgressCheckpoints.getValue());
+		assertEquals(Long.valueOf(0), numCompletedCheckpoints.getValue());
+		assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue());
+
+		long ackTimestamp = 11231230L;
+		long stateSize = 12381238L;
+		long ignored = 0;
+		long alignmenetBuffered = 182812L;
+		String externalPath = "myexternalpath";
+
+		SubtaskStateStats subtaskStats = new SubtaskStateStats(
+			0,
+			ackTimestamp,
+			stateSize,
+			ignored,
+			ignored,
+			alignmenetBuffered,
+			ignored);
+
+		assertTrue(pending.reportSubtaskStats(jobVertex.getJobVertexId(), subtaskStats));
+
+		pending.reportCompletedCheckpoint(externalPath);
+
+		// Verify completed checkpoint updated
+		assertEquals(Long.valueOf(1), numCheckpoints.getValue());
+		assertEquals(Integer.valueOf(0), numInProgressCheckpoints.getValue());
+		assertEquals(Long.valueOf(1), numCompletedCheckpoints.getValue());
+		assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue());
+		assertEquals(Long.valueOf(-1), latestRestoreTimestamp.getValue());
+		assertEquals(Long.valueOf(stateSize), latestCompletedSize.getValue());
+		assertEquals(Long.valueOf(ackTimestamp), latestCompletedDuration.getValue());
+		assertEquals(Long.valueOf(alignmenetBuffered), latestCompletedAlignmentBuffered.getValue());
+		assertEquals(externalPath, latestCompletedExternalPath.getValue());
+
+		// Check failed
+		PendingCheckpointStats nextPending = stats.reportPendingCheckpoint(
+			1,
+			11,
+			CheckpointProperties.forStandardCheckpoint());
+
+		long failureTimestamp = 1230123L;
+		nextPending.reportFailedCheckpoint(failureTimestamp, null);
+
+		// Verify updated
+		assertEquals(Long.valueOf(2), numCheckpoints.getValue());
+		assertEquals(Integer.valueOf(0), numInProgressCheckpoints.getValue());
+		assertEquals(Long.valueOf(1), numCompletedCheckpoints.getValue());
+		assertEquals(Long.valueOf(1), numFailedCheckpoints.getValue()); // one failed now
+
+		// Check restore
+		long restoreTimestamp = 183419283L;
+		RestoredCheckpointStats restored = new RestoredCheckpointStats(
+			1,
+			CheckpointProperties.forStandardCheckpoint(),
+			restoreTimestamp,
+			null);
+		stats.reportRestoredCheckpoint(restored);
+
+		assertEquals(Long.valueOf(2), numCheckpoints.getValue());
+		assertEquals(Integer.valueOf(0), numInProgressCheckpoints.getValue());
+		assertEquals(Long.valueOf(1), numCompletedCheckpoints.getValue());
+		assertEquals(Long.valueOf(1), numFailedCheckpoints.getValue());
+
+		assertEquals(Long.valueOf(restoreTimestamp), latestRestoreTimestamp.getValue());
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**