You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/22 11:51:31 UTC

[3/4] flink git commit: [FLINK-5763] [checkpoints] Add isSavepoint() to CheckpointProperties

[FLINK-5763] [checkpoints] Add isSavepoint() to CheckpointProperties

This closes #3345


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcc1efcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcc1efcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcc1efcb

Branch: refs/heads/master
Commit: fcc1efcb05bce13e435946107a842727b1e3ee20
Parents: 1cb8cde
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Feb 16 16:52:32 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 22 12:14:55 2017 +0100

----------------------------------------------------------------------
 .../CheckpointStatsDetailsHandler.java          |  3 +-
 .../checkpoints/CheckpointStatsHandler.java     |  5 ++-
 .../CheckpointStatsDetailsHandlerTest.java      |  6 ++--
 .../checkpoints/CheckpointStatsHandlerTest.java |  8 ++---
 .../checkpoint/CheckpointProperties.java        | 32 ++++++++++----------
 .../checkpoint/CheckpointStatsHistory.java      |  2 +-
 .../checkpoint/CheckpointPropertiesTest.java    |  8 ++---
 .../checkpoint/CheckpointStatsHistoryTest.java  |  3 ++
 8 files changed, 34 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 33d6cf7..d461f03 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
@@ -80,7 +79,7 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
 
 		gen.writeNumberField("id", checkpoint.getCheckpointId());
 		gen.writeStringField("status", checkpoint.getStatus().toString());
-		gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(checkpoint.getProperties()));
+		gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint());
 		gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
 		gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
 		gen.writeNumberField("state_size", checkpoint.getStateSize());

http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index 8aab5fa..404b2c7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -170,7 +169,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 			gen.writeObjectFieldStart("restored");
 			gen.writeNumberField("id", restored.getCheckpointId());
 			gen.writeNumberField("restore_timestamp", restored.getRestoreTimestamp());
-			gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(restored.getProperties()));
+			gen.writeBooleanField("is_savepoint", restored.getProperties().isSavepoint());
 
 			String externalPath = restored.getExternalPath();
 			if (externalPath != null) {
@@ -197,7 +196,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 			gen.writeStartObject();
 			gen.writeNumberField("id", checkpoint.getCheckpointId());
 			gen.writeStringField("status", checkpoint.getStatus().toString());
-			gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(checkpoint.getProperties()));
+			gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint());
 			gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
 			gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
 			gen.writeNumberField("state_size", checkpoint.getStateSize());

http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
index fb5cfc5..dfad46d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -128,7 +128,7 @@ public class CheckpointStatsDetailsHandlerTest {
 
 		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
 		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
-		assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean());
+		assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
 		assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
 		assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
 		assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());
@@ -172,7 +172,7 @@ public class CheckpointStatsDetailsHandlerTest {
 
 		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
 		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
-		assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean());
+		assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
 		assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
 		assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
 		assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());
@@ -218,7 +218,7 @@ public class CheckpointStatsDetailsHandlerTest {
 
 		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
 		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
-		assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean());
+		assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
 		assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
 		assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
 		assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());

http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
index 23a1900..29f3ae9 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -241,7 +241,7 @@ public class CheckpointStatsHandlerTest {
 		JsonNode latestRestoredNode = latestNode.get("restored");
 		assertEquals(latestRestored.getCheckpointId(), latestRestoredNode.get("id").asLong());
 		assertEquals(latestRestored.getRestoreTimestamp(), latestRestoredNode.get("restore_timestamp").asLong());
-		assertEquals(CheckpointProperties.isSavepoint(latestRestored.getProperties()), latestRestoredNode.get("is_savepoint").asBoolean());
+		assertEquals(latestRestored.getProperties().isSavepoint(), latestRestoredNode.get("is_savepoint").asBoolean());
 		assertEquals(latestRestored.getExternalPath(), latestRestoredNode.get("external_path").asText());
 
 		JsonNode historyNode = rootNode.get("history");
@@ -252,7 +252,7 @@ public class CheckpointStatsHandlerTest {
 
 		assertEquals(inProgress.getCheckpointId(), inProgressNode.get("id").asLong());
 		assertEquals(inProgress.getStatus().toString(), inProgressNode.get("status").asText());
-		assertEquals(CheckpointProperties.isSavepoint(inProgress.getProperties()), inProgressNode.get("is_savepoint").asBoolean());
+		assertEquals(inProgress.getProperties().isSavepoint(), inProgressNode.get("is_savepoint").asBoolean());
 		assertEquals(inProgress.getTriggerTimestamp(), inProgressNode.get("trigger_timestamp").asLong());
 		assertEquals(inProgress.getLatestAckTimestamp(), inProgressNode.get("latest_ack_timestamp").asLong());
 		assertEquals(inProgress.getStateSize(), inProgressNode.get("state_size").asLong());
@@ -266,7 +266,7 @@ public class CheckpointStatsHandlerTest {
 
 		assertEquals(completedSavepoint.getCheckpointId(), completedSavepointNode.get("id").asLong());
 		assertEquals(completedSavepoint.getStatus().toString(), completedSavepointNode.get("status").asText());
-		assertEquals(CheckpointProperties.isSavepoint(completedSavepoint.getProperties()), completedSavepointNode.get("is_savepoint").asBoolean());
+		assertEquals(completedSavepoint.getProperties().isSavepoint(), completedSavepointNode.get("is_savepoint").asBoolean());
 		assertEquals(completedSavepoint.getTriggerTimestamp(), completedSavepointNode.get("trigger_timestamp").asLong());
 		assertEquals(completedSavepoint.getLatestAckTimestamp(), completedSavepointNode.get("latest_ack_timestamp").asLong());
 		assertEquals(completedSavepoint.getStateSize(), completedSavepointNode.get("state_size").asLong());
@@ -283,7 +283,7 @@ public class CheckpointStatsHandlerTest {
 
 		assertEquals(failed.getCheckpointId(), failedNode.get("id").asLong());
 		assertEquals(failed.getStatus().toString(), failedNode.get("status").asText());
-		assertEquals(CheckpointProperties.isSavepoint(failed.getProperties()), failedNode.get("is_savepoint").asBoolean());
+		assertEquals(failed.getProperties().isSavepoint(), failedNode.get("is_savepoint").asBoolean());
 		assertEquals(failed.getTriggerTimestamp(), failedNode.get("trigger_timestamp").asLong());
 		assertEquals(failed.getLatestAckTimestamp(), failedNode.get("latest_ack_timestamp").asLong());
 		assertEquals(failed.getStateSize(), failedNode.get("state_size").asLong());

http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 4d8bab2..45c8a1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -86,7 +86,7 @@ public class CheckpointProperties implements Serializable {
 	 * @see CheckpointCoordinator
 	 * @see PendingCheckpoint
 	 */
-	public boolean forceCheckpoint() {
+	boolean forceCheckpoint() {
 		return forced;
 	}
 
@@ -98,7 +98,7 @@ public class CheckpointProperties implements Serializable {
 	 *
 	 * @see PendingCheckpoint
 	 */
-	public boolean externalizeCheckpoint() {
+	boolean externalizeCheckpoint() {
 		return externalize;
 	}
 
@@ -117,7 +117,7 @@ public class CheckpointProperties implements Serializable {
 	 *
 	 * @see CompletedCheckpointStore
 	 */
-	public boolean discardOnSubsumed() {
+	boolean discardOnSubsumed() {
 		return discardSubsumed;
 	}
 
@@ -131,7 +131,7 @@ public class CheckpointProperties implements Serializable {
 	 *
 	 * @see CompletedCheckpointStore
 	 */
-	public boolean discardOnJobFinished() {
+	boolean discardOnJobFinished() {
 		return discardFinished;
 	}
 
@@ -145,7 +145,7 @@ public class CheckpointProperties implements Serializable {
 	 *
 	 * @see CompletedCheckpointStore
 	 */
-	public boolean discardOnJobCancelled() {
+	boolean discardOnJobCancelled() {
 		return discardCancelled;
 	}
 
@@ -159,7 +159,7 @@ public class CheckpointProperties implements Serializable {
 	 *
 	 * @see CompletedCheckpointStore
 	 */
-	public boolean discardOnJobFailed() {
+	boolean discardOnJobFailed() {
 		return discardFailed;
 	}
 
@@ -173,10 +173,19 @@ public class CheckpointProperties implements Serializable {
 	 *
 	 * @see CompletedCheckpointStore
 	 */
-	public boolean discardOnJobSuspended() {
+	boolean discardOnJobSuspended() {
 		return discardSuspended;
 	}
 
+	/**
+	 * Returns whether the checkpoint properties describe a standard savepoint.
+	 *
+	 * @return <code>true</code> if the properties describe a savepoint, <code>false</code> otherwise.
+	 */
+	public boolean isSavepoint() {
+		return this == STANDARD_SAVEPOINT;
+	}
+
 	// ------------------------------------------------------------------------
 
 	@Override
@@ -306,13 +315,4 @@ public class CheckpointProperties implements Serializable {
 		}
 	}
 
-	/**
-	 * Returns whether the checkpoint properties describe a standard savepoint.
-	 *
-	 * @param props Checkpoint properties to check.
-	 * @return <code>true</code> if the properties describe a savepoint, <code>false</code> otherwise.
-	 */
-	public static boolean isSavepoint(CheckpointProperties props) {
-		return STANDARD_SAVEPOINT.equals(props);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
index 13ce642..ce14c2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
@@ -255,7 +255,7 @@ public class CheckpointStatsHistory implements Serializable {
 		// Update the latest checkpoint stats
 		if (completedOrFailed.getStatus().isCompleted()) {
 			CompletedCheckpointStats completed = (CompletedCheckpointStats) completedOrFailed;
-			if (CheckpointProperties.isSavepoint(completed.getProperties()) &&
+			if (completed.getProperties().isSavepoint() &&
 				(latestSavepoint == null ||
 					completed.getCheckpointId() > latestSavepoint.getCheckpointId())) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
index fb3bd65..52ac54c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
@@ -93,22 +93,22 @@ public class CheckpointPropertiesTest {
 	public void testIsSavepoint() throws Exception {
 		{
 			CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
-			assertFalse(CheckpointProperties.isSavepoint(props));
+			assertFalse(props.isSavepoint());
 		}
 
 		{
 			CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true);
-			assertFalse(CheckpointProperties.isSavepoint(props));
+			assertFalse(props.isSavepoint());
 		}
 
 		{
 			CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(false);
-			assertFalse(CheckpointProperties.isSavepoint(props));
+			assertFalse(props.isSavepoint());
 		}
 
 		{
 			CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
-			assertTrue(CheckpointProperties.isSavepoint(props));
+			assertTrue(props.isSavepoint());
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
index 7541806..3c373f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
@@ -175,12 +175,14 @@ public class CheckpointStatsHistoryTest {
 		PendingCheckpointStats pending = mock(PendingCheckpointStats.class);
 		when(pending.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
 		when(pending.getCheckpointId()).thenReturn(checkpointId);
+		when(pending.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
 		return pending;
 	}
 
 	private CompletedCheckpointStats createCompletedCheckpointStats(long checkpointId) {
 		CompletedCheckpointStats completed = mock(CompletedCheckpointStats.class);
 		when(completed.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
+		when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
 		when(completed.getCheckpointId()).thenReturn(checkpointId);
 		return completed;
 	}
@@ -188,6 +190,7 @@ public class CheckpointStatsHistoryTest {
 	private FailedCheckpointStats createFailedCheckpointStats(long checkpointId) {
 		FailedCheckpointStats failed = mock(FailedCheckpointStats.class);
 		when(failed.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
+		when(failed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
 		when(failed.getCheckpointId()).thenReturn(checkpointId);
 		return failed;
 	}