You are viewing a plain text version of this content; the canonical (HTML) version is available in the Apache mailing list archive.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/03/02 17:27:39 UTC
[1/2] flink git commit: [FLINK-5941] Integrate Archiver pattern into
handlers
Repository: flink
Updated Branches:
refs/heads/master 243ef69bf -> 7fe0eb477
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
index 939f439..0076d42 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
@@ -21,12 +21,34 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collection;
+
public class SubtasksTimesHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new SubtasksTimesHandler.SubtasksTimesJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+ AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/subtasktimes", archive.getPath());
+ compareSubtaskTimes(originalTask, originalAttempt, archive.getJson());
+ }
+
@Test
public void testGetPaths() {
SubtasksTimesHandler handler = new SubtasksTimesHandler(null);
@@ -41,6 +63,10 @@ public class SubtasksTimesHandlerTest {
AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
String json = SubtasksTimesHandler.createSubtaskTimesJson(originalTask);
+ compareSubtaskTimes(originalTask, originalAttempt, json);
+ }
+
+ private static void compareSubtaskTimes(AccessExecutionJobVertex originalTask, AccessExecution originalAttempt, String json) throws IOException {
JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
index e570e18..9d339f5 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -20,14 +20,20 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
@@ -38,6 +44,37 @@ import static org.mockito.Mockito.when;
public class CheckpointConfigHandlerTest {
@Test
+ public void testArchiver() throws IOException {
+ JsonArchivist archivist = new CheckpointConfigHandler.CheckpointConfigJsonArchivist();
+ GraphAndSettings graphAndSettings = createGraphAndSettings(true, true);
+
+ AccessExecutionGraph graph = graphAndSettings.graph;
+ when(graph.getJobID()).thenReturn(new JobID());
+ JobSnapshottingSettings settings = graphAndSettings.snapshottingSettings;
+ ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
+ Assert.assertEquals(1, archives.size());
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + graph.getJobID() + "/checkpoints/config", archive.getPath());
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode rootNode = mapper.readTree(archive.getJson());
+
+ Assert.assertEquals("exactly_once", rootNode.get("mode").asText());
+ Assert.assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong());
+ Assert.assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong());
+ Assert.assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong());
+ Assert.assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt());
+
+ JsonNode externalizedNode = rootNode.get("externalization");
+ Assert.assertNotNull(externalizedNode);
+ Assert.assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
+ Assert.assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+
+ }
+
+ @Test
public void testGetPaths() {
CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
String[] paths = handler.getPaths();
@@ -50,26 +87,10 @@ public class CheckpointConfigHandlerTest {
*/
@Test
public void testSimpleConfig() throws Exception {
- long interval = 18231823L;
- long timeout = 996979L;
- long minPause = 119191919L;
- int maxConcurrent = 12929329;
- ExternalizedCheckpointSettings externalized = ExternalizedCheckpointSettings.none();
+ GraphAndSettings graphAndSettings = createGraphAndSettings(false, true);
- JobSnapshottingSettings settings = new JobSnapshottingSettings(
- Collections.<JobVertexID>emptyList(),
- Collections.<JobVertexID>emptyList(),
- Collections.<JobVertexID>emptyList(),
- interval,
- timeout,
- minPause,
- maxConcurrent,
- externalized,
- null,
- true);
-
- AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- when(graph.getJobSnapshottingSettings()).thenReturn(settings);
+ AccessExecutionGraph graph = graphAndSettings.graph;
+ JobSnapshottingSettings settings = graphAndSettings.snapshottingSettings;
CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
@@ -78,10 +99,10 @@ public class CheckpointConfigHandlerTest {
JsonNode rootNode = mapper.readTree(json);
assertEquals("exactly_once", rootNode.get("mode").asText());
- assertEquals(interval, rootNode.get("interval").asLong());
- assertEquals(timeout, rootNode.get("timeout").asLong());
- assertEquals(minPause, rootNode.get("min_pause").asLong());
- assertEquals(maxConcurrent, rootNode.get("max_concurrent").asInt());
+ assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong());
+ assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong());
+ assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong());
+ assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt());
JsonNode externalizedNode = rootNode.get("externalization");
assertNotNull(externalizedNode);
@@ -93,20 +114,9 @@ public class CheckpointConfigHandlerTest {
*/
@Test
public void testAtLeastOnce() throws Exception {
- JobSnapshottingSettings settings = new JobSnapshottingSettings(
- Collections.<JobVertexID>emptyList(),
- Collections.<JobVertexID>emptyList(),
- Collections.<JobVertexID>emptyList(),
- 996979L,
- 1818L,
- 1212L,
- 12,
- ExternalizedCheckpointSettings.none(),
- null,
- false); // at least once
+ GraphAndSettings graphAndSettings = createGraphAndSettings(false, false);
- AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- when(graph.getJobSnapshottingSettings()).thenReturn(settings);
+ AccessExecutionGraph graph = graphAndSettings.graph;
CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
@@ -122,30 +132,60 @@ public class CheckpointConfigHandlerTest {
*/
@Test
public void testEnabledExternalizedCheckpointSettings() throws Exception {
- ExternalizedCheckpointSettings externalizedSettings = ExternalizedCheckpointSettings.externalizeCheckpoints(true);
+ GraphAndSettings graphAndSettings = createGraphAndSettings(true, false);
+
+ AccessExecutionGraph graph = graphAndSettings.graph;
+ ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
+
+ CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
+ String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode externalizedNode = mapper.readTree(json).get("externalization");
+ assertNotNull(externalizedNode);
+ assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
+ assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+ }
+
+ private static GraphAndSettings createGraphAndSettings(boolean externalized, boolean exactlyOnce) {
+ long interval = 18231823L;
+ long timeout = 996979L;
+ long minPause = 119191919L;
+ int maxConcurrent = 12929329;
+ ExternalizedCheckpointSettings externalizedSetting = externalized
+ ? ExternalizedCheckpointSettings.externalizeCheckpoints(true)
+ : ExternalizedCheckpointSettings.none();
JobSnapshottingSettings settings = new JobSnapshottingSettings(
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
- 996979L,
- 1818L,
- 1212L,
- 12,
- externalizedSettings,
+ interval,
+ timeout,
+ minPause,
+ maxConcurrent,
+ externalizedSetting,
null,
- false); // at least once
+ exactlyOnce);
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
when(graph.getJobSnapshottingSettings()).thenReturn(settings);
- CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
- String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+ return new GraphAndSettings(graph, settings, externalizedSetting);
+ }
- ObjectMapper mapper = new ObjectMapper();
- JsonNode externalizedNode = mapper.readTree(json).get("externalization");
- assertNotNull(externalizedNode);
- assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
- assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+ private static class GraphAndSettings {
+ public final AccessExecutionGraph graph;
+ public final JobSnapshottingSettings snapshottingSettings;
+ public final ExternalizedCheckpointSettings externalizedSettings;
+
+ public GraphAndSettings(
+ AccessExecutionGraph graph,
+ JobSnapshottingSettings snapshottingSettings,
+ ExternalizedCheckpointSettings externalizedSettings) {
+ this.graph = graph;
+ this.snapshottingSettings = snapshottingSettings;
+ this.externalizedSettings = externalizedSettings;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
index ca9b606..770b032 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
@@ -32,12 +33,19 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -50,6 +58,44 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class CheckpointStatsDetailsHandlerTest {
+
+ @Test
+ public void testArchiver() throws IOException {
+ JsonArchivist archivist = new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist();
+
+ CompletedCheckpointStats completedCheckpoint = createCompletedCheckpoint();
+ FailedCheckpointStats failedCheckpoint = createFailedCheckpoint();
+ List<AbstractCheckpointStats> checkpoints = new ArrayList<>();
+ checkpoints.add(failedCheckpoint);
+ checkpoints.add(completedCheckpoint);
+
+ CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+ when(history.getCheckpoints()).thenReturn(checkpoints);
+ CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
+ when(snapshot.getHistory()).thenReturn(history);
+
+ AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+ when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
+ when(graph.getJobID()).thenReturn(new JobID());
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
+ Assert.assertEquals(2, archives.size());
+
+ Iterator<ArchivedJson> iterator = archives.iterator();
+ ArchivedJson archive1 = iterator.next();
+ Assert.assertEquals(
+ "/jobs/" + graph.getJobID() + "/checkpoints/details/" + failedCheckpoint.getCheckpointId(),
+ archive1.getPath());
+ compareFailedCheckpoint(failedCheckpoint, mapper.readTree(archive1.getJson()));
+
+ ArchivedJson archive2 = iterator.next();
+ Assert.assertEquals(
+ "/jobs/" + graph.getJobID() + "/checkpoints/details/" + completedCheckpoint.getCheckpointId(),
+ archive2.getPath());
+ compareCompletedCheckpoint(completedCheckpoint, mapper.readTree(archive2.getJson()));
+ }
@Test
public void testGetPaths() {
@@ -146,8 +192,7 @@ public class CheckpointStatsDetailsHandlerTest {
assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
- verifyTaskNode(task1, rootNode);
- verifyTaskNode(task2, rootNode);
+ verifyTaskNodes(taskStats, rootNode);
}
/**
@@ -155,6 +200,32 @@ public class CheckpointStatsDetailsHandlerTest {
*/
@Test
public void testCheckpointDetailsRequestCompletedCheckpoint() throws Exception {
+ CompletedCheckpointStats checkpoint = createCompletedCheckpoint();
+
+ JsonNode rootNode = triggerRequest(checkpoint);
+
+ compareCompletedCheckpoint(checkpoint, rootNode);
+
+ verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode);
+ }
+
+ /**
+ * Tests a checkpoint details request for a failed checkpoint.
+ */
+ @Test
+ public void testCheckpointDetailsRequestFailedCheckpoint() throws Exception {
+ FailedCheckpointStats checkpoint = createFailedCheckpoint();
+
+ JsonNode rootNode = triggerRequest(checkpoint);
+
+ compareFailedCheckpoint(checkpoint, rootNode);
+
+ verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode);
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static CompletedCheckpointStats createCompletedCheckpoint() {
CompletedCheckpointStats checkpoint = mock(CompletedCheckpointStats.class);
when(checkpoint.getCheckpointId()).thenReturn(1818213L);
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
@@ -177,8 +248,10 @@ public class CheckpointStatsDetailsHandlerTest {
when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
- JsonNode rootNode = triggerRequest(checkpoint);
+ return checkpoint;
+ }
+ private static void compareCompletedCheckpoint(CompletedCheckpointStats checkpoint, JsonNode rootNode) {
assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
@@ -191,18 +264,11 @@ public class CheckpointStatsDetailsHandlerTest {
assertEquals(checkpoint.getExternalPath(), rootNode.get("external_path").asText());
assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
-
- verifyTaskNode(task1, rootNode);
- verifyTaskNode(task2, rootNode);
}
- /**
- * Tests a checkpoint details request for a failed checkpoint.
- */
- @Test
- public void testCheckpointDetailsRequestFailedCheckpoint() throws Exception {
+ private static FailedCheckpointStats createFailedCheckpoint() {
FailedCheckpointStats checkpoint = mock(FailedCheckpointStats.class);
- when(checkpoint.getCheckpointId()).thenReturn(1818213L);
+ when(checkpoint.getCheckpointId()).thenReturn(1818214L);
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
@@ -223,8 +289,10 @@ public class CheckpointStatsDetailsHandlerTest {
when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
- JsonNode rootNode = triggerRequest(checkpoint);
+ return checkpoint;
+ }
+ private static void compareFailedCheckpoint(FailedCheckpointStats checkpoint, JsonNode rootNode) {
assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
@@ -237,13 +305,8 @@ public class CheckpointStatsDetailsHandlerTest {
assertEquals(checkpoint.getFailureMessage(), rootNode.get("failure_message").asText());
assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
-
- verifyTaskNode(task1, rootNode);
- verifyTaskNode(task2, rootNode);
}
- // ------------------------------------------------------------------------
-
private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
@@ -262,16 +325,18 @@ public class CheckpointStatsDetailsHandlerTest {
return mapper.readTree(json);
}
- private static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) {
- long duration = ThreadLocalRandom.current().nextInt(128);
-
- JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString());
- assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong());
- assertEquals(task.getStateSize(), taskNode.get("state_size").asLong());
- assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), taskNode.get("end_to_end_duration").asLong());
- assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong());
- assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt());
- assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt());
+ private static void verifyTaskNodes(Collection<TaskStateStats> tasks, JsonNode parentNode) {
+ for (TaskStateStats task : tasks) {
+ long duration = ThreadLocalRandom.current().nextInt(128);
+
+ JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString());
+ assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong());
+ assertEquals(task.getStateSize(), taskNode.get("state_size").asLong());
+ assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), taskNode.get("end_to_end_duration").asLong());
+ assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong());
+ assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt());
+ assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt());
+ }
}
private static TaskStateStats createTaskStateStats() {
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
index ab7c7a3..1e4a255 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
@@ -34,10 +35,15 @@ import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -51,6 +57,32 @@ import static org.mockito.Mockito.when;
public class CheckpointStatsHandlerTest {
@Test
+ public void testArchiver() throws IOException {
+ JsonArchivist archivist = new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist();
+ TestCheckpointStats testCheckpointStats = createTestCheckpointStats();
+ when(testCheckpointStats.graph.getJobID()).thenReturn(new JobID());
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(testCheckpointStats.graph);
+ Assert.assertEquals(3, archives.size());
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ Iterator<ArchivedJson> iterator = archives.iterator();
+ ArchivedJson archive1 = iterator.next();
+ Assert.assertEquals("/jobs/" + testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + testCheckpointStats.inProgress.getCheckpointId(), archive1.getPath());
+ compareInProgressCheckpoint(testCheckpointStats.inProgress, mapper.readTree(archive1.getJson()));
+
+ ArchivedJson archive2 = iterator.next();
+ Assert.assertEquals("/jobs/" + testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + testCheckpointStats.completedSavepoint.getCheckpointId(), archive2.getPath());
+ compareCompletedSavepoint(testCheckpointStats.completedSavepoint, mapper.readTree(archive2.getJson()));
+
+ ArchivedJson archive3 = iterator.next();
+ Assert.assertEquals("/jobs/" + testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + testCheckpointStats.failed.getCheckpointId(), archive3.getPath());
+ compareFailedCheckpoint(testCheckpointStats.failed, mapper.readTree(archive3.getJson()));
+ }
+
+
+ @Test
public void testGetPaths() {
CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
String[] paths = handler.getPaths();
@@ -63,6 +95,18 @@ public class CheckpointStatsHandlerTest {
*/
@Test
public void testCheckpointStatsRequest() throws Exception {
+ TestCheckpointStats testCheckpointStats = createTestCheckpointStats();
+
+ CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
+ String json = handler.handleRequest(testCheckpointStats.graph, Collections.<String, String>emptyMap());
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode rootNode = mapper.readTree(json);
+
+ compareCheckpointStats(testCheckpointStats, rootNode);
+ }
+
+ private static TestCheckpointStats createTestCheckpointStats() {
// Counts
CheckpointStatsCounts counts = mock(CheckpointStatsCounts.class);
when(counts.getNumberOfRestoredCheckpoints()).thenReturn(123123123L);
@@ -104,7 +148,7 @@ public class CheckpointStatsHandlerTest {
when(latestCompleted.getExternalPath()).thenReturn("latest-completed-external-path");
CompletedCheckpointStats latestSavepoint = mock(CompletedCheckpointStats.class);
- when(latestSavepoint.getCheckpointId()).thenReturn(1992139L);
+ when(latestSavepoint.getCheckpointId()).thenReturn(1992140L);
when(latestSavepoint.getTriggerTimestamp()).thenReturn(1919191900L);
when(latestSavepoint.getLatestAckTimestamp()).thenReturn(1977791901L);
when(latestSavepoint.getStateSize()).thenReturn(111939272822L);
@@ -133,7 +177,7 @@ public class CheckpointStatsHandlerTest {
List<AbstractCheckpointStats> checkpoints = new ArrayList<>();
PendingCheckpointStats inProgress = mock(PendingCheckpointStats.class);
- when(inProgress.getCheckpointId()).thenReturn(1992139L);
+ when(inProgress.getCheckpointId()).thenReturn(1992141L);
when(inProgress.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
when(inProgress.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
when(inProgress.getTriggerTimestamp()).thenReturn(1919191900L);
@@ -189,12 +233,15 @@ public class CheckpointStatsHandlerTest {
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
- CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
- String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
-
- ObjectMapper mapper = new ObjectMapper();
- JsonNode rootNode = mapper.readTree(json);
+ return new TestCheckpointStats(
+ graph, counts, stateSizeSummary, durationSummary, alignmentBufferedSummary, summary,
+ latestCompleted, latestSavepoint, latestFailed, latestRestored, inProgress,
+ completedSavepoint, failed, history, snapshot
+ );
+ }
+ private static void compareCheckpointStats(TestCheckpointStats checkpointStats, JsonNode rootNode) {
+ CheckpointStatsCounts counts = checkpointStats.counts;
JsonNode countNode = rootNode.get("counts");
assertEquals(counts.getNumberOfRestoredCheckpoints(), countNode.get("restored").asLong());
assertEquals(counts.getTotalNumberOfCheckpoints(), countNode.get("total").asLong());
@@ -202,22 +249,26 @@ public class CheckpointStatsHandlerTest {
assertEquals(counts.getNumberOfCompletedCheckpoints(), countNode.get("completed").asLong());
assertEquals(counts.getNumberOfFailedCheckpoints(), countNode.get("failed").asLong());
+ MinMaxAvgStats stateSizeSummary = checkpointStats.stateSizeSummary;
JsonNode summaryNode = rootNode.get("summary");
JsonNode sizeSummaryNode = summaryNode.get("state_size");
assertEquals(stateSizeSummary.getMinimum(), sizeSummaryNode.get("min").asLong());
assertEquals(stateSizeSummary.getMaximum(), sizeSummaryNode.get("max").asLong());
assertEquals(stateSizeSummary.getAverage(), sizeSummaryNode.get("avg").asLong());
+ MinMaxAvgStats durationSummary = checkpointStats.durationSummary;
JsonNode durationSummaryNode = summaryNode.get("end_to_end_duration");
assertEquals(durationSummary.getMinimum(), durationSummaryNode.get("min").asLong());
assertEquals(durationSummary.getMaximum(), durationSummaryNode.get("max").asLong());
assertEquals(durationSummary.getAverage(), durationSummaryNode.get("avg").asLong());
+ MinMaxAvgStats alignmentBufferedSummary = checkpointStats.alignmentBufferedSummary;
JsonNode alignmentBufferedNode = summaryNode.get("alignment_buffered");
assertEquals(alignmentBufferedSummary.getMinimum(), alignmentBufferedNode.get("min").asLong());
assertEquals(alignmentBufferedSummary.getMaximum(), alignmentBufferedNode.get("max").asLong());
assertEquals(alignmentBufferedSummary.getAverage(), alignmentBufferedNode.get("avg").asLong());
+ CompletedCheckpointStats latestCompleted = checkpointStats.latestCompleted;
JsonNode latestNode = rootNode.get("latest");
JsonNode latestCheckpointNode = latestNode.get("completed");
assertEquals(latestCompleted.getCheckpointId(), latestCheckpointNode.get("id").asLong());
@@ -228,6 +279,7 @@ public class CheckpointStatsHandlerTest {
assertEquals(latestCompleted.getAlignmentBuffered(), latestCheckpointNode.get("alignment_buffered").asLong());
assertEquals(latestCompleted.getExternalPath(), latestCheckpointNode.get("external_path").asText());
+ CompletedCheckpointStats latestSavepoint = checkpointStats.latestSavepoint;
JsonNode latestSavepointNode = latestNode.get("savepoint");
assertEquals(latestSavepoint.getCheckpointId(), latestSavepointNode.get("id").asLong());
assertEquals(latestSavepoint.getTriggerTimestamp(), latestSavepointNode.get("trigger_timestamp").asLong());
@@ -237,6 +289,7 @@ public class CheckpointStatsHandlerTest {
assertEquals(latestSavepoint.getAlignmentBuffered(), latestSavepointNode.get("alignment_buffered").asLong());
assertEquals(latestSavepoint.getExternalPath(), latestSavepointNode.get("external_path").asText());
+ FailedCheckpointStats latestFailed = checkpointStats.latestFailed;
JsonNode latestFailedNode = latestNode.get("failed");
assertEquals(latestFailed.getCheckpointId(), latestFailedNode.get("id").asLong());
assertEquals(latestFailed.getTriggerTimestamp(), latestFailedNode.get("trigger_timestamp").asLong());
@@ -247,6 +300,7 @@ public class CheckpointStatsHandlerTest {
assertEquals(latestFailed.getFailureTimestamp(), latestFailedNode.get("failure_timestamp").asLong());
assertEquals(latestFailed.getFailureMessage(), latestFailedNode.get("failure_message").asText());
+ RestoredCheckpointStats latestRestored = checkpointStats.latestRestored;
JsonNode latestRestoredNode = latestNode.get("restored");
assertEquals(latestRestored.getCheckpointId(), latestRestoredNode.get("id").asLong());
assertEquals(latestRestored.getRestoreTimestamp(), latestRestoredNode.get("restore_timestamp").asLong());
@@ -259,6 +313,25 @@ public class CheckpointStatsHandlerTest {
assertTrue(it.hasNext());
JsonNode inProgressNode = it.next();
+ PendingCheckpointStats inProgress = checkpointStats.inProgress;
+ compareInProgressCheckpoint(inProgress, inProgressNode);
+
+ assertTrue(it.hasNext());
+ JsonNode completedSavepointNode = it.next();
+
+ CompletedCheckpointStats completedSavepoint = checkpointStats.completedSavepoint;
+ compareCompletedSavepoint(completedSavepoint, completedSavepointNode);
+
+ assertTrue(it.hasNext());
+ JsonNode failedNode = it.next();
+
+ FailedCheckpointStats failed = checkpointStats.failed;
+ compareFailedCheckpoint(failed, failedNode);
+
+ assertFalse(it.hasNext());
+ }
+
+ private static void compareInProgressCheckpoint(PendingCheckpointStats inProgress, JsonNode inProgressNode) {
assertEquals(inProgress.getCheckpointId(), inProgressNode.get("id").asLong());
assertEquals(inProgress.getStatus().toString(), inProgressNode.get("status").asText());
assertEquals(inProgress.getProperties().isSavepoint(), inProgressNode.get("is_savepoint").asBoolean());
@@ -269,10 +342,9 @@ public class CheckpointStatsHandlerTest {
assertEquals(inProgress.getAlignmentBuffered(), inProgressNode.get("alignment_buffered").asLong());
assertEquals(inProgress.getNumberOfSubtasks(), inProgressNode.get("num_subtasks").asInt());
assertEquals(inProgress.getNumberOfAcknowledgedSubtasks(), inProgressNode.get("num_acknowledged_subtasks").asInt());
+ }
- assertTrue(it.hasNext());
- JsonNode completedSavepointNode = it.next();
-
+ private static void compareCompletedSavepoint(CompletedCheckpointStats completedSavepoint, JsonNode completedSavepointNode) {
assertEquals(completedSavepoint.getCheckpointId(), completedSavepointNode.get("id").asLong());
assertEquals(completedSavepoint.getStatus().toString(), completedSavepointNode.get("status").asText());
assertEquals(completedSavepoint.getProperties().isSavepoint(), completedSavepointNode.get("is_savepoint").asBoolean());
@@ -286,10 +358,9 @@ public class CheckpointStatsHandlerTest {
assertEquals(completedSavepoint.getExternalPath(), completedSavepointNode.get("external_path").asText());
assertEquals(completedSavepoint.isDiscarded(), completedSavepointNode.get("discarded").asBoolean());
+ }
- assertTrue(it.hasNext());
- JsonNode failedNode = it.next();
-
+ private static void compareFailedCheckpoint(FailedCheckpointStats failed, JsonNode failedNode) {
assertEquals(failed.getCheckpointId(), failedNode.get("id").asLong());
assertEquals(failed.getStatus().toString(), failedNode.get("status").asText());
assertEquals(failed.getProperties().isSavepoint(), failedNode.get("is_savepoint").asBoolean());
@@ -303,7 +374,56 @@ public class CheckpointStatsHandlerTest {
assertEquals(failed.getFailureTimestamp(), failedNode.get("failure_timestamp").asLong());
assertEquals(failed.getFailureMessage(), failedNode.get("failure_message").asText());
-
- assertFalse(it.hasNext());
+ }
+
+ private static class TestCheckpointStats {
+ public final AccessExecutionGraph graph;
+ public final CheckpointStatsCounts counts;
+ public final MinMaxAvgStats stateSizeSummary;
+ public final MinMaxAvgStats durationSummary;
+ public final MinMaxAvgStats alignmentBufferedSummary;
+ public final CompletedCheckpointStatsSummary summary;
+ public final CompletedCheckpointStats latestCompleted;
+ public final CompletedCheckpointStats latestSavepoint;
+ public final FailedCheckpointStats latestFailed;
+ public final RestoredCheckpointStats latestRestored;
+ public final PendingCheckpointStats inProgress;
+ public final CompletedCheckpointStats completedSavepoint;
+ public final FailedCheckpointStats failed;
+ public final CheckpointStatsHistory history;
+ public final CheckpointStatsSnapshot snapshot;
+
+ public TestCheckpointStats(
+ AccessExecutionGraph graph,
+ CheckpointStatsCounts counts,
+ MinMaxAvgStats stateSizeSummary,
+ MinMaxAvgStats durationSummary,
+ MinMaxAvgStats alignmentBufferedSummary,
+ CompletedCheckpointStatsSummary summary,
+ CompletedCheckpointStats latestCompleted,
+ CompletedCheckpointStats latestSavepoint,
+ FailedCheckpointStats latestFailed,
+ RestoredCheckpointStats latestRestored,
+ PendingCheckpointStats inProgress,
+ CompletedCheckpointStats completedSavepoint,
+ FailedCheckpointStats failed,
+ CheckpointStatsHistory history,
+ CheckpointStatsSnapshot snapshot) {
+ this.graph = graph;
+ this.counts = counts;
+ this.stateSizeSummary = stateSizeSummary;
+ this.durationSummary = durationSummary;
+ this.alignmentBufferedSummary = alignmentBufferedSummary;
+ this.summary = summary;
+ this.latestCompleted = latestCompleted;
+ this.latestSavepoint = latestSavepoint;
+ this.latestFailed = latestFailed;
+ this.latestRestored = latestRestored;
+ this.inProgress = inProgress;
+ this.completedSavepoint = completedSavepoint;
+ this.failed = failed;
+ this.history = history;
+ this.snapshot = snapshot;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
index 26433fa..bbab621 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -31,9 +32,13 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
import org.junit.Assert;
import org.junit.Test;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -55,6 +60,42 @@ import static org.mockito.Mockito.when;
public class CheckpointStatsSubtaskDetailsHandlerTest {
@Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist();
+ ObjectMapper mapper = new ObjectMapper();
+
+ PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class);
+ when(checkpoint.getCheckpointId()).thenReturn(1992139L);
+ when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
+ when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration
+
+ TaskStateStats task = createTaskStateStats(1237);
+ when(checkpoint.getAllTaskStateStats()).thenReturn(Collections.singletonList(task));
+
+ CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+ when(history.getCheckpoints()).thenReturn(Collections.<AbstractCheckpointStats>singletonList(checkpoint));
+ CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
+ when(snapshot.getHistory()).thenReturn(history);
+
+ AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+ when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
+ when(graph.getJobID()).thenReturn(new JobID());
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals(
+ "/jobs/" + graph.getJobID() + "/checkpoints/details/" + checkpoint.getCheckpointId() + "/subtasks/" + task.getJobVertexId(),
+ archive.getPath());
+ JsonNode rootNode = mapper.readTree(archive.getJson());
+ assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
+ assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
+
+ verifyTaskNode(rootNode, task, checkpoint.getTriggerTimestamp());
+ }
+
+ @Test
public void testGetPaths() {
CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
String[] paths = handler.getPaths();
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
index 0340d87..ed339ed 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
@@ -105,7 +105,7 @@ public class ArchivedJobGenerationUtils {
originalAttempt = new ArchivedExecutionBuilder()
.setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
.setParallelSubtaskIndex(1)
- .setAttemptNumber(3)
+ .setAttemptNumber(0)
.setAssignedResourceLocation(location)
.setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
.setState(ExecutionState.FINISHED)
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
new file mode 100644
index 0000000..22e011c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A simple container for a handler's JSON response and the REST URLs for which the response would have been returned.
+ *
+ * These are created by {@link JsonArchivist}s, and used by the {@link MemoryArchivist} to create a directory structure
+ * resembling the REST API.
+ */
+public class ArchivedJson {
+ private final String path;
+ private final String json;
+
+ public ArchivedJson(String path, String json) {
+ this.path = Preconditions.checkNotNull(path);
+ this.json = Preconditions.checkNotNull(json);
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public String getJson() {
+ return json;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
new file mode 100644
index 0000000..a87cc47
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for all classes that want to participate in the archiving of job-related json responses.
+ *
+ * Note that all JsonArchivists that are to be used for the history server must be added
+ * to {@link WebRuntimeMonitor#getArchivers()}.
+ */
+public interface JsonArchivist {
+
+ /**
+ * Returns a {@link Collection} of {@link ArchivedJson}s containing JSON responses and their respective REST URLs
+ * for a given job.
+ *
+ * The collection should contain one entry for every response that could be generated for the given
+ * job, for example one entry for each task. The REST URLs should be unique and must not contain placeholders.
+ *
+ * @param graph AccessExecutionGraph for which the responses should be generated
+ *
+ * @return Collection containing an ArchivedJson for every response that could be generated for the given job
+ * @throws IOException thrown if the JSON generation fails
+ */
+ Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException;
+}
[2/2] flink git commit: [FLINK-5941] Integrate Archiver pattern into
handlers
Posted by ch...@apache.org.
[FLINK-5941] Integrate Archiver pattern into handlers
This closes #3444.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fe0eb47
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fe0eb47
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fe0eb47
Branch: refs/heads/master
Commit: 7fe0eb477df52cfd7254695a67d41f3cba34ef0a
Parents: 243ef69
Author: zentol <ch...@apache.org>
Authored: Mon Feb 20 16:30:35 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Mar 2 18:27:15 2017 +0100
----------------------------------------------------------------------
.../runtime/webmonitor/WebRuntimeMonitor.java | 42 ++++++
.../handlers/CurrentJobsOverviewHandler.java | 26 ++++
.../handlers/JobAccumulatorsHandler.java | 15 ++
.../webmonitor/handlers/JobConfigHandler.java | 16 ++
.../webmonitor/handlers/JobDetailsHandler.java | 20 +++
.../handlers/JobExceptionsHandler.java | 15 ++
.../webmonitor/handlers/JobPlanHandler.java | 16 ++
.../handlers/JobVertexAccumulatorsHandler.java | 22 +++
.../handlers/JobVertexDetailsHandler.java | 22 +++
.../handlers/JobVertexTaskManagersHandler.java | 20 +++
...taskExecutionAttemptAccumulatorsHandler.java | 40 +++++
.../SubtaskExecutionAttemptDetailsHandler.java | 47 ++++++
.../SubtasksAllAccumulatorsHandler.java | 22 +++
.../handlers/SubtasksTimesHandler.java | 22 +++
.../checkpoints/CheckpointConfigHandler.java | 15 ++
.../CheckpointStatsDetailsHandler.java | 28 ++++
.../CheckpointStatsDetailsSubtasksHandler.java | 31 ++++
.../checkpoints/CheckpointStatsHandler.java | 15 ++
.../CurrentJobsOverviewHandlerTest.java | 32 +++-
.../handlers/JobAccumulatorsHandlerTest.java | 23 +++
.../handlers/JobConfigHandlerTest.java | 21 +++
.../handlers/JobDetailsHandlerTest.java | 28 ++++
.../handlers/JobExceptionsHandlerTest.java | 23 +++
.../webmonitor/handlers/JobPlanHandlerTest.java | 20 +++
.../JobVertexAccumulatorsHandlerTest.java | 25 ++++
.../handlers/JobVertexDetailsHandlerTest.java | 25 ++++
.../JobVertexTaskManagersHandlerTest.java | 26 ++++
...ExecutionAttemptAccumulatorsHandlerTest.java | 33 ++++
...btaskExecutionAttemptDetailsHandlerTest.java | 40 +++++
.../SubtasksAllAccumulatorsHandlerTest.java | 25 ++++
.../handlers/SubtasksTimesHandlerTest.java | 26 ++++
.../CheckpointConfigHandlerTest.java | 140 ++++++++++-------
.../CheckpointStatsDetailsHandlerTest.java | 121 +++++++++++----
.../checkpoints/CheckpointStatsHandlerTest.java | 150 +++++++++++++++++--
...heckpointStatsSubtaskDetailsHandlerTest.java | 41 +++++
.../utils/ArchivedJobGenerationUtils.java | 2 +-
.../webmonitor/history/ArchivedJson.java | 45 ++++++
.../webmonitor/history/JsonArchivist.java | 46 ++++++
38 files changed, 1231 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index dddc69d..e604ce8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -37,6 +37,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
@@ -77,6 +78,7 @@ import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsC
import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler;
import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsSubtasksHandler;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.metrics.JobManagerMetricsHandler;
import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
@@ -424,6 +426,46 @@ public class WebRuntimeMonitor implements WebMonitor {
LOG.info("Web frontend listening at " + address + ':' + port);
}
+ /**
+ * Returns an array of all {@link JsonArchivist}s that are relevant for the history server.
+ *
+ * This method is static to allow easier access from the {@link MemoryArchivist}. Requiring a reference
+ * would imply that the WebRuntimeMonitor is always created before the archivist, which may not hold for all
+ * deployment modes.
+ *
+ * Similarly, no handler implements the JsonArchivist interface itself but instead contains a separate implementing
+ * class; otherwise we would either instantiate several handlers even though their main functionality isn't
+ * required, or yet again require that the WebRuntimeMonitor is started before the archivist.
+ *
+ * @return array of all JsonArchivists relevant for the history server
+ */
+ public static JsonArchivist[] getArchivers() {
+ JsonArchivist[] archivists = new JsonArchivist[]{
+ new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),
+
+ new JobPlanHandler.JobPlanJsonArchivist(),
+ new JobConfigHandler.JobConfigJsonArchivist(),
+ new JobExceptionsHandler.JobExceptionsJsonArchivist(),
+ new JobDetailsHandler.JobDetailsJsonArchivist(),
+ new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist(),
+
+ new CheckpointStatsHandler.CheckpointStatsJsonArchivist(),
+ new CheckpointConfigHandler.CheckpointConfigJsonArchivist(),
+ new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist(),
+ new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist(),
+
+ new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist(),
+ new SubtasksTimesHandler.SubtasksTimesJsonArchivist(),
+ new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist(),
+ new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist(),
+ new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist(),
+
+ new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist(),
+ new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist()
+ };
+ return archivists;
+ }
+
@Override
public void start(String jobManagerAkkaUrl) throws Exception {
LOG.info("Starting with JobManager {} on port {}", jobManagerAkkaUrl, getServerPort());
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index 00cf138..60a2b27 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -20,16 +20,22 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -121,6 +127,26 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
}
}
+ public static class CurrentJobsOverviewJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ StringWriter writer = new StringWriter();
+ try (JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer)) {
+ gen.writeStartObject();
+ gen.writeArrayFieldStart("running");
+ gen.writeEndArray();
+ gen.writeArrayFieldStart("finished");
+ writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
+ gen.writeEndArray();
+ gen.writeEndObject();
+ }
+ String json = writer.toString();
+ String path = ALL_JOBS_REST_PATH;
+ return Collections.singleton(new ArchivedJson(path, json));
+ }
+ }
+
public static void writeJobDetailOverviewAsJson(JobDetails details, JsonGenerator gen, long now) throws IOException {
gen.writeStartObject();
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
index dfc654e..c403aa2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -22,9 +22,13 @@ import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
/**
@@ -48,6 +52,17 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
return createJobAccumulatorsJson(graph);
}
+ public static class JobAccumulatorsJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ String json = createJobAccumulatorsJson(graph);
+ String path = JOB_ACCUMULATORS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString());
+ return Collections.singletonList(new ArchivedJson(path, json));
+ }
+ }
+
public static String createJobAccumulatorsJson(AccessExecutionGraph graph) throws IOException {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 7d72235..2b96456 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -26,6 +26,11 @@ import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import java.util.Collection;
+import java.util.Collections;
/**
* Request handler that returns the execution config of a job.
@@ -48,6 +53,17 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
return createJobConfigJson(graph);
}
+ public static class JobConfigJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ String json = createJobConfigJson(graph);
+ String path = JOB_CONFIG_REST_PATH
+ .replace(":jobid", graph.getJobID().toString());
+ return Collections.singletonList(new ArchivedJson(path, json));
+ }
+ }
+
public static String createJobConfigJson(AccessExecutionGraph graph) throws IOException {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 6d1f82f..029a4b5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -27,12 +27,16 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Map;
/**
@@ -67,6 +71,22 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
return createJobDetailsJson(graph, fetcher);
}
+ public static class JobDetailsJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ String json = createJobDetailsJson(graph, null);
+ String path1 = JOB_DETAILS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString());
+ String path2 = JOB_DETAILS_VERTICES_REST_PATH
+ .replace(":jobid", graph.getJobID().toString());
+ Collection<ArchivedJson> archives = new ArrayList();
+ archives.add(new ArchivedJson(path1, json));
+ archives.add(new ArchivedJson(path2, json));
+ return archives;
+ }
+ }
+
public static String createJobDetailsJson(AccessExecutionGraph graph, @Nullable MetricFetcher fetcher) throws IOException {
final StringWriter writer = new StringWriter();
final JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index 0cce61f..81cdc83 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -23,10 +23,14 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.ExceptionUtils;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
/**
@@ -52,6 +56,17 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
return createJobExceptionsJson(graph);
}
+ public static class JobExceptionsJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ String json = createJobExceptionsJson(graph);
+ String path = JOB_EXCEPTIONS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString());
+ return Collections.singletonList(new ArchivedJson(path, json));
+ }
+ }
+
public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
index becc2e1..885d04e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
@@ -20,7 +20,12 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
/**
@@ -43,4 +48,15 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
return graph.getJsonPlan();
}
+
+ public static class JobPlanJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ String path = JOB_PLAN_REST_PATH
+ .replace(":jobid", graph.getJobID().toString());
+ String json = graph.getJsonPlan();
+ return Collections.singletonList(new ArchivedJson(path, json));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index ca0488b..2532a1e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -21,11 +21,17 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
@@ -47,6 +53,22 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
return createVertexAccumulatorsJson(jobVertex);
}
+ public static class JobVertexAccumulatorsJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ List<ArchivedJson> archive = new ArrayList<>();
+ for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
+ String json = createVertexAccumulatorsJson(task);
+ String path = JOB_VERTEX_ACCUMULATORS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString())
+ .replace(":vertexid", task.getJobVertexId().toString());
+ archive.add(new ArchivedJson(path, json));
+ }
+ return archive;
+ }
+ }
+
public static String createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 6e7e47c..d9a1131 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -21,16 +21,22 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
/**
@@ -58,6 +64,22 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
}
+ public static class JobVertexDetailsJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ List<ArchivedJson> archive = new ArrayList<>();
+ for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
+ String json = createVertexDetailsJson(task, graph.getJobID().toString(), null);
+ String path = JOB_VERTEX_DETAILS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString())
+ .replace(":vertexid", task.getJobVertexId().toString());
+ archive.add(new ArchivedJson(path, json));
+ }
+ return archive;
+ }
+ }
+
public static String createVertexDetailsJson(
AccessExecutionJobVertex jobVertex,
String jobID,
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index 4fa54bd..3878722 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
@@ -32,6 +35,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -61,6 +65,22 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
}
+ public static class JobVertexTaskManagersJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ List<ArchivedJson> archive = new ArrayList<>();
+ for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
+ String json = createVertexDetailsByTaskManagerJson(task, graph.getJobID().toString(), null);
+ String path = JOB_VERTEX_TASKMANAGERS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString())
+ .replace(":vertexid", task.getJobVertexId().toString());
+ archive.add(new ArchivedJson(path, json));
+ }
+ return archive;
+ }
+ }
+
public static String createVertexDetailsByTaskManagerJson(
AccessExecutionJobVertex jobVertex,
String jobID,
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index a63016c..9026a22 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -21,10 +21,18 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
/**
@@ -49,6 +57,38 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
return createAttemptAccumulatorsJson(execAttempt);
}
+ public static class SubtaskExecutionAttemptAccumulatorsJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ List<ArchivedJson> archive = new ArrayList<>();
+ for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
+ for (AccessExecutionVertex subtask : task.getTaskVertices()) {
+ String curAttemptJson = createAttemptAccumulatorsJson(subtask.getCurrentExecutionAttempt());
+ String curAttemptPath = SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString())
+ .replace(":vertexid", task.getJobVertexId().toString())
+ .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex()))
+ .replace(":attempt", String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber()));
+
+ archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+
+ for (int x = 0; x < subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
+ AccessExecution attempt = subtask.getPriorExecutionAttempt(x);
+ String json = createAttemptAccumulatorsJson(attempt);
+ String path = SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString())
+ .replace(":vertexid", task.getJobVertexId().toString())
+ .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex()))
+ .replace(":attempt", String.valueOf(attempt.getAttemptNumber()));
+ archive.add(new ArchivedJson(path, json));
+ }
+ }
+ }
+ return archive;
+ }
+ }
+
public static String createAttemptAccumulatorsJson(AccessExecution execAttempt) throws IOException {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index 5af6af9..078f54a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -22,16 +22,26 @@ import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import static org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH;
+
/**
* Request handler providing details about a single task execution attempt.
*/
@@ -56,6 +66,43 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
}
+ public static class SubtaskExecutionAttemptDetailsJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ List<ArchivedJson> archive = new ArrayList<>();
+ for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
+ for (AccessExecutionVertex subtask : task.getTaskVertices()) {
+ String curAttemptJson = createAttemptDetailsJson(subtask.getCurrentExecutionAttempt(), graph.getJobID().toString(), task.getJobVertexId().toString(), null);
+ String curAttemptPath1 = SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString())
+ .replace(":vertexid", task.getJobVertexId().toString())
+ .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex()));
+ String curAttemptPath2 = SUBTASK_ATTEMPT_DETAILS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString())
+ .replace(":vertexid", task.getJobVertexId().toString())
+ .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex()))
+ .replace(":attempt", String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber()));
+
+ archive.add(new ArchivedJson(curAttemptPath1, curAttemptJson));
+ archive.add(new ArchivedJson(curAttemptPath2, curAttemptJson));
+
+ for (int x = 0; x < subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
+ AccessExecution attempt = subtask.getPriorExecutionAttempt(x);
+ String json = createAttemptDetailsJson(attempt, graph.getJobID().toString(), task.getJobVertexId().toString(), null);
+ String path = SUBTASK_ATTEMPT_DETAILS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString())
+ .replace(":vertexid", task.getJobVertexId().toString())
+ .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex()))
+ .replace(":attempt", String.valueOf(attempt.getAttemptNumber()));
+ archive.add(new ArchivedJson(path, json));
+ }
+ }
+ }
+ return archive;
+ }
+ }
+
public static String createAttemptDetailsJson(
AccessExecution execAttempt,
String jobID,
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 10a8773..6c3bc18 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -21,13 +21,19 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
/**
@@ -51,6 +57,22 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
return createSubtasksAccumulatorsJson(jobVertex);
}
+ public static class SubtasksAllAccumulatorsJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ List<ArchivedJson> archive = new ArrayList<>();
+ for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
+ String json = createSubtasksAccumulatorsJson(task);
+ String path = SUBTASKS_ALL_ACCUMULATORS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString())
+ .replace(":vertexid", task.getJobVertexId().toString());
+ archive.add(new ArchivedJson(path, json));
+ }
+ return archive;
+ }
+ }
+
public static String createSubtasksAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index 08bd722..adefa80 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -21,13 +21,19 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
/**
@@ -52,6 +58,22 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
return createSubtaskTimesJson(jobVertex);
}
+ public static class SubtasksTimesJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ List<ArchivedJson> archive = new ArrayList<>();
+ for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
+ String json = createSubtaskTimesJson(task);
+ String path = SUBTASK_TIMES_REST_PATH
+ .replace(":jobid", graph.getJobID().toString())
+ .replace(":vertexid", task.getJobVertexId().toString());
+ archive.add(new ArchivedJson(path, json));
+ }
+ return archive;
+ }
+ }
+
public static String createSubtaskTimesJson(AccessExecutionJobVertex jobVertex) throws IOException {
final long now = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index 9976298..947b7c3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -25,9 +25,13 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
/**
@@ -51,6 +55,17 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
return createCheckpointConfigJson(graph);
}
+ public static class CheckpointConfigJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ String json = createCheckpointConfigJson(graph);
+ String path = CHECKPOINT_CONFIG_REST_PATH
+ .replace(":jobid", graph.getJobID().toString());
+ return Collections.singletonList(new ArchivedJson(path, json));
+ }
+ }
+
private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 4bbb8f6..16fd9bd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
@@ -28,9 +29,15 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
/**
@@ -79,6 +86,27 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
return createCheckpointDetailsJson(checkpoint);
}
+ public static class CheckpointStatsDetailsJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot();
+ if (stats == null) {
+ return Collections.emptyList();
+ }
+ CheckpointStatsHistory history = stats.getHistory();
+ List<ArchivedJson> archive = new ArrayList<>();
+ for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+ String json = createCheckpointDetailsJson(checkpoint);
+ String path = CHECKPOINT_STATS_DETAILS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString())
+ .replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId()));
+ archive.add(new ArchivedJson(path, json));
+ }
+ return archive;
+ }
+ }
+
public static String createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws IOException {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index b28ecef..bb39b2c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
@@ -31,9 +32,15 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.AbstractJobVertexRequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import static org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg;
@@ -104,6 +111,30 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
return createSubtaskCheckpointDetailsJson(checkpoint, taskStats);
}
+ public static class CheckpointStatsDetailsSubtasksJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot();
+ if (stats == null) {
+ return Collections.emptyList();
+ }
+ CheckpointStatsHistory history = stats.getHistory();
+ List<ArchivedJson> archive = new ArrayList<>();
+ for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+ for (TaskStateStats subtaskStats : checkpoint.getAllTaskStateStats()) {
+ String json = createSubtaskCheckpointDetailsJson(checkpoint, subtaskStats);
+ String path = CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString())
+ .replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId()))
+ .replace(":vertexid", subtaskStats.getJobVertexId().toString());
+ archive.add(new ArchivedJson(path, json));
+ }
+ }
+ return archive;
+ }
+ }
+
private static String createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, TaskStateStats taskStats) throws IOException {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index 585ab26..f004888 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -32,10 +32,14 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
/**
@@ -59,6 +63,17 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
return createCheckpointStatsJson(graph);
}
+ public static class CheckpointStatsJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ String json = createCheckpointStatsJson(graph);
+ String path = CHECKPOINT_STATS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString());
+ return Collections.singletonList(new ArchivedJson(path, json));
+ }
+ }
+
private static String createCheckpointStatsJson(AccessExecutionGraph graph) throws IOException {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
index caf6d8e..097961e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
@@ -19,19 +19,47 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
+import java.io.IOException;
import java.io.StringWriter;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
public class CurrentJobsOverviewHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/joboverview", archive.getPath());
+
+ JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(archive.getJson());
+ ArrayNode running = (ArrayNode) result.get("running");
+ Assert.assertEquals(0, running.size());
+
+ ArrayNode finished = (ArrayNode) result.get("finished");
+ Assert.assertEquals(1, finished.size());
+
+ compareJobOverview(expectedDetails, finished.get(0).toString());
+ }
+
@Test
public void testGetPaths() {
CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, true);
@@ -58,8 +86,10 @@ public class CurrentJobsOverviewHandlerTest {
try (JsonGenerator gen = ArchivedJobGenerationUtils.jacksonFactory.createGenerator(writer)) {
CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(expectedDetails, gen, 0);
}
- String answer = writer.toString();
+ compareJobOverview(expectedDetails, writer.toString());
+ }
+ private static void compareJobOverview(JobDetails expectedDetails, String answer) throws IOException {
JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(answer);
Assert.assertEquals(expectedDetails.getJobId().toString(), result.get("jid").asText());
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
index 34748b7..f8ea792 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
@@ -20,11 +20,30 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collection;
+
public class JobAccumulatorsHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/accumulators", archive.getPath());
+ compareAccumulators(originalJob, archive.getJson());
+ }
+
@Test
public void testGetPaths() {
JobAccumulatorsHandler handler = new JobAccumulatorsHandler(null);
@@ -38,6 +57,10 @@ public class JobAccumulatorsHandlerTest {
AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
String json = JobAccumulatorsHandler.createJobAccumulatorsJson(originalJob);
+ compareAccumulators(originalJob, json);
+ }
+
+ private static void compareAccumulators(AccessExecutionGraph originalJob, String json) throws IOException {
JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
ArrayNode accs = (ArrayNode) result.get("job-accumulators");
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
index f304efe..f47b8ca 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
@@ -20,13 +20,31 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collection;
import java.util.Map;
public class JobConfigHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new JobConfigHandler.JobConfigJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/config", archive.getPath());
+ compareJobConfig(originalJob, archive.getJson());
+ }
+
@Test
public void testGetPaths() {
JobConfigHandler handler = new JobConfigHandler(null);
@@ -38,7 +56,10 @@ public class JobConfigHandlerTest {
public void testJsonGeneration() throws Exception {
AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
String answer = JobConfigHandler.createJobConfigJson(originalJob);
+ compareJobConfig(originalJob, answer);
+ }
+ private static void compareJobConfig(AccessExecutionGraph originalJob, String answer) throws IOException {
JsonNode job = ArchivedJobGenerationUtils.mapper.readTree(answer);
Assert.assertEquals(originalJob.getJobID().toString(), job.get("jid").asText());
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
index 3f80d12..0c4fb7e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
@@ -26,13 +26,37 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
public class JobDetailsHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new JobDetailsHandler.JobDetailsJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(2, archives.size());
+
+ Iterator<ArchivedJson> iterator = archives.iterator();
+ ArchivedJson archive1 = iterator.next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID(), archive1.getPath());
+ compareJobDetails(originalJob, archive1.getJson());
+
+ ArchivedJson archive2 = iterator.next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices", archive2.getPath());
+ compareJobDetails(originalJob, archive2.getJson());
+ }
+
@Test
public void testGetPaths() {
JobDetailsHandler handler = new JobDetailsHandler(null, null);
@@ -48,6 +72,10 @@ public class JobDetailsHandlerTest {
AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
String json = JobDetailsHandler.createJobDetailsJson(originalJob, null);
+ compareJobDetails(originalJob, json);
+ }
+
+ private static void compareJobDetails(AccessExecutionGraph originalJob, String json) throws IOException {
JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
Assert.assertEquals(originalJob.getJobID().toString(), result.get("jid").asText());
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
index c86ce6a..c51053a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
@@ -22,12 +22,31 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collection;
+
public class JobExceptionsHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new JobExceptionsHandler.JobExceptionsJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/exceptions", archive.getPath());
+ compareExceptions(originalJob, archive.getJson());
+ }
+
@Test
public void testGetPaths() {
JobExceptionsHandler handler = new JobExceptionsHandler(null);
@@ -41,6 +60,10 @@ public class JobExceptionsHandlerTest {
AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
String json = JobExceptionsHandler.createJobExceptionsJson(originalJob);
+ compareExceptions(originalJob, json);
+ }
+
+ private static void compareExceptions(AccessExecutionGraph originalJob, String json) throws IOException {
JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
Assert.assertEquals(originalJob.getFailureCauseAsString(), result.get("root-exception").asText());
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
index 42808ed..2ef5bb9 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
@@ -17,10 +17,30 @@
*/
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Collection;
+
public class JobPlanHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new JobPlanHandler.JobPlanJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/plan", archive.getPath());
+ Assert.assertEquals(originalJob.getJsonPlan(), archive.getJson());
+ }
+
@Test
public void testGetPaths() {
JobPlanHandler handler = new JobPlanHandler(null);
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
index 03c1896..8c88da8 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
@@ -20,12 +20,33 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collection;
+
public class JobVertexAccumulatorsHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/accumulators", archive.getPath());
+ compareAccumulators(originalTask, archive.getJson());
+ }
+
@Test
public void testGetPaths() {
JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(null);
@@ -39,6 +60,10 @@ public class JobVertexAccumulatorsHandlerTest {
AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
String json = JobVertexAccumulatorsHandler.createVertexAccumulatorsJson(originalTask);
+ compareAccumulators(originalTask, json);
+ }
+
+ private static void compareAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException {
JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
index e909c8c..0fae8b5 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
@@ -20,14 +20,35 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collection;
+
public class JobVertexDetailsHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId(), archive.getPath());
+ compareVertexDetails(originalTask, archive.getJson());
+ }
+
@Test
public void testGetPaths() {
JobVertexDetailsHandler handler = new JobVertexDetailsHandler(null, null);
@@ -42,6 +63,10 @@ public class JobVertexDetailsHandlerTest {
String json = JobVertexDetailsHandler.createVertexDetailsJson(
originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
+ compareVertexDetails(originalTask, json);
+ }
+
+ private static void compareVertexDetails(AccessExecutionJobVertex originalTask, String json) throws IOException {
JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
index 11e35e5..9271712 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
@@ -20,15 +20,37 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collection;
+
public class JobVertexTaskManagersHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+ AccessExecutionVertex originalSubtask = ArchivedJobGenerationUtils.getTestSubtask();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/taskmanagers", archive.getPath());
+ compareVertexTaskManagers(originalTask, originalSubtask, archive.getJson());
+ }
+
@Test
public void testGetPaths() {
JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(null, null);
@@ -44,6 +66,10 @@ public class JobVertexTaskManagersHandlerTest {
String json = JobVertexTaskManagersHandler.createVertexDetailsByTaskManagerJson(
originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
+ compareVertexTaskManagers(originalTask, originalSubtask, json);
+ }
+
+ private static void compareVertexTaskManagers(AccessExecutionJobVertex originalTask, AccessExecutionVertex originalSubtask, String json) throws IOException {
JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index 8d24bd0..5993d5c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -20,11 +20,40 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collection;
+
public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+ AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals(
+ "/jobs/" + originalJob.getJobID() +
+ "/vertices/" + originalTask.getJobVertexId() +
+ "/subtasks/" + originalAttempt.getParallelSubtaskIndex() +
+ "/attempts/" + originalAttempt.getAttemptNumber() +
+ "/accumulators",
+ archive.getPath());
+ compareAttemptAccumulators(originalAttempt, archive.getJson());
+ }
+
@Test
public void testGetPaths() {
SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(null);
@@ -38,6 +67,10 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
String json = SubtaskExecutionAttemptAccumulatorsHandler.createAttemptAccumulatorsJson(originalAttempt);
+ compareAttemptAccumulators(originalAttempt, json);
+ }
+
+ private static void compareAttemptAccumulators(AccessExecution originalAttempt, String json) throws IOException {
JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt());
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
index 54f3f9c..f18858e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -22,11 +22,47 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
public class SubtaskExecutionAttemptDetailsHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+ AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(2, archives.size());
+
+ Iterator<ArchivedJson> iterator = archives.iterator();
+ ArchivedJson archive1 = iterator.next();
+ Assert.assertEquals(
+ "/jobs/" + originalJob.getJobID() +
+ "/vertices/" + originalTask.getJobVertexId() +
+ "/subtasks/" + originalAttempt.getParallelSubtaskIndex(),
+ archive1.getPath());
+ compareAttemptDetails(originalAttempt, archive1.getJson());
+
+ ArchivedJson archive2 = iterator.next();
+ Assert.assertEquals(
+ "/jobs/" + originalJob.getJobID() +
+ "/vertices/" + originalTask.getJobVertexId() +
+ "/subtasks/" + originalAttempt.getParallelSubtaskIndex() +
+ "/attempts/" + originalAttempt.getAttemptNumber(),
+ archive2.getPath());
+ compareAttemptDetails(originalAttempt, archive2.getJson());
+ }
+
@Test
public void testGetPaths() {
SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(null, null);
@@ -43,6 +79,10 @@ public class SubtaskExecutionAttemptDetailsHandlerTest {
String json = SubtaskExecutionAttemptDetailsHandler.createAttemptDetailsJson(
originalAttempt, originalJob.getJobID().toString(), originalTask.getJobVertexId().toString(), null);
+ compareAttemptDetails(originalAttempt, json);
+ }
+
+ private static void compareAttemptDetails(AccessExecution originalAttempt, String json) throws IOException {
JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt());
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
index 954ebad..dfbe618 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
@@ -19,13 +19,35 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.util.Collection;
+
public class SubtasksAllAccumulatorsHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() +
+ "/subtasks/accumulators", archive.getPath());
+ compareSubtaskAccumulators(originalTask, archive.getJson());
+ }
+
@Test
public void testGetPaths() {
SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(null);
@@ -38,7 +60,10 @@ public class SubtasksAllAccumulatorsHandlerTest {
public void testJsonGeneration() throws Exception {
AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
String json = SubtasksAllAccumulatorsHandler.createSubtasksAccumulatorsJson(originalTask);
+ compareSubtaskAccumulators(originalTask, json);
+ }
+ private static void compareSubtaskAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException {
JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());