You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/08/13 18:14:56 UTC
[flink] branch release-1.5 updated: [FLINK-10066] Keep only archived version of previous executions
This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.5 by this push:
new 2217c09 [FLINK-10066] Keep only archived version of previous executions
2217c09 is described below
commit 2217c09c887c9c1f3d42eca77f7c6af90a4a438a
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Mon Aug 6 10:12:51 2018 +0200
[FLINK-10066] Keep only archived version of previous executions
This closes #6500.
---
.../runtime/executiongraph/ArchivedExecution.java | 31 ++++---
.../executiongraph/ArchivedExecutionVertex.java | 15 +---
.../runtime/executiongraph/ExecutionVertex.java | 14 ++--
.../flink/runtime/util/EvictingBoundedList.java | 49 +----------
.../SubtaskCurrentAttemptDetailsHandlerTest.java | 3 +
...askExecutionAttemptAccumulatorsHandlerTest.java | 1 +
.../SubtaskExecutionAttemptDetailsHandlerTest.java | 1 +
.../legacy/utils/ArchivedExecutionBuilder.java | 8 ++
.../legacy/utils/ArchivedJobGenerationUtils.java | 3 +
.../runtime/util/EvictingBoundedListTest.java | 98 +---------------------
10 files changed, 51 insertions(+), 172 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
index 4b1c62f..ab8c94c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
@@ -40,6 +41,8 @@ public class ArchivedExecution implements AccessExecution, Serializable {
private final TaskManagerLocation assignedResourceLocation; // for the archived execution
+ private final AllocationID assignedAllocationID;
+
/* Continuously updated map of user-defined accumulators */
private final StringifiedAccumulatorResult[] userAccumulators;
@@ -48,21 +51,24 @@ public class ArchivedExecution implements AccessExecution, Serializable {
private final IOMetrics ioMetrics;
public ArchivedExecution(Execution execution) {
- this.userAccumulators = execution.getUserAccumulatorsStringified();
- this.attemptId = execution.getAttemptId();
- this.attemptNumber = execution.getAttemptNumber();
- this.stateTimestamps = execution.getStateTimestamps();
- this.parallelSubtaskIndex = execution.getVertex().getParallelSubtaskIndex();
- this.state = execution.getState();
- this.failureCause = ExceptionUtils.stringifyException(execution.getFailureCause());
- this.assignedResourceLocation = execution.getAssignedResourceLocation();
- this.ioMetrics = execution.getIOMetrics();
+ this(
+ execution.getUserAccumulatorsStringified(),
+ execution.getIOMetrics(),
+ execution.getAttemptId(),
+ execution.getAttemptNumber(),
+ execution.getState(),
+ ExceptionUtils.stringifyException(execution.getFailureCause()),
+ execution.getAssignedResourceLocation(),
+ execution.getAssignedAllocationID(),
+ execution.getVertex().getParallelSubtaskIndex(),
+ execution.getStateTimestamps());
}
public ArchivedExecution(
StringifiedAccumulatorResult[] userAccumulators, IOMetrics ioMetrics,
ExecutionAttemptID attemptId, int attemptNumber, ExecutionState state, String failureCause,
- TaskManagerLocation assignedResourceLocation, int parallelSubtaskIndex, long[] stateTimestamps) {
+ TaskManagerLocation assignedResourceLocation, AllocationID assignedAllocationID, int parallelSubtaskIndex,
+ long[] stateTimestamps) {
this.userAccumulators = userAccumulators;
this.ioMetrics = ioMetrics;
this.failureCause = failureCause;
@@ -72,6 +78,7 @@ public class ArchivedExecution implements AccessExecution, Serializable {
this.state = state;
this.stateTimestamps = stateTimestamps;
this.parallelSubtaskIndex = parallelSubtaskIndex;
+ this.assignedAllocationID = assignedAllocationID;
}
// --------------------------------------------------------------------------------------------
@@ -103,6 +110,10 @@ public class ArchivedExecution implements AccessExecution, Serializable {
return assignedResourceLocation;
}
+ public AllocationID getAssignedAllocationID() {
+ return assignedAllocationID;
+ }
+
@Override
public String getFailureCauseAsString() {
return failureCause;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index 36669d3..04efa04 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -40,7 +40,7 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
public ArchivedExecutionVertex(ExecutionVertex vertex) {
this.subTaskIndex = vertex.getParallelSubtaskIndex();
- this.priorExecutions = vertex.getCopyOfPriorExecutionsList().map(ARCHIVER);
+ this.priorExecutions = vertex.getCopyOfPriorExecutionsList();
this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
}
@@ -101,17 +101,4 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
throw new IllegalArgumentException("attempt does not exist");
}
}
-
- // ------------------------------------------------------------------------
- // utilities
- // ------------------------------------------------------------------------
-
- private static final EvictingBoundedList.Function<Execution, ArchivedExecution> ARCHIVER =
- new EvictingBoundedList.Function<Execution, ArchivedExecution>() {
-
- @Override
- public ArchivedExecution apply(Execution value) {
- return value.archive();
- }
- };
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e385318..e422801 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -92,7 +92,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
private final int subTaskIndex;
- private final EvictingBoundedList<Execution> priorExecutions;
+ private final EvictingBoundedList<ArchivedExecution> priorExecutions;
private final Time timeout;
@@ -287,7 +287,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
@Override
- public Execution getPriorExecutionAttempt(int attemptNumber) {
+ public ArchivedExecution getPriorExecutionAttempt(int attemptNumber) {
synchronized (priorExecutions) {
if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
return priorExecutions.get(attemptNumber);
@@ -297,7 +297,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
}
- public Execution getLatestPriorExecution() {
+ public ArchivedExecution getLatestPriorExecution() {
synchronized (priorExecutions) {
final int size = priorExecutions.size();
if (size > 0) {
@@ -316,16 +316,16 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
* @return The latest prior execution location, or null, if there is none, yet.
*/
public TaskManagerLocation getLatestPriorLocation() {
- Execution latestPriorExecution = getLatestPriorExecution();
+ ArchivedExecution latestPriorExecution = getLatestPriorExecution();
return latestPriorExecution != null ? latestPriorExecution.getAssignedResourceLocation() : null;
}
public AllocationID getLatestPriorAllocation() {
- Execution latestPriorExecution = getLatestPriorExecution();
+ ArchivedExecution latestPriorExecution = getLatestPriorExecution();
return latestPriorExecution != null ? latestPriorExecution.getAssignedAllocationID() : null;
}
- EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
+ EvictingBoundedList<ArchivedExecution> getCopyOfPriorExecutionsList() {
synchronized (priorExecutions) {
return new EvictingBoundedList<>(priorExecutions);
}
@@ -577,7 +577,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
final ExecutionState oldState = oldExecution.getState();
if (oldState.isTerminal()) {
- priorExecutions.add(oldExecution);
+ priorExecutions.add(oldExecution.archive());
final Execution newExecution = new Execution(
getExecutionGraph().getFutureExecutor(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
index 2c5b6a9..9a529f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.util;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nonnull;
+
import java.io.Serializable;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
@@ -146,6 +148,7 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
return (T) elements[arrayIndex];
}
+ @Nonnull
@Override
public Iterator<T> iterator() {
return new Iterator<T>() {
@@ -176,50 +179,4 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
}
};
}
-
- /**
- * Creates a new list that replaces its elements with transformed elements.
- * The list retains the same size and position-to-element mapping.
- *
- * <p>Note that null values are automatically mapped to null values.
- *
- * @param transform The function used to transform each element
- * @param <R> The type of the elements in the result list.
- *
- * @return The list with the mapped elements
- */
- public <R> EvictingBoundedList<R> map(Function<T, R> transform) {
- // map the default element
- final R newDefault = defaultElement == null ? null : transform.apply(defaultElement);
-
- // copy the list with the new default
- final EvictingBoundedList<R> result = new EvictingBoundedList<>(elements.length, newDefault);
- result.count = count;
- result.idx = idx;
-
- // map all the entries in the list
- final int numElements = Math.min(elements.length, count);
- for (int i = 0; i < numElements; i++) {
- result.elements[i] = transform.apply(accessInternal(i));
- }
-
- return result;
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * A simple unary function that can be used to transform elements via the
- * {@link EvictingBoundedList#map(Function)} method.
- */
- public interface Function<I, O> {
-
- /**
- * Transforms the value.
- *
- * @param value The value to transform
- * @return The transformed value
- */
- O apply(I value);
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index af8b995..99dcbf7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
@@ -93,6 +94,7 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
timestamps[expectedState.ordinal()] = finishedTs;
final LocalTaskManagerLocation assignedResourceLocation = new LocalTaskManagerLocation();
+ final AllocationID allocationID = new AllocationID();
final int subtaskIndex = 1;
final int attempt = 2;
@@ -104,6 +106,7 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
expectedState,
null,
assignedResourceLocation,
+ allocationID,
subtaskIndex,
timestamps);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index df4ff04..1099e4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -99,6 +99,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest extends TestLogger {
ExecutionState.FINISHED,
null,
null,
+ null,
subtaskIndex,
new long[ExecutionState.values().length]);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 8e44c0e..22a7d77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -104,6 +104,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
expectedState,
null,
null,
+ null,
subtaskIndex,
new long[ExecutionState.values().length]),
new EvictingBoundedList<>(0)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java
index ad5cd6b..38c1d7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.legacy.utils;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
@@ -42,6 +43,7 @@ public class ArchivedExecutionBuilder {
private ExecutionState state;
private String failureCause;
private TaskManagerLocation assignedResourceLocation;
+ private AllocationID assignedAllocationID;
private StringifiedAccumulatorResult[] userAccumulators;
private IOMetrics ioMetrics;
private int parallelSubtaskIndex;
@@ -77,6 +79,11 @@ public class ArchivedExecutionBuilder {
return this;
}
+ public ArchivedExecutionBuilder setAssignedAllocationID(AllocationID assignedAllocationID) {
+ this.assignedAllocationID = assignedAllocationID;
+ return this;
+ }
+
public ArchivedExecutionBuilder setUserAccumulators(StringifiedAccumulatorResult[] userAccumulators) {
this.userAccumulators = userAccumulators;
return this;
@@ -101,6 +108,7 @@ public class ArchivedExecutionBuilder {
state != null ? state : ExecutionState.FINISHED,
failureCause != null ? failureCause : "(null)",
assignedResourceLocation != null ? assignedResourceLocation : new TaskManagerLocation(new ResourceID("tm"), InetAddress.getLocalHost(), 1234),
+ assignedAllocationID != null ? assignedAllocationID : new AllocationID(0L, 0L),
parallelSubtaskIndex,
stateTimestamps != null ? stateTimestamps : new long[]{1, 2, 3, 4, 5, 5, 5, 5}
);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
index 92b0d8a..caf0fed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
@@ -108,11 +109,13 @@ public class ArchivedJobGenerationUtils {
StringifiedAccumulatorResult acc1 = new StringifiedAccumulatorResult("name1", "type1", "value1");
StringifiedAccumulatorResult acc2 = new StringifiedAccumulatorResult("name2", "type2", "value2");
TaskManagerLocation location = new TaskManagerLocation(new ResourceID("hello"), InetAddress.getLocalHost(), 1234);
+ AllocationID allocationID = new AllocationID(42L, 43L);
originalAttempt = new ArchivedExecutionBuilder()
.setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
.setParallelSubtaskIndex(1)
.setAttemptNumber(0)
.setAssignedResourceLocation(location)
+ .setAssignedAllocationID(allocationID)
.setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
.setState(ExecutionState.FINISHED)
.setFailureCause("attemptException")
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
index 7109dac..40bd021 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
@@ -28,12 +28,12 @@ import java.util.NoSuchElementException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
+/**
+ * Tests for {@link EvictingBoundedList}.
+ */
public class EvictingBoundedListTest {
@Test
@@ -164,96 +164,4 @@ public class EvictingBoundedListTest {
}
}
-
- @Test
- public void testMapWithHalfFullList() {
- final Object[] originals = { new Object(), new Object(), new Object() };
- final Object defaultValue = new Object();
-
- final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, defaultValue);
- for (Object o : originals) {
- original.add(o);
- }
-
- final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
-
- assertEquals(original.size(), transformed.size());
- assertEquals(original.getSizeLimit(), transformed.getSizeLimit());
- assertEquals(defaultValue, transformed.getDefaultElement().original);
-
- int i = 0;
- for (TransformedObject to : transformed) {
- assertEquals(originals[i++], to.original);
- }
-
- try {
- transformed.get(originals.length);
- fail("should have failed with an exception");
- } catch (IndexOutOfBoundsException e) {
- // expected
- }
- }
-
- @Test
- public void testMapWithEvictedElements() {
- final Object[] originals = { new Object(), new Object(), new Object(), new Object(), new Object() };
- final Object defaultValue = new Object();
-
- final EvictingBoundedList<Object> original = new EvictingBoundedList<>(2, defaultValue);
- for (Object o : originals) {
- original.add(o);
- }
-
- final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
-
- assertEquals(originals.length, transformed.size());
- assertEquals(original.size(), transformed.size());
- assertEquals(original.getSizeLimit(), transformed.getSizeLimit());
- assertEquals(defaultValue, transformed.getDefaultElement().original);
-
- for (int i = 0; i < originals.length; i++) {
- if (i < originals.length - transformed.getSizeLimit()) {
- assertEquals(transformed.getDefaultElement(), transformed.get(i));
- } else {
- assertEquals(originals[i], transformed.get(i).original);
- }
- }
-
- try {
- transformed.get(originals.length);
- fail("should have failed with an exception");
- } catch (IndexOutOfBoundsException e) {
- // expected
- }
- }
-
- @Test
- public void testMapWithNullDefault() {
- final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, null);
- final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
-
- assertEquals(original.size(), transformed.size());
- assertNull(transformed.getDefaultElement());
- }
-
- // ------------------------------------------------------------------------
-
- private static final class TransformedObject {
-
- final Object original;
-
- TransformedObject(Object original) {
- this.original = checkNotNull(original);
- }
- }
-
- // ------------------------------------------------------------------------
-
- private static final class Mapper implements EvictingBoundedList.Function<Object, TransformedObject> {
-
- @Override
- public TransformedObject apply(Object value) {
- return new TransformedObject(value);
- }
- }
}