You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/22 23:25:54 UTC
[2/3] flink git commit: [FLINK-5107] Introduced limit for prior execution attempt history
[FLINK-5107] Introduced limit for prior execution attempt history
This closes #2837.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5af7f10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5af7f10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5af7f10
Branch: refs/heads/master
Commit: f5af7f1025aac9cac3f83de5f0e3aece5730ec0f
Parents: ba8ed26
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Nov 18 19:07:56 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 22 23:18:34 2016 +0100
----------------------------------------------------------------------
...taskExecutionAttemptAccumulatorsHandler.java | 7 +-
.../executiongraph/ArchivedExecutionVertex.java | 12 +-
.../executiongraph/ExecutionJobVertex.java | 11 +-
.../runtime/executiongraph/ExecutionVertex.java | 44 +++--
.../runtime/jobmanager/JobManagerOptions.java | 38 +++++
.../flink/runtime/util/EvictingBoundedList.java | 160 ++++++++++++++++++
.../executiongraph/AllVerticesIteratorTest.java | 9 +-
.../runtime/util/EvictingBoundedListTest.java | 164 +++++++++++++++++++
8 files changed, 422 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 786f5e8..675304f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
-
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -39,6 +38,12 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
@Override
public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
+
+ // return empty string for pruned (== null) execution attempts
+ if (null == execAttempt) {
+ return "";
+ }
+
final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
StringWriter writer = new StringWriter();
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index e1fb11a..56fc7a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -19,17 +19,16 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.EvictingBoundedList;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
private static final long serialVersionUID = -6708241535015028576L;
private final int subTaskIndex;
- private final List<ArchivedExecution> priorExecutions;
+ private final EvictingBoundedList<ArchivedExecution> priorExecutions;
/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
private final String taskNameWithSubtask;
@@ -38,9 +37,10 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
public ArchivedExecutionVertex(ExecutionVertex vertex) {
this.subTaskIndex = vertex.getParallelSubtaskIndex();
- this.priorExecutions = new ArrayList<>();
- for (Execution priorExecution : vertex.getPriorExecutions()) {
- priorExecutions.add(priorExecution.archive());
+ EvictingBoundedList<Execution> copyOfPriorExecutionsList = vertex.getCopyOfPriorExecutionsList();
+ priorExecutions = new EvictingBoundedList<>(copyOfPriorExecutionsList.getSizeLimit());
+ for (Execution priorExecution : copyOfPriorExecutionsList) {
+ priorExecutions.add(priorExecution != null ? priorExecution.archive() : null);
}
this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index a62ed86..c066ca8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -163,9 +165,16 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
result.getResultType());
}
+ Configuration jobConfiguration = graph.getJobConfiguration();
+ int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
+ jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
+ JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();
+
// create all task vertices
for (int i = 0; i < numTaskVertices; i++) {
- ExecutionVertex vertex = new ExecutionVertex(this, i, this.producedDataSets, timeout, createTimestamp);
+ ExecutionVertex vertex = new ExecutionVertex(
+ this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);
+
this.taskVertices[i] = vertex;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 39c60d2..5cbd1c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -36,11 +36,13 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.EvictingBoundedList;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
@@ -53,7 +55,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
@@ -79,7 +80,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
private final int subTaskIndex;
- private final List<Execution> priorExecutions;
+ private final EvictingBoundedList<Execution> priorExecutions;
private final Time timeout;
@@ -99,7 +100,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
int subTaskIndex,
IntermediateResult[] producedDataSets,
Time timeout) {
- this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis());
+ this(
+ jobVertex,
+ subTaskIndex,
+ producedDataSets,
+ timeout,
+ System.currentTimeMillis(),
+ JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
}
public ExecutionVertex(
@@ -107,7 +114,17 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
int subTaskIndex,
IntermediateResult[] producedDataSets,
Time timeout,
- long createTimestamp) {
+ int maxPriorExecutionHistoryLength) {
+ this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis(), maxPriorExecutionHistoryLength);
+ }
+
+ public ExecutionVertex(
+ ExecutionJobVertex jobVertex,
+ int subTaskIndex,
+ IntermediateResult[] producedDataSets,
+ Time timeout,
+ long createTimestamp,
+ int maxPriorExecutionHistoryLength) {
this.jobVertex = jobVertex;
this.subTaskIndex = subTaskIndex;
@@ -125,7 +142,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
- this.priorExecutions = new CopyOnWriteArrayList<Execution>();
+ this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);
this.currentExecution = new Execution(
getExecutionGraph().getFutureExecutor(),
@@ -235,16 +252,19 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
@Override
public Execution getPriorExecutionAttempt(int attemptNumber) {
- if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
- return priorExecutions.get(attemptNumber);
- }
- else {
- throw new IllegalArgumentException("attempt does not exist");
+ synchronized (priorExecutions) { // hold the history list's monitor for the bounds check and the read
+ if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) { // size() counts evicted attempts too
+ return priorExecutions.get(attemptNumber); // null when the attempt was already evicted from the bounded history
+ } else {
+ throw new IllegalArgumentException("attempt does not exist");
+ }
}
}
- List<Execution> getPriorExecutions() {
- return priorExecutions;
+ EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() { // replaces getPriorExecutions(); hands out a snapshot instead of the live list
+ synchronized (priorExecutions) { // copy under the same monitor used by getPriorExecutionAttempt
+ return new EvictingBoundedList<>(priorExecutions); // copy keeps size limit, count and evicted positions
+ }
}
public ExecutionGraph getExecutionGraph() {
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
new file mode 100644
index 0000000..279a70e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+@PublicEvolving
+public class JobManagerOptions { // static holder for job manager configuration options
+
+ /**
+ * The maximum number of prior execution attempts kept in history.
+ */
+ public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE =
+ key("job-manager.max-attempts-history-size").defaultValue(16); // config key; 16 attempts are kept when it is not set
+
+ private JobManagerOptions() { // not meant to be instantiated
+ throw new IllegalAccessError(); // fails even if the private constructor is invoked anyway
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
new file mode 100644
index 0000000..f4c155a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * This class implements a list (array based) that is physically bounded in maximum size, but can virtually grow beyond
+ * the bounded size. When the list grows beyond the size bound, elements are dropped from the head of the list (FIFO
+ * order). If dropped elements are accessed, a default element is returned instead.
+ * <p>
+ * TODO this class could eventually implement the whole actual List interface.
+ *
+ * @param <T> type of the list elements
+ */
+public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
+
+ private static final long serialVersionUID = -1863961980953613146L;
+
+ private final T defaultElement; // returned by get()/set() for indices whose element was evicted
+ private final Object[] elements; // ring buffer; its length is the size limit
+ private int idx; // array slot the next add() writes to
+ private int count; // virtual size reported by size(); may exceed elements.length
+ private long modCount; // bumped by add(), set() and non-empty clear(); checked by iterators in next()
+
+ public EvictingBoundedList(int sizeLimit) { // uses null as the default element
+ this(sizeLimit, null);
+ }
+
+ public EvictingBoundedList(EvictingBoundedList<T> other) { // copy constructor
+ Preconditions.checkNotNull(other);
+ this.defaultElement = other.defaultElement;
+ this.elements = other.elements.clone(); // new backing array, element references are shared
+ this.idx = other.idx;
+ this.count = other.count;
+ this.modCount = 0L; // the copy starts its own modification count
+ }
+
+ public EvictingBoundedList(int sizeLimit, T defaultElement) {
+ this.elements = new Object[sizeLimit]; // NOTE(review): sizeLimit is not validated; with 0, add() divides by zero in "% elements.length"
+ this.defaultElement = defaultElement;
+ this.idx = 0;
+ this.count = 0;
+ this.modCount = 0L;
+ }
+
+ public int size() { // number of elements added, including evicted ones
+ return count;
+ }
+
+ public boolean isEmpty() {
+ return 0 == count;
+ }
+
+ public boolean add(T t) { // always returns true
+ elements[idx] = t; // overwrites the oldest retained element once the buffer is full
+ idx = (idx + 1) % elements.length; // advance the write slot, wrapping around
+ ++count;
+ ++modCount;
+ return true;
+ }
+
+ public void clear() {
+ if (!isEmpty()) { // an empty list is left untouched, modCount included
+ for (int i = 0; i < elements.length; ++i) {
+ elements[i] = null; // drop references held by the buffer
+ }
+ count = 0;
+ idx = 0;
+ ++modCount;
+ }
+ }
+
+ public T get(int index) {
+ Preconditions.checkArgument(index >= 0 && index < count); // IllegalArgumentException for indices outside [0, count)
+ return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length); // virtual index i lives in slot i % length
+ }
+
+ public int getSizeLimit() { // maximum number of elements physically retained
+ return elements.length;
+ }
+
+ public T set(int index, T element) {
+ Preconditions.checkArgument(index >= 0 && index < count); // IllegalArgumentException for indices outside [0, count)
+ ++modCount; // NOTE(review): counted as a modification even when the index was evicted and nothing is written
+ if (isDroppedIndex(index)) {
+ return getDefaultElement(); // evicted position: the new element is discarded
+ } else {
+ int idx = index % elements.length; // local slot index; shadows the field idx
+ T old = accessInternal(idx);
+ elements[idx] = element;
+ return old; // previous element at that position
+ }
+ }
+
+ public T getDefaultElement() {
+ return defaultElement;
+ }
+
+ private boolean isDroppedIndex(int idx) { // parameter is a virtual index, not the field idx
+ return idx < count - elements.length; // the first (count - limit) virtual indices have been overwritten
+ }
+
+ @SuppressWarnings("unchecked") // slots are only written through add(T) and set(int, T), or nulled by clear()
+ private T accessInternal(int arrayIndex) {
+ return (T) elements[arrayIndex];
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ return new Iterator<T>() {
+
+ int pos = 0; // next virtual index to return
+ final long oldModCount = modCount; // snapshot taken when the iterator is created
+
+ @Override
+ public boolean hasNext() { // does not check for modification; only next() does
+ return pos < count;
+ }
+
+ @Override
+ public T next() {
+ if (oldModCount != modCount) {
+ throw new ConcurrentModificationException();
+ }
+ if (pos < count) {
+ return get(pos++); // yields the default element for evicted positions
+ } else {
+ throw new NoSuchElementException("Iterator exhausted.");
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Read-only iterator");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index 0223a2e..4ecac9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -18,8 +18,7 @@
package org.apache.flink.runtime.executiongraph;
-import java.util.Arrays;
-
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -28,6 +27,8 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.Arrays;
+
public class AllVerticesIteratorTest {
@Test
@@ -50,8 +51,10 @@ public class AllVerticesIteratorTest {
v4.setParallelism(2);
ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
+ Configuration jobConf = new Configuration();
Mockito.when(eg.getFutureExecutor()).thenReturn(TestingUtils.directExecutionContext());
-
+ Mockito.when(eg.getJobConfiguration()).thenReturn(jobConf);
+
ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1,
AkkaUtils.getDefaultTimeout());
ExecutionJobVertex ejv2 = new ExecutionJobVertex(eg, v2, 1,
http://git-wip-us.apache.org/repos/asf/flink/blob/f5af7f10/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
new file mode 100644
index 0000000..e0a1c70
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class EvictingBoundedListTest {
+
+ @Test
+ public void testAddGet() {
+ int insertSize = 17; // more insertions than the bound, so the oldest 12 get evicted
+ int boundSize = 5;
+ Integer defaultElement = 4711;
+
+ EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+ assertTrue(list.isEmpty());
+
+ for (int i = 0; i < insertSize; ++i) {
+ list.add(i);
+ }
+
+ assertEquals(17, list.size()); // size counts every insertion, not only retained elements
+
+ for (int i = 0; i < insertSize; ++i) {
+ int exp = i < (insertSize - boundSize) ? defaultElement : i; // evicted indices read back as the default element
+ int act = list.get(i);
+ assertEquals(exp, act);
+ }
+ }
+
+ @Test
+ public void testSet() {
+ int insertSize = 17;
+ int boundSize = 5;
+ Integer defaultElement = 4711;
+ List<Integer> reference = new ArrayList<>(insertSize); // unbounded list used as the expected state for retained indices
+ EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+ for (int i = 0; i < insertSize; ++i) {
+ reference.add(i);
+ list.add(i);
+ }
+
+ assertEquals(reference.size(), list.size());
+
+ list.set(0, 123); // first evicted index: the value must be discarded
+ list.set(insertSize - boundSize - 1, 123); // last evicted index: the value must be discarded
+
+ list.set(insertSize - boundSize, 42); // first retained index
+ reference.set(insertSize - boundSize, 42);
+ list.set(13, 43);
+ reference.set(13, 43);
+ list.set(16, 44); // last valid index
+ reference.set(16, 44);
+
+ try {
+ list.set(insertSize, 23); // index == size is out of range
+ fail("Illegal index in set not detected.");
+ } catch (IllegalArgumentException ignored) {
+
+ }
+
+ for (int i = 0; i < insertSize; ++i) {
+ int exp = i < (insertSize - boundSize) ? defaultElement : reference.get(i);
+ int act = list.get(i);
+ assertEquals(exp, act);
+ }
+
+ assertEquals(reference.size(), list.size()); // set() must not change the size
+ }
+
+ @Test
+ public void testClear() {
+ int insertSize = 17;
+ int boundSize = 5;
+ Integer defaultElement = 4711;
+
+ EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+ for (int i = 0; i < insertSize; ++i) {
+ list.add(i);
+ }
+
+ list.clear();
+
+ assertEquals(0, list.size());
+ assertTrue(list.isEmpty());
+
+ try {
+ list.get(0); // a cleared list has no valid index
+ fail();
+ } catch (IllegalArgumentException ignore) {
+ }
+ }
+
+ @Test
+ public void testIterator() {
+ int insertSize = 17;
+ int boundSize = 5;
+ Integer defaultElement = 4711;
+
+ EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement);
+ assertTrue(list.isEmpty());
+
+ for (int i = 0; i < insertSize; ++i) {
+ list.add(i);
+ }
+
+ Iterator<Integer> iterator = list.iterator();
+
+ for (int i = 0; i < insertSize; ++i) {
+ assertTrue(iterator.hasNext());
+ int exp = i < (insertSize - boundSize) ? defaultElement : i; // iteration covers evicted positions as default elements
+ int act = iterator.next();
+ assertEquals(exp, act);
+ }
+
+ assertFalse(iterator.hasNext());
+
+ try {
+ iterator.next();
+ fail("Next on exhausted iterator did not trigger exception.");
+ } catch (NoSuchElementException ignored) {
+
+ }
+
+ iterator = list.iterator(); // fresh iterator for the concurrent-modification check
+ assertTrue(iterator.hasNext());
+ iterator.next();
+ list.add(123); // modify the list after the iterator was created
+ assertTrue(iterator.hasNext()); // hasNext() still answers; only next() detects the modification
+ try {
+ iterator.next();
+ fail("Concurrent modification not detected.");
+ } catch (ConcurrentModificationException ignored) {
+
+ }
+ }
+}