You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/22 11:51:31 UTC
[3/4] flink git commit: [FLINK-5763] [checkpoints] Add isSavepoint() to CheckpointProperties
[FLINK-5763] [checkpoints] Add isSavepoint() to CheckpointProperties
This closes #3345
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcc1efcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcc1efcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcc1efcb
Branch: refs/heads/master
Commit: fcc1efcb05bce13e435946107a842727b1e3ee20
Parents: 1cb8cde
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Feb 16 16:52:32 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 22 12:14:55 2017 +0100
----------------------------------------------------------------------
.../CheckpointStatsDetailsHandler.java | 3 +-
.../checkpoints/CheckpointStatsHandler.java | 5 ++-
.../CheckpointStatsDetailsHandlerTest.java | 6 ++--
.../checkpoints/CheckpointStatsHandlerTest.java | 8 ++---
.../checkpoint/CheckpointProperties.java | 32 ++++++++++----------
.../checkpoint/CheckpointStatsHistory.java | 2 +-
.../checkpoint/CheckpointPropertiesTest.java | 8 ++---
.../checkpoint/CheckpointStatsHistoryTest.java | 3 ++
8 files changed, 34 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 33d6cf7..d461f03 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
@@ -80,7 +79,7 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
gen.writeNumberField("id", checkpoint.getCheckpointId());
gen.writeStringField("status", checkpoint.getStatus().toString());
- gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(checkpoint.getProperties()));
+ gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint());
gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
gen.writeNumberField("state_size", checkpoint.getStateSize());
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index 8aab5fa..404b2c7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -170,7 +169,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
gen.writeObjectFieldStart("restored");
gen.writeNumberField("id", restored.getCheckpointId());
gen.writeNumberField("restore_timestamp", restored.getRestoreTimestamp());
- gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(restored.getProperties()));
+ gen.writeBooleanField("is_savepoint", restored.getProperties().isSavepoint());
String externalPath = restored.getExternalPath();
if (externalPath != null) {
@@ -197,7 +196,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
gen.writeStartObject();
gen.writeNumberField("id", checkpoint.getCheckpointId());
gen.writeStringField("status", checkpoint.getStatus().toString());
- gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(checkpoint.getProperties()));
+ gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint());
gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
gen.writeNumberField("state_size", checkpoint.getStateSize());
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
index fb5cfc5..dfad46d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -128,7 +128,7 @@ public class CheckpointStatsDetailsHandlerTest {
assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
- assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean());
+ assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());
@@ -172,7 +172,7 @@ public class CheckpointStatsDetailsHandlerTest {
assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
- assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean());
+ assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());
@@ -218,7 +218,7 @@ public class CheckpointStatsDetailsHandlerTest {
assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
- assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean());
+ assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
index 23a1900..29f3ae9 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -241,7 +241,7 @@ public class CheckpointStatsHandlerTest {
JsonNode latestRestoredNode = latestNode.get("restored");
assertEquals(latestRestored.getCheckpointId(), latestRestoredNode.get("id").asLong());
assertEquals(latestRestored.getRestoreTimestamp(), latestRestoredNode.get("restore_timestamp").asLong());
- assertEquals(CheckpointProperties.isSavepoint(latestRestored.getProperties()), latestRestoredNode.get("is_savepoint").asBoolean());
+ assertEquals(latestRestored.getProperties().isSavepoint(), latestRestoredNode.get("is_savepoint").asBoolean());
assertEquals(latestRestored.getExternalPath(), latestRestoredNode.get("external_path").asText());
JsonNode historyNode = rootNode.get("history");
@@ -252,7 +252,7 @@ public class CheckpointStatsHandlerTest {
assertEquals(inProgress.getCheckpointId(), inProgressNode.get("id").asLong());
assertEquals(inProgress.getStatus().toString(), inProgressNode.get("status").asText());
- assertEquals(CheckpointProperties.isSavepoint(inProgress.getProperties()), inProgressNode.get("is_savepoint").asBoolean());
+ assertEquals(inProgress.getProperties().isSavepoint(), inProgressNode.get("is_savepoint").asBoolean());
assertEquals(inProgress.getTriggerTimestamp(), inProgressNode.get("trigger_timestamp").asLong());
assertEquals(inProgress.getLatestAckTimestamp(), inProgressNode.get("latest_ack_timestamp").asLong());
assertEquals(inProgress.getStateSize(), inProgressNode.get("state_size").asLong());
@@ -266,7 +266,7 @@ public class CheckpointStatsHandlerTest {
assertEquals(completedSavepoint.getCheckpointId(), completedSavepointNode.get("id").asLong());
assertEquals(completedSavepoint.getStatus().toString(), completedSavepointNode.get("status").asText());
- assertEquals(CheckpointProperties.isSavepoint(completedSavepoint.getProperties()), completedSavepointNode.get("is_savepoint").asBoolean());
+ assertEquals(completedSavepoint.getProperties().isSavepoint(), completedSavepointNode.get("is_savepoint").asBoolean());
assertEquals(completedSavepoint.getTriggerTimestamp(), completedSavepointNode.get("trigger_timestamp").asLong());
assertEquals(completedSavepoint.getLatestAckTimestamp(), completedSavepointNode.get("latest_ack_timestamp").asLong());
assertEquals(completedSavepoint.getStateSize(), completedSavepointNode.get("state_size").asLong());
@@ -283,7 +283,7 @@ public class CheckpointStatsHandlerTest {
assertEquals(failed.getCheckpointId(), failedNode.get("id").asLong());
assertEquals(failed.getStatus().toString(), failedNode.get("status").asText());
- assertEquals(CheckpointProperties.isSavepoint(failed.getProperties()), failedNode.get("is_savepoint").asBoolean());
+ assertEquals(failed.getProperties().isSavepoint(), failedNode.get("is_savepoint").asBoolean());
assertEquals(failed.getTriggerTimestamp(), failedNode.get("trigger_timestamp").asLong());
assertEquals(failed.getLatestAckTimestamp(), failedNode.get("latest_ack_timestamp").asLong());
assertEquals(failed.getStateSize(), failedNode.get("state_size").asLong());
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 4d8bab2..45c8a1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -86,7 +86,7 @@ public class CheckpointProperties implements Serializable {
* @see CheckpointCoordinator
* @see PendingCheckpoint
*/
- public boolean forceCheckpoint() {
+ boolean forceCheckpoint() {
return forced;
}
@@ -98,7 +98,7 @@ public class CheckpointProperties implements Serializable {
*
* @see PendingCheckpoint
*/
- public boolean externalizeCheckpoint() {
+ boolean externalizeCheckpoint() {
return externalize;
}
@@ -117,7 +117,7 @@ public class CheckpointProperties implements Serializable {
*
* @see CompletedCheckpointStore
*/
- public boolean discardOnSubsumed() {
+ boolean discardOnSubsumed() {
return discardSubsumed;
}
@@ -131,7 +131,7 @@ public class CheckpointProperties implements Serializable {
*
* @see CompletedCheckpointStore
*/
- public boolean discardOnJobFinished() {
+ boolean discardOnJobFinished() {
return discardFinished;
}
@@ -145,7 +145,7 @@ public class CheckpointProperties implements Serializable {
*
* @see CompletedCheckpointStore
*/
- public boolean discardOnJobCancelled() {
+ boolean discardOnJobCancelled() {
return discardCancelled;
}
@@ -159,7 +159,7 @@ public class CheckpointProperties implements Serializable {
*
* @see CompletedCheckpointStore
*/
- public boolean discardOnJobFailed() {
+ boolean discardOnJobFailed() {
return discardFailed;
}
@@ -173,10 +173,19 @@ public class CheckpointProperties implements Serializable {
*
* @see CompletedCheckpointStore
*/
- public boolean discardOnJobSuspended() {
+ boolean discardOnJobSuspended() {
return discardSuspended;
}
+ /**
+ * Returns whether the checkpoint properties describe a standard savepoint.
+ * Compared via equals(), not identity, so deserialized copies of this Serializable class match too.
+ * @return <code>true</code> if the properties describe a savepoint, <code>false</code> otherwise.
+ */
+ public boolean isSavepoint() {
+ return STANDARD_SAVEPOINT.equals(this);
+ }
+
// ------------------------------------------------------------------------
@Override
@@ -306,13 +315,4 @@ public class CheckpointProperties implements Serializable {
}
}
- /**
- * Returns whether the checkpoint properties describe a standard savepoint.
- *
- * @param props Checkpoint properties to check.
- * @return <code>true</code> if the properties describe a savepoint, <code>false</code> otherwise.
- */
- public static boolean isSavepoint(CheckpointProperties props) {
- return STANDARD_SAVEPOINT.equals(props);
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
index 13ce642..ce14c2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
@@ -255,7 +255,7 @@ public class CheckpointStatsHistory implements Serializable {
// Update the latest checkpoint stats
if (completedOrFailed.getStatus().isCompleted()) {
CompletedCheckpointStats completed = (CompletedCheckpointStats) completedOrFailed;
- if (CheckpointProperties.isSavepoint(completed.getProperties()) &&
+ if (completed.getProperties().isSavepoint() &&
(latestSavepoint == null ||
completed.getCheckpointId() > latestSavepoint.getCheckpointId())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
index fb3bd65..52ac54c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
@@ -93,22 +93,22 @@ public class CheckpointPropertiesTest {
public void testIsSavepoint() throws Exception {
{
CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
- assertFalse(CheckpointProperties.isSavepoint(props));
+ assertFalse(props.isSavepoint());
}
{
CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true);
- assertFalse(CheckpointProperties.isSavepoint(props));
+ assertFalse(props.isSavepoint());
}
{
CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(false);
- assertFalse(CheckpointProperties.isSavepoint(props));
+ assertFalse(props.isSavepoint());
}
{
CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
- assertTrue(CheckpointProperties.isSavepoint(props));
+ assertTrue(props.isSavepoint());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
index 7541806..3c373f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
@@ -175,12 +175,14 @@ public class CheckpointStatsHistoryTest {
PendingCheckpointStats pending = mock(PendingCheckpointStats.class);
when(pending.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
when(pending.getCheckpointId()).thenReturn(checkpointId);
+ when(pending.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
return pending;
}
private CompletedCheckpointStats createCompletedCheckpointStats(long checkpointId) {
CompletedCheckpointStats completed = mock(CompletedCheckpointStats.class);
when(completed.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
+ when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
when(completed.getCheckpointId()).thenReturn(checkpointId);
return completed;
}
@@ -188,6 +190,7 @@ public class CheckpointStatsHistoryTest {
private FailedCheckpointStats createFailedCheckpointStats(long checkpointId) {
FailedCheckpointStats failed = mock(FailedCheckpointStats.class);
when(failed.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
+ when(failed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
when(failed.getCheckpointId()).thenReturn(checkpointId);
return failed;
}