You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/03 12:37:29 UTC

[3/6] flink git commit: [hotfix] [jobmanager] Reduce complexity when archiving ExecutionVertex

[hotfix] [jobmanager] Reduce complexity when archiving ExecutionVertex

This fixes the inefficiency where the archiving operation iterated over the entire
evicted history of prior execution attempts when converting them to
archived executions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4820b413
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4820b413
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4820b413

Branch: refs/heads/master
Commit: 4820b413a55a3bbb1853251ecdb94b4c70dc5e2b
Parents: 10e4e32
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 19:52:57 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 10:28:23 2017 +0100

----------------------------------------------------------------------
 .../executiongraph/ArchivedExecutionVertex.java | 22 ++++-
 .../flink/runtime/util/EvictingBoundedList.java | 73 ++++++++++++++-
 .../runtime/util/EvictingBoundedListTest.java   | 97 +++++++++++++++++++-
 3 files changed, 182 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index 56fc7a6..5053cae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
 public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
 
 	private static final long serialVersionUID = -6708241535015028576L;
+
 	private final int subTaskIndex;
 
 	private final EvictingBoundedList<ArchivedExecution> priorExecutions;
@@ -35,13 +36,11 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
 	private final ArchivedExecution currentExecution;    // this field must never be null
 
+	// ------------------------------------------------------------------------
+
 	public ArchivedExecutionVertex(ExecutionVertex vertex) {
 		this.subTaskIndex = vertex.getParallelSubtaskIndex();
-		EvictingBoundedList<Execution> copyOfPriorExecutionsList = vertex.getCopyOfPriorExecutionsList();
-		priorExecutions = new EvictingBoundedList<>(copyOfPriorExecutionsList.getSizeLimit());
-		for (Execution priorExecution : copyOfPriorExecutionsList) {
-			priorExecutions.add(priorExecution != null ? priorExecution.archive() : null);
-		}
+		this.priorExecutions = vertex.getCopyOfPriorExecutionsList().map(ARCHIVER);
 		this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
 		this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
 	}
@@ -93,4 +92,17 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 			throw new IllegalArgumentException("attempt does not exist");
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static final EvictingBoundedList.Function<Execution, ArchivedExecution> ARCHIVER =
+			new EvictingBoundedList.Function<Execution, ArchivedExecution>() {
+
+		@Override
+		public ArchivedExecution apply(Execution value) {
+			return value.archive();
+		}
+	};
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
index f4c155a..2c5b6a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
@@ -29,8 +29,9 @@ import java.util.NoSuchElementException;
  * This class implements a list (array based) that is physically bounded in maximum size, but can virtually grow beyond
  * the bounded size. When the list grows beyond the size bound, elements are dropped from the head of the list (FIFO
  * order). If dropped elements are accessed, a default element is returned instead.
- * <p>
- * TODO this class could eventually implement the whole actual List interface.
+ * 
+ * <p>The list by itself is serializable, but a full list can only be serialized if the values
+ * are also serializable.
  *
  * @param <T> type of the list elements
  */
@@ -38,12 +39,25 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
 
 	private static final long serialVersionUID = -1863961980953613146L;
 
+	@SuppressWarnings("NonSerializableFieldInSerializableClass")
+	/** the default element returned for positions that were evicted */
 	private final T defaultElement;
+
+	@SuppressWarnings("NonSerializableFieldInSerializableClass")
+	/** the array (viewed as a circular buffer) that holds the latest (= non-evicted) elements */
 	private final Object[] elements;
+
+	/** The next index to put an element in the array */
 	private int idx;
+
+	/** The current number of (virtual) elements in the list */
 	private int count;
+
+	/** Modification count for fail-fast iterators */
 	private long modCount;
 
+	// ------------------------------------------------------------------------
+
 	public EvictingBoundedList(int sizeLimit) {
 		this(sizeLimit, null);
 	}
@@ -65,6 +79,8 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
 		this.modCount = 0L;
 	}
 
+	// ------------------------------------------------------------------------
+
 	public int size() {
 		return count;
 	}
@@ -93,8 +109,11 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
 	}
 
 	public T get(int index) {
-		Preconditions.checkArgument(index >= 0 && index < count);
-		return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length);
+		if (index >= 0 && index < count) {
+			return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length);
+		} else {
+			throw new IndexOutOfBoundsException(String.valueOf(index));
+		}
 	}
 
 	public int getSizeLimit() {
@@ -157,4 +176,50 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
 			}
 		};
 	}
+
+	/**
+	 * Creates a new list that replaces its elements with transformed elements.
+	 * The list retains the same size and position-to-element mapping.
+	 *
+	 * <p>Null values (a null default element or null entries) are never passed to
+	 * the transform function; they are mapped directly to null.
+	 *
+	 * @param transform The function used to transform each non-null element
+	 * @param <R> The type of the elements in the result list.
+	 * @return The list with the mapped elements
+	 */
+	public <R> EvictingBoundedList<R> map(Function<T, R> transform) {
+		// map the default element (a null default stays null)
+		final R newDefault = defaultElement == null ? null : transform.apply(defaultElement);
+
+		// copy the list with the new default, keeping the virtual size and write position
+		final EvictingBoundedList<R> result = new EvictingBoundedList<>(elements.length, newDefault);
+		result.count = count;
+		result.idx = idx;
+
+		// map only the physically retained entries; null entries stay null, as documented
+		final int numElements = Math.min(elements.length, count);
+		for (int i = 0; i < numElements; i++) {
+			result.elements[i] = elements[i] == null ? null : transform.apply(accessInternal(i));
+		}
+
+		return result;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A simple unary function that can be used to transform elements via the
+	 * {@link EvictingBoundedList#map(Function)} method.
+	 */
+	public interface Function<I, O> {
+
+		/**
+		 * Transforms the value.
+		 * 
+		 * @param value The value to transform
+		 * @return The transformed value
+		 */
+		O apply(I value);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
index e0a1c70..7109dac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
@@ -28,9 +28,12 @@ import java.util.NoSuchElementException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 public class EvictingBoundedListTest {
 
 	@Test
@@ -114,7 +117,7 @@ public class EvictingBoundedListTest {
 		try {
 			list.get(0);
 			fail();
-		} catch (IllegalArgumentException ignore) {
+		} catch (IndexOutOfBoundsException ignore) {
 		}
 	}
 
@@ -161,4 +164,96 @@ public class EvictingBoundedListTest {
 
 		}
 	}
+
+	@Test
+	public void testMapWithHalfFullList() {
+		final Object[] originals = { new Object(), new Object(), new Object() };
+		final Object defaultValue = new Object();
+
+		final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, defaultValue);
+		for (Object o : originals) {
+			original.add(o);
+		}
+
+		final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
+
+		assertEquals(original.size(), transformed.size());
+		assertEquals(original.getSizeLimit(), transformed.getSizeLimit());
+		assertEquals(defaultValue, transformed.getDefaultElement().original);
+
+		int i = 0;
+		for (TransformedObject to : transformed) {
+			assertEquals(originals[i++], to.original); 
+		}
+
+		try {
+			transformed.get(originals.length);
+			fail("should have failed with an exception");
+		} catch (IndexOutOfBoundsException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testMapWithEvictedElements() {
+		final Object[] originals = { new Object(), new Object(), new Object(), new Object(), new Object() };
+		final Object defaultValue = new Object();
+
+		final EvictingBoundedList<Object> original = new EvictingBoundedList<>(2, defaultValue);
+		for (Object o : originals) {
+			original.add(o);
+		}
+
+		final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
+
+		assertEquals(originals.length, transformed.size());
+		assertEquals(original.size(), transformed.size());
+		assertEquals(original.getSizeLimit(), transformed.getSizeLimit());
+		assertEquals(defaultValue, transformed.getDefaultElement().original);
+
+		for (int i = 0; i < originals.length; i++) {
+			if (i < originals.length - transformed.getSizeLimit()) {
+				assertEquals(transformed.getDefaultElement(), transformed.get(i));
+			} else {
+				assertEquals(originals[i], transformed.get(i).original);
+			}
+		}
+
+		try {
+			transformed.get(originals.length);
+			fail("should have failed with an exception");
+		} catch (IndexOutOfBoundsException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testMapWithNullDefault() {
+		final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, null);
+		final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
+
+		assertEquals(original.size(), transformed.size());
+		assertNull(transformed.getDefaultElement());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class TransformedObject {
+
+		final Object original;
+
+		TransformedObject(Object original) {
+			this.original = checkNotNull(original);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class Mapper implements EvictingBoundedList.Function<Object, TransformedObject> {
+
+		@Override
+		public TransformedObject apply(Object value) {
+			return new TransformedObject(value);
+		}
+	}
 }