You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/03 12:37:29 UTC
[3/6] flink git commit: [hotfix] [jobmanager] Reduce complexits when
archiving ExecutionVertex
[hotfix] [jobmanager] Reduce complexits when archiving ExecutionVertex
This fixes the inefficiency where the archiving operation iterated over the entire
evicted history of prior execution attempts when converting them to
archived executions.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4820b413
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4820b413
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4820b413
Branch: refs/heads/master
Commit: 4820b413a55a3bbb1853251ecdb94b4c70dc5e2b
Parents: 10e4e32
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 19:52:57 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 10:28:23 2017 +0100
----------------------------------------------------------------------
.../executiongraph/ArchivedExecutionVertex.java | 22 ++++-
.../flink/runtime/util/EvictingBoundedList.java | 73 ++++++++++++++-
.../runtime/util/EvictingBoundedListTest.java | 97 +++++++++++++++++++-
3 files changed, 182 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index 56fc7a6..5053cae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
private static final long serialVersionUID = -6708241535015028576L;
+
private final int subTaskIndex;
private final EvictingBoundedList<ArchivedExecution> priorExecutions;
@@ -35,13 +36,11 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
private final ArchivedExecution currentExecution; // this field must never be null
+ // ------------------------------------------------------------------------
+
public ArchivedExecutionVertex(ExecutionVertex vertex) {
this.subTaskIndex = vertex.getParallelSubtaskIndex();
- EvictingBoundedList<Execution> copyOfPriorExecutionsList = vertex.getCopyOfPriorExecutionsList();
- priorExecutions = new EvictingBoundedList<>(copyOfPriorExecutionsList.getSizeLimit());
- for (Execution priorExecution : copyOfPriorExecutionsList) {
- priorExecutions.add(priorExecution != null ? priorExecution.archive() : null);
- }
+ this.priorExecutions = vertex.getCopyOfPriorExecutionsList().map(ARCHIVER);
this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
}
@@ -93,4 +92,17 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
throw new IllegalArgumentException("attempt does not exist");
}
}
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ private static final EvictingBoundedList.Function<Execution, ArchivedExecution> ARCHIVER =
+ new EvictingBoundedList.Function<Execution, ArchivedExecution>() {
+
+ @Override
+ public ArchivedExecution apply(Execution value) {
+ return value.archive();
+ }
+ };
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
index f4c155a..2c5b6a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
@@ -29,8 +29,9 @@ import java.util.NoSuchElementException;
* This class implements a list (array based) that is physically bounded in maximum size, but can virtually grow beyond
* the bounded size. When the list grows beyond the size bound, elements are dropped from the head of the list (FIFO
* order). If dropped elements are accessed, a default element is returned instead.
- * <p>
- * TODO this class could eventually implement the whole actual List interface.
+ *
+ * <p>The list by itself is serializable, but a full list can only be serialized if the values
+ * are also serializable.
*
* @param <T> type of the list elements
*/
@@ -38,12 +39,25 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
private static final long serialVersionUID = -1863961980953613146L;
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ /** the default element returned for positions that were evicted */
private final T defaultElement;
+
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ /** the array (viewed as a circular buffer) that holds the latest (= non-evicted) elements */
private final Object[] elements;
+
+ /** The next index to put an element in the array */
private int idx;
+
+ /** The current number of (virtual) elements in the list */
private int count;
+
+ /** Modification count for fail-fast iterators */
private long modCount;
+ // ------------------------------------------------------------------------
+
public EvictingBoundedList(int sizeLimit) {
this(sizeLimit, null);
}
@@ -65,6 +79,8 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
this.modCount = 0L;
}
+ // ------------------------------------------------------------------------
+
public int size() {
return count;
}
@@ -93,8 +109,11 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
}
public T get(int index) {
- Preconditions.checkArgument(index >= 0 && index < count);
- return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length);
+ if (index >= 0 && index < count) {
+ return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length);
+ } else {
+ throw new IndexOutOfBoundsException(String.valueOf(index));
+ }
}
public int getSizeLimit() {
@@ -157,4 +176,50 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
}
};
}
+
+ /**
+ * Creates a new list that replaces its elements with transformed elements.
+ * The list retains the same size and position-to-element mapping.
+ *
+ * <p>Note that null values are automatically mapped to null values.
+ *
+ * @param transform The function used to transform each element
+ * @param <R> The type of the elements in the result list.
+ *
+ * @return The list with the mapped elements
+ */
+ public <R> EvictingBoundedList<R> map(Function<T, R> transform) {
+ // map the default element
+ final R newDefault = defaultElement == null ? null : transform.apply(defaultElement);
+
+ // copy the list with the new default
+ final EvictingBoundedList<R> result = new EvictingBoundedList<>(elements.length, newDefault);
+ result.count = count;
+ result.idx = idx;
+
+ // map all the entries in the list
+ final int numElements = Math.min(elements.length, count);
+ for (int i = 0; i < numElements; i++) {
+ result.elements[i] = transform.apply(accessInternal(i));
+ }
+
+ return result;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A simple unary function that can be used to transform elements via the
+ * {@link EvictingBoundedList#map(Function)} method.
+ */
+ public interface Function<I, O> {
+
+ /**
+ * Transforms the value.
+ *
+ * @param value The value to transform
+ * @return The transformed value
+ */
+ O apply(I value);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
index e0a1c70..7109dac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
@@ -28,9 +28,12 @@ import java.util.NoSuchElementException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
public class EvictingBoundedListTest {
@Test
@@ -114,7 +117,7 @@ public class EvictingBoundedListTest {
try {
list.get(0);
fail();
- } catch (IllegalArgumentException ignore) {
+ } catch (IndexOutOfBoundsException ignore) {
}
}
@@ -161,4 +164,96 @@ public class EvictingBoundedListTest {
}
}
+
+ @Test
+ public void testMapWithHalfFullList() {
+ final Object[] originals = { new Object(), new Object(), new Object() };
+ final Object defaultValue = new Object();
+
+ final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, defaultValue);
+ for (Object o : originals) {
+ original.add(o);
+ }
+
+ final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
+
+ assertEquals(original.size(), transformed.size());
+ assertEquals(original.getSizeLimit(), transformed.getSizeLimit());
+ assertEquals(defaultValue, transformed.getDefaultElement().original);
+
+ int i = 0;
+ for (TransformedObject to : transformed) {
+ assertEquals(originals[i++], to.original);
+ }
+
+ try {
+ transformed.get(originals.length);
+ fail("should have failed with an exception");
+ } catch (IndexOutOfBoundsException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testMapWithEvictedElements() {
+ final Object[] originals = { new Object(), new Object(), new Object(), new Object(), new Object() };
+ final Object defaultValue = new Object();
+
+ final EvictingBoundedList<Object> original = new EvictingBoundedList<>(2, defaultValue);
+ for (Object o : originals) {
+ original.add(o);
+ }
+
+ final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
+
+ assertEquals(originals.length, transformed.size());
+ assertEquals(original.size(), transformed.size());
+ assertEquals(original.getSizeLimit(), transformed.getSizeLimit());
+ assertEquals(defaultValue, transformed.getDefaultElement().original);
+
+ for (int i = 0; i < originals.length; i++) {
+ if (i < originals.length - transformed.getSizeLimit()) {
+ assertEquals(transformed.getDefaultElement(), transformed.get(i));
+ } else {
+ assertEquals(originals[i], transformed.get(i).original);
+ }
+ }
+
+ try {
+ transformed.get(originals.length);
+ fail("should have failed with an exception");
+ } catch (IndexOutOfBoundsException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testMapWithNullDefault() {
+ final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, null);
+ final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
+
+ assertEquals(original.size(), transformed.size());
+ assertNull(transformed.getDefaultElement());
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final class TransformedObject {
+
+ final Object original;
+
+ TransformedObject(Object original) {
+ this.original = checkNotNull(original);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final class Mapper implements EvictingBoundedList.Function<Object, TransformedObject> {
+
+ @Override
+ public TransformedObject apply(Object value) {
+ return new TransformedObject(value);
+ }
+ }
}