You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/22 23:25:54 UTC

[2/3] flink git commit: [FLINK-5107] Introduced limit for prior execution attempt history

[FLINK-5107] Introduced limit for prior execution attempt history

This closes #2837.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5af7f10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5af7f10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5af7f10

Branch: refs/heads/master
Commit: f5af7f1025aac9cac3f83de5f0e3aece5730ec0f
Parents: ba8ed26
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Nov 18 19:07:56 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 22 23:18:34 2016 +0100

----------------------------------------------------------------------
 ...taskExecutionAttemptAccumulatorsHandler.java |   7 +-
 .../executiongraph/ArchivedExecutionVertex.java |  12 +-
 .../executiongraph/ExecutionJobVertex.java      |  11 +-
 .../runtime/executiongraph/ExecutionVertex.java |  44 +++--
 .../runtime/jobmanager/JobManagerOptions.java   |  38 +++++
 .../flink/runtime/util/EvictingBoundedList.java | 160 ++++++++++++++++++
 .../executiongraph/AllVerticesIteratorTest.java |   9 +-
 .../runtime/util/EvictingBoundedListTest.java   | 164 +++++++++++++++++++
 8 files changed, 422 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 786f5e8..675304f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -39,6 +38,12 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 
 	@Override
 	public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
+
+		// return empty string for pruned (== null) execution attempts
+		if (null == execAttempt) {
+			return "";
+		}
+
 		final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
 		
 		StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index e1fb11a..56fc7a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -19,17 +19,16 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.EvictingBoundedList;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
 
 public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
 
 	private static final long serialVersionUID = -6708241535015028576L;
 	private final int subTaskIndex;
 
-	private final List<ArchivedExecution> priorExecutions;
+	private final EvictingBoundedList<ArchivedExecution> priorExecutions;
 
 	/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
 	private final String taskNameWithSubtask;
@@ -38,9 +37,10 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
 	public ArchivedExecutionVertex(ExecutionVertex vertex) {
 		this.subTaskIndex = vertex.getParallelSubtaskIndex();
-		this.priorExecutions = new ArrayList<>();
-		for (Execution priorExecution : vertex.getPriorExecutions()) {
-			priorExecutions.add(priorExecution.archive());
+		EvictingBoundedList<Execution> copyOfPriorExecutionsList = vertex.getCopyOfPriorExecutionsList();
+		priorExecutions = new EvictingBoundedList<>(copyOfPriorExecutionsList.getSizeLimit());
+		for (Execution priorExecution : copyOfPriorExecutionsList) {
+			priorExecutions.add(priorExecution != null ? priorExecution.archive() : null);
 		}
 		this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
 		this.currentExecution = vertex.getCurrentExecutionAttempt().archive();

http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index a62ed86..c066ca8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -163,9 +165,16 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 					result.getResultType());
 		}
 
+		Configuration jobConfiguration = graph.getJobConfiguration();
+		int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
+				jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
+				JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();
+
 		// create all task vertices
 		for (int i = 0; i < numTaskVertices; i++) {
-			ExecutionVertex vertex = new ExecutionVertex(this, i, this.producedDataSets, timeout, createTimestamp);
+			ExecutionVertex vertex = new ExecutionVertex(
+					this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);
+
 			this.taskVertices[i] = vertex;
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 39c60d2..5cbd1c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -36,11 +36,13 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.EvictingBoundedList;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
@@ -53,7 +55,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
 import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
@@ -79,7 +80,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	private final int subTaskIndex;
 
-	private final List<Execution> priorExecutions;
+	private final EvictingBoundedList<Execution> priorExecutions;
 
 	private final Time timeout;
 
@@ -99,7 +100,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			int subTaskIndex,
 			IntermediateResult[] producedDataSets,
 			Time timeout) {
-		this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis());
+		this(
+				jobVertex,
+				subTaskIndex,
+				producedDataSets,
+				timeout,
+				System.currentTimeMillis(),
+				JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
 	}
 
 	public ExecutionVertex(
@@ -107,7 +114,17 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			int subTaskIndex,
 			IntermediateResult[] producedDataSets,
 			Time timeout,
-			long createTimestamp) {
+			int maxPriorExecutionHistoryLength) {
+		this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis(), maxPriorExecutionHistoryLength);
+	}
+
+	public ExecutionVertex(
+			ExecutionJobVertex jobVertex,
+			int subTaskIndex,
+			IntermediateResult[] producedDataSets,
+			Time timeout,
+			long createTimestamp,
+			int maxPriorExecutionHistoryLength) {
 
 		this.jobVertex = jobVertex;
 		this.subTaskIndex = subTaskIndex;
@@ -125,7 +142,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 		this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
 
-		this.priorExecutions = new CopyOnWriteArrayList<Execution>();
+		this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);
 
 		this.currentExecution = new Execution(
 			getExecutionGraph().getFutureExecutor(),
@@ -235,16 +252,19 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	@Override
 	public Execution getPriorExecutionAttempt(int attemptNumber) {
-		if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
-			return priorExecutions.get(attemptNumber);
-		}
-		else {
-			throw new IllegalArgumentException("attempt does not exist");
+		synchronized (priorExecutions) {
+			if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
+				return priorExecutions.get(attemptNumber);
+			} else {
+				throw new IllegalArgumentException("attempt does not exist");
+			}
 		}
 	}
 
-	List<Execution> getPriorExecutions() {
-		return priorExecutions;
+	EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
+		synchronized (priorExecutions) {
+			return new EvictingBoundedList<>(priorExecutions);
+		}
 	}
 
 	public ExecutionGraph getExecutionGraph() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
new file mode 100644
index 0000000..279a70e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+@PublicEvolving
+public class JobManagerOptions {
+
+	/**
+	 * The maximum number of prior execution attempts kept in history.
+	 */
+	public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE =
+			key("job-manager.max-attempts-history-size").defaultValue(16);
+
+	private JobManagerOptions() {
+		throw new IllegalAccessError();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
new file mode 100644
index 0000000..f4c155a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * This class implements a list (array based) that is physically bounded in maximum size, but can virtually grow beyond
+ * the bounded size. When the list grows beyond the size bound, elements are dropped from the head of the list (FIFO
+ * order). If dropped elements are accessed, a default element is returned instead.
+ * <p>
+ * TODO this class could eventually implement the whole actual List interface.
+ *
+ * @param <T> type of the list elements
+ */
+public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
+
+	private static final long serialVersionUID = -1863961980953613146L;
+
+	private final T defaultElement;
+	private final Object[] elements;
+	private int idx;
+	private int count;
+	private long modCount;
+
+	public EvictingBoundedList(int sizeLimit) {
+		this(sizeLimit, null);
+	}
+
+	public EvictingBoundedList(EvictingBoundedList<T> other) {
+		Preconditions.checkNotNull(other);
+		this.defaultElement = other.defaultElement;
+		this.elements = other.elements.clone();
+		this.idx = other.idx;
+		this.count = other.count;
+		this.modCount = 0L;
+	}
+
+	public EvictingBoundedList(int sizeLimit, T defaultElement) {
+		this.elements = new Object[sizeLimit];
+		this.defaultElement = defaultElement;
+		this.idx = 0;
+		this.count = 0;
+		this.modCount = 0L;
+	}
+
+	public int size() {
+		return count;
+	}
+
+	public boolean isEmpty() {
+		return 0 == count;
+	}
+
+	public boolean add(T t) {
+		elements[idx] = t;
+		idx = (idx + 1) % elements.length;
+		++count;
+		++modCount;
+		return true;
+	}
+
+	public void clear() {
+		if (!isEmpty()) {
+			for (int i = 0; i < elements.length; ++i) {
+				elements[i] = null;
+			}
+			count = 0;
+			idx = 0;
+			++modCount;
+		}
+	}
+
+	public T get(int index) {
+		Preconditions.checkArgument(index >= 0 && index < count);
+		return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length);
+	}
+
+	public int getSizeLimit() {
+		return elements.length;
+	}
+
+	public T set(int index, T element) {
+		Preconditions.checkArgument(index >= 0 && index < count);
+		++modCount;
+		if (isDroppedIndex(index)) {
+			return getDefaultElement();
+		} else {
+			int idx = index % elements.length;
+			T old = accessInternal(idx);
+			elements[idx] = element;
+			return old;
+		}
+	}
+
+	public T getDefaultElement() {
+		return defaultElement;
+	}
+
+	private boolean isDroppedIndex(int idx) {
+		return idx < count - elements.length;
+	}
+
+	@SuppressWarnings("unchecked")
+	private T accessInternal(int arrayIndex) {
+		return (T) elements[arrayIndex];
+	}
+
+	@Override
+	public Iterator<T> iterator() {
+		return new Iterator<T>() {
+
+			int pos = 0;
+			final long oldModCount = modCount;
+
+			@Override
+			public boolean hasNext() {
+				return pos < count;
+			}
+
+			@Override
+			public T next() {
+				if (oldModCount != modCount) {
+					throw new ConcurrentModificationException();
+				}
+				if (pos < count) {
+					return get(pos++);
+				} else {
+					throw new NoSuchElementException("Iterator exhausted.");
+				}
+			}
+
+			@Override
+			public void remove() {
+				throw new UnsupportedOperationException("Read-only iterator");
+			}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index 0223a2e..4ecac9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import java.util.Arrays;
-
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -28,6 +27,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.Arrays;
+
 public class AllVerticesIteratorTest {
 
 	@Test
@@ -50,8 +51,10 @@ public class AllVerticesIteratorTest {
 			v4.setParallelism(2);
 			
 			ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
+			Configuration jobConf = new Configuration();
 			Mockito.when(eg.getFutureExecutor()).thenReturn(TestingUtils.directExecutionContext());
-					
+			Mockito.when(eg.getJobConfiguration()).thenReturn(jobConf);
+
 			ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1,
 					AkkaUtils.getDefaultTimeout());
 			ExecutionJobVertex ejv2 = new ExecutionJobVertex(eg, v2, 1,

http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
new file mode 100644
index 0000000..e0a1c70
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class EvictingBoundedListTest {
+
+	@Test
+	public void testAddGet() {
+		int insertSize = 17;
+		int boundSize = 5;
+		Integer defaultElement = 4711;
+
+		EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+		assertTrue(list.isEmpty());
+
+		for (int i = 0; i < insertSize; ++i) {
+			list.add(i);
+		}
+
+		assertEquals(17, list.size());
+
+		for (int i = 0; i < insertSize; ++i) {
+			int exp = i < (insertSize - boundSize) ? defaultElement : i;
+			int act = list.get(i);
+			assertEquals(exp, act);
+		}
+	}
+
+	@Test
+	public void testSet() {
+		int insertSize = 17;
+		int boundSize = 5;
+		Integer defaultElement = 4711;
+		List<Integer> reference = new ArrayList<>(insertSize);
+		EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+		for (int i = 0; i < insertSize; ++i) {
+			reference.add(i);
+			list.add(i);
+		}
+
+		assertEquals(reference.size(), list.size());
+
+		list.set(0, 123);
+		list.set(insertSize - boundSize - 1, 123);
+
+		list.set(insertSize - boundSize, 42);
+		reference.set(insertSize - boundSize, 42);
+		list.set(13, 43);
+		reference.set(13, 43);
+		list.set(16, 44);
+		reference.set(16, 44);
+
+		try {
+			list.set(insertSize, 23);
+			fail("Illegal index in set not detected.");
+		} catch (IllegalArgumentException ignored) {
+
+		}
+
+		for (int i = 0; i < insertSize; ++i) {
+			int exp = i < (insertSize - boundSize) ? defaultElement : reference.get(i);
+			int act = list.get(i);
+			assertEquals(exp, act);
+		}
+
+		assertEquals(reference.size(), list.size());
+	}
+
+	@Test
+	public void testClear() {
+		int insertSize = 17;
+		int boundSize = 5;
+		Integer defaultElement = 4711;
+
+		EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+		for (int i = 0; i < insertSize; ++i) {
+			list.add(i);
+		}
+
+		list.clear();
+
+		assertEquals(0, list.size());
+		assertTrue(list.isEmpty());
+
+		try {
+			list.get(0);
+			fail();
+		} catch (IllegalArgumentException ignore) {
+		}
+	}
+
+	@Test
+	public void testIterator() {
+		int insertSize = 17;
+		int boundSize = 5;
+		Integer defaultElement = 4711;
+
+		EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+		assertTrue(list.isEmpty());
+
+		for (int i = 0; i < insertSize; ++i) {
+			list.add(i);
+		}
+
+		Iterator<Integer> iterator = list.iterator();
+
+		for (int i = 0; i < insertSize; ++i) {
+			assertTrue(iterator.hasNext());
+			int exp = i < (insertSize - boundSize) ? defaultElement : i;
+			int act = iterator.next();
+			assertEquals(exp, act);
+		}
+
+		assertFalse(iterator.hasNext());
+
+		try {
+			iterator.next();
+			fail("Next on exhausted iterator did not trigger exception.");
+		} catch (NoSuchElementException ignored) {
+
+		}
+
+		iterator = list.iterator();
+		assertTrue(iterator.hasNext());
+		iterator.next();
+		list.add(123);
+		assertTrue(iterator.hasNext());
+		try {
+			iterator.next();
+			fail("Concurrent modification not detected.");
+		} catch (ConcurrentModificationException ignored) {
+
+		}
+	}
+}