You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:44 UTC
[20/63] [abbrv] git commit: Introduce execution attempts at execution vertex. Add tests for job event classes
Introduce execution attempts at execution vertex.
Add tests for job event classes
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/43e7d0fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/43e7d0fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/43e7d0fa
Branch: refs/heads/master
Commit: 43e7d0fa0a2b71ed615039bcf9e708523b015cf3
Parents: 2ac08a6
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 9 21:12:20 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:48 2014 +0200
----------------------------------------------------------------------
.../executiongraph/ExecutionAttempt.java | 111 +++++++++++++
.../runtime/executiongraph/ExecutionGate.java | 96 ------------
.../executiongraph/ExecutionVertex2.java | 57 +++++--
.../flink/runtime/event/job/EventsTest.java | 154 +++++++++++++++++++
4 files changed, 307 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/43e7d0fa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttempt.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttempt.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttempt.java
new file mode 100644
index 0000000..b623d6f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttempt.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.flink.runtime.instance.AllocatedSlot;
+
+public class ExecutionAttempt {
+
+ private final AtomicBoolean finished = new AtomicBoolean();
+
+ private final ExecutionAttemptID attemptId;
+
+ private final AllocatedSlot assignedResource;
+
+ private final int attemptNumber;
+
+ private final long startTimestamp;
+
+ private volatile long endTimestamp;
+
+ private volatile Throwable failureCause;
+
+ // --------------------------------------------------------------------------------------------
+
+ public ExecutionAttemptID getAttemptId() {
+ return attemptId;
+ }
+
+ public ExecutionAttempt(ExecutionAttemptID attemptId, AllocatedSlot assignedResource, int attemptNumber, long startTimestamp) {
+ this.attemptId = attemptId;
+ this.assignedResource = assignedResource;
+ this.attemptNumber = attemptNumber;
+ this.startTimestamp = startTimestamp;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public AllocatedSlot getAssignedResource() {
+ return assignedResource;
+ }
+
+ public int getAttemptNumber() {
+ return attemptNumber;
+ }
+
+ public long getStartTimestamp() {
+ return startTimestamp;
+ }
+
+ public long getEndTimestamp() {
+ return endTimestamp;
+ }
+
+ public Throwable getFailureCause() {
+ return failureCause;
+ }
+
+ public boolean isFinished() {
+ return finished.get();
+ }
+
+ public boolean isFailed() {
+ return finished.get() && failureCause != null;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public boolean finish() {
+ if (finished.compareAndSet(false, true)) {
+ endTimestamp = System.currentTimeMillis();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public boolean fail(Throwable error) {
+ if (finished.compareAndSet(false, true)) {
+ failureCause = error;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return String.format("Attempt #%d (%s) @ %s - started %d %s", attemptNumber, attemptId,
+ assignedResource.toString(), startTimestamp, isFinished() ? "finished " + endTimestamp : "[RUNNING]");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/43e7d0fa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGate.java
deleted file mode 100644
index 1bfb923..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGate.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.io.network.gates.GateID;
-
-/**
- * Objects of this class represent either an {@link InputGate} or {@link OutputGate} within an {@link ExecutionGraph},
- * Nephele's internal scheduling representation for jobs.
- * <p>
- * This class is thread-safe.
- *
- */
-public final class ExecutionGate {
-
- private final GateID gateID;
-
- private volatile ExecutionVertex vertex;
-
- private final ExecutionGroupEdge groupEdge;
-
- private final boolean isInputGate;
-
- private final CopyOnWriteArrayList<ExecutionEdge> edges = new CopyOnWriteArrayList<ExecutionEdge>();
-
- ExecutionGate(final GateID gateID, final ExecutionVertex vertex, final ExecutionGroupEdge groupEdge,
- final boolean isInputGate) {
-
- this.gateID = gateID;
- this.vertex = vertex;
- this.groupEdge = groupEdge;
- this.isInputGate = isInputGate;
- }
-
- public GateID getGateID() {
-
- return this.gateID;
- }
-
- public ExecutionVertex getVertex() {
-
- return this.vertex;
- }
-
- public boolean isInputGate() {
-
- return this.isInputGate;
- }
-
- public int getNumberOfEdges() {
-
- return this.edges.size();
- }
-
- public ExecutionEdge getEdge(final int index) {
-
- return this.edges.get(index);
- }
-
- void replaceAllEdges(final Collection<ExecutionEdge> newEdges) {
-
- this.edges.clear();
- this.edges.addAll(newEdges);
- }
-
- public ChannelType getChannelType() {
-
- return this.groupEdge.getChannelType();
- }
-
- ExecutionGroupEdge getGroupEdge() {
-
- return this.groupEdge;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/43e7d0fa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
index 4a4d0e7..787605d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.commons.logging.Log;
@@ -65,8 +66,8 @@ public class ExecutionVertex2 {
private static final AtomicReferenceFieldUpdater<ExecutionVertex2, ExecutionState2> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ExecutionVertex2.class, ExecutionState2.class, "state");
- private static final AtomicReferenceFieldUpdater<ExecutionVertex2, AllocatedSlot> ASSIGNED_SLOT_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(ExecutionVertex2.class, AllocatedSlot.class, "assignedSlot");
+ private static final AtomicReferenceFieldUpdater<ExecutionVertex2, ExecutionAttempt> ATTEMPT_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(ExecutionVertex2.class, ExecutionAttempt.class, "currentOrLastAttempt");
private static final Logger LOG = ExecutionGraph.LOG;
@@ -82,15 +83,13 @@ public class ExecutionVertex2 {
private final int subTaskIndex;
-
private final long[] stateTimestamps;
+ private final List<ExecutionAttempt> priorAttempts;
- private volatile ExecutionState2 state = CREATED;
-
- private volatile AllocatedSlot assignedSlot;
+ private volatile ExecutionAttempt currentOrLastAttempt;
- private volatile Throwable failureCause;
+ private volatile ExecutionState2 state = CREATED;
public ExecutionVertex2(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets) {
@@ -107,6 +106,7 @@ public class ExecutionVertex2 {
this.inputEdges = new ExecutionEdge2[jobVertex.getJobVertex().getInputs().size()][];
this.stateTimestamps = new long[ExecutionState2.values().length];
+ this.priorAttempts = new CopyOnWriteArrayList<ExecutionAttempt>();
}
@@ -149,21 +149,48 @@ public class ExecutionVertex2 {
return state;
}
- public Throwable getFailureCause() {
- return failureCause;
+ public long getStateTimestamp(ExecutionState2 state) {
+ return this.stateTimestamps[state.ordinal()];
}
- public AllocatedSlot getAssignedResource() {
- return assignedSlot;
+ private ExecutionGraph getExecutionGraph() {
+ return this.jobVertex.getGraph();
}
- public long getStateTimestamp(ExecutionState2 state) {
- return this.stateTimestamps[state.ordinal()];
+ public Throwable getLastFailureCause() {
+ // copy reference to the stack
+ ExecutionAttempt attempt = this.currentOrLastAttempt;
+ if (attempt != null) {
+ return attempt.getFailureCause();
+ }
+ else if (priorAttempts.size() > 0) {
+ // since the list is append-only, this always works in the presence of concurrent modifications
+ return priorAttempts.get(priorAttempts.size() - 1).getFailureCause();
+ }
+ else {
+ return null;
+ }
}
+ public AllocatedSlot getCurrentAssignedResource() {
+ // copy reference to the stack
+ ExecutionAttempt attempt = this.currentOrLastAttempt;
+ return attempt == null ? null : attempt.getAssignedResource();
+ }
- private ExecutionGraph getExecutionGraph() {
- return this.jobVertex.getGraph();
+ public AllocatedSlot getLastAssignedResource() {
+ // copy reference to the stack
+ ExecutionAttempt attempt = this.currentOrLastAttempt;
+ if (attempt != null) {
+ return attempt.getAssignedResource();
+ }
+ else if (priorAttempts.size() > 0) {
+ // since the list is append-only, this always works in the presence of concurrent modifications
+ return priorAttempts.get(priorAttempts.size() - 1).getAssignedResource();
+ }
+ else {
+ return null;
+ }
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/43e7d0fa/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/EventsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/EventsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/EventsTest.java
new file mode 100644
index 0000000..ff6732b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/EventsTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event.job;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.junit.Test;
+
+
+public class EventsTest {
+
+ @Test
+ public void testEqualsHashCodeToString() {
+ try {
+
+ // ExecutionStateChangeEvent
+ {
+ JobVertexID jid = new JobVertexID();
+ ExecutionAttemptID eid = new ExecutionAttemptID();
+
+ ExecutionStateChangeEvent e1 = new ExecutionStateChangeEvent(429345231796596L, jid, 17, eid, ExecutionState2.CANCELING);
+ ExecutionStateChangeEvent e2 = new ExecutionStateChangeEvent(429345231796596L, jid, 17, eid, ExecutionState2.CANCELING);
+
+ assertTrue(e1.equals(e2));
+ assertEquals(e1.hashCode(), e2.hashCode());
+ e1.toString();
+ }
+
+ // JobEvent
+ {
+ JobEvent e1 = new JobEvent(429345231796596L, JobStatus.CANCELED, "testmessage");
+ JobEvent e2 = new JobEvent(429345231796596L, JobStatus.CANCELED, "testmessage");
+ JobEvent e3 = new JobEvent(237217579123412431L, JobStatus.RUNNING, null);
+ JobEvent e4 = new JobEvent(237217579123412431L, JobStatus.RUNNING, null);
+
+ assertTrue(e1.equals(e2));
+ assertTrue(e3.equals(e4));
+
+ assertEquals(e1.hashCode(), e2.hashCode());
+ assertEquals(e3.hashCode(), e4.hashCode());
+ e1.toString();
+ e3.toString();
+ }
+
+ // RecentJobEvent
+ {
+ JobID jid = new JobID();
+ RecentJobEvent e1 = new RecentJobEvent(jid, "some name", JobStatus.FAILED, false, 634563546, 546734734672572457L);
+ RecentJobEvent e2 = new RecentJobEvent(jid, "some name", JobStatus.FAILED, false, 634563546, 546734734672572457L);
+ RecentJobEvent e3 = new RecentJobEvent(jid, null, JobStatus.RUNNING, false, 42364716239476L, 127843618273607L);
+ RecentJobEvent e4 = new RecentJobEvent(jid, null, JobStatus.RUNNING, false, 42364716239476L, 127843618273607L);
+
+ assertTrue(e1.equals(e2));
+ assertTrue(e3.equals(e4));
+
+ assertEquals(e1.hashCode(), e2.hashCode());
+ assertEquals(e3.hashCode(), e4.hashCode());
+ e1.toString();
+ e3.toString();
+ }
+
+ // VertexEvent
+ {
+ JobVertexID jid = new JobVertexID();
+ ExecutionAttemptID eid = new ExecutionAttemptID();
+
+ VertexEvent e1 = new VertexEvent(64619276017401234L, jid, "peter", 44, 13, eid, ExecutionState2.DEPLOYING, "foo");
+ VertexEvent e2 = new VertexEvent(64619276017401234L, jid, "peter", 44, 13, eid, ExecutionState2.DEPLOYING, "foo");
+
+ VertexEvent e3 = new VertexEvent(64619276017401234L, jid, null, 44, 13, eid, ExecutionState2.DEPLOYING, null);
+ VertexEvent e4 = new VertexEvent(64619276017401234L, jid, null, 44, 13, eid, ExecutionState2.DEPLOYING, null);
+
+ assertTrue(e1.equals(e2));
+ assertTrue(e3.equals(e4));
+
+ assertFalse(e1.equals(e3));
+
+ assertEquals(e1.hashCode(), e2.hashCode());
+ assertEquals(e3.hashCode(), e4.hashCode());
+ e1.toString();
+ e3.toString();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSerialization() {
+ try {
+ JobID jid = new JobID();
+ JobVertexID vid = new JobVertexID();
+ ExecutionAttemptID eid = new ExecutionAttemptID();
+
+ ExecutionStateChangeEvent esce = new ExecutionStateChangeEvent(429345231796596L, vid, 17, eid, ExecutionState2.CANCELING);
+
+ JobEvent je1 = new JobEvent(429345231796596L, JobStatus.CANCELED, "testmessage");
+ JobEvent je2 = new JobEvent(237217579123412431L, JobStatus.RUNNING, null);
+
+ RecentJobEvent rce1 = new RecentJobEvent(jid, "some name", JobStatus.FAILED, false, 634563546, 546734734672572457L);
+ RecentJobEvent rce2 = new RecentJobEvent(jid, null, JobStatus.RUNNING, false, 42364716239476L, 127843618273607L);
+
+ VertexEvent ve1 = new VertexEvent(64619276017401234L, vid, "peter", 44, 13, eid, ExecutionState2.DEPLOYING, "foo");
+ VertexEvent ve2 = new VertexEvent(64619276017401234L, vid, null, 44, 13, eid, ExecutionState2.DEPLOYING, null);
+
+ assertEquals(esce, CommonTestUtils.createCopyWritable(esce));
+ assertEquals(je1, CommonTestUtils.createCopyWritable(je1));
+ assertEquals(je2, CommonTestUtils.createCopyWritable(je2));
+ assertEquals(rce1, CommonTestUtils.createCopyWritable(rce1));
+ assertEquals(rce2, CommonTestUtils.createCopyWritable(rce2));
+ assertEquals(ve1, CommonTestUtils.createCopyWritable(ve1));
+ assertEquals(ve2, CommonTestUtils.createCopyWritable(ve2));
+
+ assertEquals(esce, CommonTestUtils.createCopySerializable(esce));
+ assertEquals(je1, CommonTestUtils.createCopySerializable(je1));
+ assertEquals(je2, CommonTestUtils.createCopySerializable(je2));
+ assertEquals(rce1, CommonTestUtils.createCopySerializable(rce1));
+ assertEquals(rce2, CommonTestUtils.createCopySerializable(rce2));
+ assertEquals(ve1, CommonTestUtils.createCopySerializable(ve1));
+ assertEquals(ve2, CommonTestUtils.createCopySerializable(ve2));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}