You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:44 UTC

[20/63] [abbrv] git commit: Introduce execution attempts at execution vertex. Add tests for job event classes

Introduce execution attempts at execution vertex.
Add tests for job event classes


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/43e7d0fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/43e7d0fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/43e7d0fa

Branch: refs/heads/master
Commit: 43e7d0fa0a2b71ed615039bcf9e708523b015cf3
Parents: 2ac08a6
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 9 21:12:20 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:48 2014 +0200

----------------------------------------------------------------------
 .../executiongraph/ExecutionAttempt.java        | 111 +++++++++++++
 .../runtime/executiongraph/ExecutionGate.java   |  96 ------------
 .../executiongraph/ExecutionVertex2.java        |  57 +++++--
 .../flink/runtime/event/job/EventsTest.java     | 154 +++++++++++++++++++
 4 files changed, 307 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/43e7d0fa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttempt.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttempt.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttempt.java
new file mode 100644
index 0000000..b623d6f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttempt.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.flink.runtime.instance.AllocatedSlot;
+
+/**
+ * One attempt at executing a task. It records the attempt's ID, the slot it was assigned to,
+ * its attempt number, its start and end timestamps and, if it failed, the failure cause.
+ * <p>
+ * An attempt is finished exactly once: whichever of {@link #finish()} or {@link #fail(Throwable)}
+ * wins the compare-and-set on {@code finished} records the outcome, and all later calls return false.
+ */
+public class ExecutionAttempt {
+
+	/** Flipped from false to true exactly once, by the first successful finish() or fail() call. */
+	private final AtomicBoolean finished = new AtomicBoolean();
+	
+	private final ExecutionAttemptID attemptId;
+	
+	private final AllocatedSlot assignedResource;
+	
+	private final int attemptNumber;
+	
+	private final long startTimestamp;
+	
+	/** Set by the call that finishes the attempt (success or failure); 0 while the attempt is running. */
+	private volatile long endTimestamp;
+	
+	/** Set only by a winning fail() call; null for attempts that are running or finished successfully. */
+	private volatile Throwable failureCause;
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Creates a new attempt in the running (not finished) state.
+	 *
+	 * @param attemptId the ID of this attempt
+	 * @param assignedResource the slot assigned to this attempt
+	 * @param attemptNumber the number of this attempt
+	 * @param startTimestamp the start time of this attempt; presumably milliseconds since the epoch,
+	 *                       to be comparable with the end timestamp (which uses System.currentTimeMillis()) — confirm with callers
+	 */
+	public ExecutionAttempt(ExecutionAttemptID attemptId, AllocatedSlot assignedResource, int attemptNumber, long startTimestamp) {
+		this.attemptId = attemptId;
+		this.assignedResource = assignedResource;
+		this.attemptNumber = attemptNumber;
+		this.startTimestamp = startTimestamp;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+	/** @return the ID of this attempt */
+	public ExecutionAttemptID getAttemptId() {
+		return attemptId;
+	}
+	
+	/** @return the slot this attempt was assigned to, as given at construction */
+	public AllocatedSlot getAssignedResource() {
+		return assignedResource;
+	}
+	
+	/** @return the number of this attempt, as given at construction */
+	public int getAttemptNumber() {
+		return attemptNumber;
+	}
+	
+	/** @return the start timestamp given at construction */
+	public long getStartTimestamp() {
+		return startTimestamp;
+	}
+	
+	/** @return the time (System.currentTimeMillis()) at which the attempt finished or failed, or 0 while running */
+	public long getEndTimestamp() {
+		return endTimestamp;
+	}
+	
+	/** @return the error passed to the winning fail() call, or null if the attempt has not failed */
+	public Throwable getFailureCause() {
+		return failureCause;
+	}
+	
+	/** @return true once finish() or fail() has succeeded */
+	public boolean isFinished() {
+		return finished.get();
+	}
+	
+	/**
+	 * @return true if the attempt was finished via fail() with a non-null error.
+	 *         Note: the finished flag is set before the failure cause is written, so a concurrent
+	 *         caller may briefly see isFinished() == true while isFailed() is still false.
+	 */
+	public boolean isFailed() {
+		return finished.get() && failureCause != null;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Marks the attempt as successfully finished and records the end timestamp.
+	 *
+	 * @return true if this call finished the attempt, false if it was already finished or failed
+	 */
+	public boolean finish() {
+		if (finished.compareAndSet(false, true)) {
+			endTimestamp = System.currentTimeMillis();
+			return true;
+		}
+		return false;
+	}
+	
+	/**
+	 * Marks the attempt as failed, recording the failure cause and the end timestamp.
+	 * If {@code error} is null the attempt is still marked finished, but isFailed() reports false.
+	 *
+	 * @param error the cause of the failure
+	 * @return true if this call finished the attempt, false if it was already finished or failed
+	 */
+	public boolean fail(Throwable error) {
+		if (finished.compareAndSet(false, true)) {
+			failureCause = error;
+			// previously only finish() recorded the end time, leaving failed attempts with endTimestamp == 0
+			endTimestamp = System.currentTimeMillis();
+			return true;
+		}
+		return false;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		// pass the slot itself to %s: identical output for a non-null slot, "null" instead of an NPE otherwise
+		return String.format("Attempt #%d (%s) @ %s - started %d %s", attemptNumber, attemptId,
+				assignedResource, startTimestamp, isFinished() ? "finished " + endTimestamp : "[RUNNING]");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/43e7d0fa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGate.java
deleted file mode 100644
index 1bfb923..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGate.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.io.network.gates.GateID;
-
-/**
- * Objects of this class represent either an {@link InputGate} or {@link OutputGate} within an {@link ExecutionGraph},
- * Nephele's internal scheduling representation for jobs.
- * <p>
- * This class is thread-safe.
- * <p>
- * NOTE(review): {@link #replaceAllEdges(Collection)} clears and then refills the edge list in two separate
- * steps, so a concurrent reader can observe an empty or partially filled edge list in between.
- */
-public final class ExecutionGate {
-
-	private final GateID gateID;
-
-	// volatile so that readers on other threads see the latest value
-	private volatile ExecutionVertex vertex;
-
-	private final ExecutionGroupEdge groupEdge;
-
-	private final boolean isInputGate;
-
-	// copy-on-write list: individual reads never see a structurally corrupted list
-	private final CopyOnWriteArrayList<ExecutionEdge> edges = new CopyOnWriteArrayList<ExecutionEdge>();
-
-	/**
-	 * Creates a gate with no edges; edges are supplied later via replaceAllEdges().
-	 */
-	ExecutionGate(final GateID gateID, final ExecutionVertex vertex, final ExecutionGroupEdge groupEdge,
-			final boolean isInputGate) {
-
-		this.gateID = gateID;
-		this.vertex = vertex;
-		this.groupEdge = groupEdge;
-		this.isInputGate = isInputGate;
-	}
-
-	/** @return the ID of this gate */
-	public GateID getGateID() {
-
-		return this.gateID;
-	}
-
-	/** @return the execution vertex passed in at construction */
-	public ExecutionVertex getVertex() {
-
-		return this.vertex;
-	}
-
-	/** @return true if this gate is an input gate, false if it is an output gate */
-	public boolean isInputGate() {
-
-		return this.isInputGate;
-	}
-
-	/** @return the current number of edges attached to this gate */
-	public int getNumberOfEdges() {
-
-		return this.edges.size();
-	}
-
-	/**
-	 * @param index position of the edge in the current edge list
-	 * @return the edge at the given position
-	 * @throws IndexOutOfBoundsException if index is outside the current edge list
-	 */
-	public ExecutionEdge getEdge(final int index) {
-
-		return this.edges.get(index);
-	}
-
-	/**
-	 * Replaces all edges of this gate with the given ones, preserving their iteration order.
-	 * Not atomic: clear() and addAll() are separate operations.
-	 */
-	void replaceAllEdges(final Collection<ExecutionEdge> newEdges) {
-		
-		this.edges.clear();
-		this.edges.addAll(newEdges);
-	}
-
-	/** @return the channel type of the group edge this gate was created with */
-	public ChannelType getChannelType() {
-
-		return this.groupEdge.getChannelType();
-	}
-
-	/** @return the group edge this gate was created with */
-	ExecutionGroupEdge getGroupEdge() {
-
-		return this.groupEdge;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/43e7d0fa/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
index 4a4d0e7..787605d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.apache.commons.logging.Log;
@@ -65,8 +66,8 @@ public class ExecutionVertex2 {
 	private static final AtomicReferenceFieldUpdater<ExecutionVertex2, ExecutionState2> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(ExecutionVertex2.class, ExecutionState2.class, "state");
 	
-	private static final AtomicReferenceFieldUpdater<ExecutionVertex2, AllocatedSlot> ASSIGNED_SLOT_UPDATER =
-			AtomicReferenceFieldUpdater.newUpdater(ExecutionVertex2.class, AllocatedSlot.class, "assignedSlot");
+	private static final AtomicReferenceFieldUpdater<ExecutionVertex2, ExecutionAttempt> ATTEMPT_UPDATER =
+			AtomicReferenceFieldUpdater.newUpdater(ExecutionVertex2.class, ExecutionAttempt.class, "currentOrLastAttempt");
 
 	private static final Logger LOG = ExecutionGraph.LOG;
 	
@@ -82,15 +83,13 @@ public class ExecutionVertex2 {
 	
 	private final int subTaskIndex;
 	
-	
 	private final long[] stateTimestamps;
 	
+	private final List<ExecutionAttempt> priorAttempts;
 	
-	private volatile ExecutionState2 state = CREATED;
-	
-	private volatile AllocatedSlot assignedSlot;
+	private volatile ExecutionAttempt currentOrLastAttempt;
 	
-	private volatile Throwable failureCause;
+	private volatile ExecutionState2 state = CREATED;
 	
 	
 	public ExecutionVertex2(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets) {
@@ -107,6 +106,7 @@ public class ExecutionVertex2 {
 		this.inputEdges = new ExecutionEdge2[jobVertex.getJobVertex().getInputs().size()][];
 		
 		this.stateTimestamps = new long[ExecutionState2.values().length];
+		this.priorAttempts = new CopyOnWriteArrayList<ExecutionAttempt>();
 	}
 	
 	
@@ -149,21 +149,48 @@ public class ExecutionVertex2 {
 		return state;
 	}
 	
-	public Throwable getFailureCause() {
-		return failureCause;
+	/**
+	 * Returns the timestamp stored for the given state, indexed by the state's ordinal.
+	 * Entries that were never written are 0 (the array is freshly allocated in the constructor).
+	 * NOTE(review): assumes state is non-null; a null state throws NullPointerException.
+	 */
+	public long getStateTimestamp(ExecutionState2 state) {
+		return this.stateTimestamps[state.ordinal()];
 	}
 	
-	public AllocatedSlot getAssignedResource() {
-		return assignedSlot;
+	/** Returns the execution graph that this vertex's job vertex belongs to (via {@code jobVertex.getGraph()}). */
+	private ExecutionGraph getExecutionGraph() {
+		return this.jobVertex.getGraph();
 	}
 	
-	public long getStateTimestamp(ExecutionState2 state) {
-		return this.stateTimestamps[state.ordinal()];
+	public Throwable getLastFailureCause() {
+		// copy reference to the stack
+		ExecutionAttempt attempt = this.currentOrLastAttempt;
+		if (attempt != null) {
+			return attempt.getFailureCause();
+		}
+		else if (priorAttempts.size() > 0) {
+			// since the list is append-only, this always works in the presence of concurrent modifications
+			return priorAttempts.get(priorAttempts.size() - 1).getFailureCause();
+		}
+		else {
+			return null;
+		}
 	}
 	
+	/**
+	 * Returns the slot of the attempt held in {@code currentOrLastAttempt}, or null if no attempt is set.
+	 */
+	public AllocatedSlot getCurrentAssignedResource() {
+		final ExecutionAttempt current = this.currentOrLastAttempt;
+		if (current != null) {
+			return current.getAssignedResource();
+		}
+		return null;
+	}
 	
-	private ExecutionGraph getExecutionGraph() {
-		return this.jobVertex.getGraph();
+	public AllocatedSlot getLastAssignedResource() {
+		// copy reference to the stack
+		ExecutionAttempt attempt = this.currentOrLastAttempt;
+		if (attempt != null) {
+			return attempt.getAssignedResource();
+		}
+		else if (priorAttempts.size() > 0) {
+			// since the list is append-only, this always works in the presence of concurrent modifications
+			return priorAttempts.get(priorAttempts.size() - 1).getAssignedResource();
+		}
+		else {
+			return null;
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/43e7d0fa/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/EventsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/EventsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/EventsTest.java
new file mode 100644
index 0000000..ff6732b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/EventsTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event.job;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.junit.Test;
+
+
+/**
+ * Tests equals/hashCode/toString and serialization round trips of the job event classes.
+ * <p>
+ * Test methods declare {@code throws Exception} instead of catching and calling fail(e.getMessage()),
+ * so that JUnit reports the original exception together with its full stack trace.
+ */
+public class EventsTest {
+
+	/**
+	 * For each event class: instances built from identical arguments must be equal and have equal
+	 * hash codes, instances built from differing arguments must not be equal, and toString() must not throw.
+	 */
+	@Test
+	public void testEqualsHashCodeToString() throws Exception {
+		
+		// ExecutionStateChangeEvent
+		{
+			JobVertexID jid = new JobVertexID();
+			ExecutionAttemptID eid = new ExecutionAttemptID();
+			
+			ExecutionStateChangeEvent e1 = new ExecutionStateChangeEvent(429345231796596L, jid, 17, eid, ExecutionState2.CANCELING);
+			ExecutionStateChangeEvent e2 = new ExecutionStateChangeEvent(429345231796596L, jid, 17, eid, ExecutionState2.CANCELING);
+			// differs from e1 only in the new execution state
+			ExecutionStateChangeEvent e3 = new ExecutionStateChangeEvent(429345231796596L, jid, 17, eid, ExecutionState2.DEPLOYING);
+			
+			assertTrue(e1.equals(e2));
+			assertFalse(e1.equals(e3));
+			
+			assertEquals(e1.hashCode(), e2.hashCode());
+			
+			// smoke check: must not throw
+			e1.toString();
+		}
+		
+		// JobEvent
+		{
+			JobEvent e1 = new JobEvent(429345231796596L, JobStatus.CANCELED, "testmessage");
+			JobEvent e2 = new JobEvent(429345231796596L, JobStatus.CANCELED, "testmessage");
+			JobEvent e3 = new JobEvent(237217579123412431L, JobStatus.RUNNING, null);
+			JobEvent e4 = new JobEvent(237217579123412431L, JobStatus.RUNNING, null);
+			
+			assertTrue(e1.equals(e2));
+			assertTrue(e3.equals(e4));
+			
+			// timestamp, status and message all differ
+			assertFalse(e1.equals(e3));
+			
+			assertEquals(e1.hashCode(), e2.hashCode());
+			assertEquals(e3.hashCode(), e4.hashCode());
+			
+			// smoke checks, including a null message: must not throw
+			e1.toString();
+			e3.toString();
+		}
+		
+		// RecentJobEvent
+		{
+			JobID jid = new JobID();
+			RecentJobEvent e1 = new RecentJobEvent(jid, "some name", JobStatus.FAILED, false, 634563546, 546734734672572457L);
+			RecentJobEvent e2 = new RecentJobEvent(jid, "some name", JobStatus.FAILED, false, 634563546, 546734734672572457L);
+			RecentJobEvent e3 = new RecentJobEvent(jid, null, JobStatus.RUNNING, false, 42364716239476L, 127843618273607L);
+			RecentJobEvent e4 = new RecentJobEvent(jid, null, JobStatus.RUNNING, false, 42364716239476L, 127843618273607L);
+			
+			assertTrue(e1.equals(e2));
+			assertTrue(e3.equals(e4));
+			
+			// same job ID, but name, status and timestamps differ
+			assertFalse(e1.equals(e3));
+			
+			assertEquals(e1.hashCode(), e2.hashCode());
+			assertEquals(e3.hashCode(), e4.hashCode());
+			
+			// smoke checks, including a null job name: must not throw
+			e1.toString();
+			e3.toString();
+		}
+		
+		// VertexEvent
+		{
+			JobVertexID jid = new JobVertexID();
+			ExecutionAttemptID eid = new ExecutionAttemptID();
+			
+			VertexEvent e1 = new VertexEvent(64619276017401234L, jid, "peter", 44, 13, eid, ExecutionState2.DEPLOYING, "foo");
+			VertexEvent e2 = new VertexEvent(64619276017401234L, jid, "peter", 44, 13, eid, ExecutionState2.DEPLOYING, "foo");
+			
+			VertexEvent e3 = new VertexEvent(64619276017401234L, jid, null, 44, 13, eid, ExecutionState2.DEPLOYING, null);
+			VertexEvent e4 = new VertexEvent(64619276017401234L, jid, null, 44, 13, eid, ExecutionState2.DEPLOYING, null);
+			
+			assertTrue(e1.equals(e2));
+			assertTrue(e3.equals(e4));
+			
+			assertFalse(e1.equals(e3));
+			
+			assertEquals(e1.hashCode(), e2.hashCode());
+			assertEquals(e3.hashCode(), e4.hashCode());
+			
+			// smoke checks, including null name/description: must not throw
+			e1.toString();
+			e3.toString();
+		}
+	}
+	
+	/**
+	 * Every event must survive both a Writable round trip and a Java-serialization round trip
+	 * and compare equal to the original afterwards.
+	 */
+	@Test
+	public void testSerialization() throws Exception {
+		JobID jid = new JobID();
+		JobVertexID vid = new JobVertexID();
+		ExecutionAttemptID eid = new ExecutionAttemptID();
+		
+		ExecutionStateChangeEvent esce = new ExecutionStateChangeEvent(429345231796596L, vid, 17, eid, ExecutionState2.CANCELING);
+
+		JobEvent je1 = new JobEvent(429345231796596L, JobStatus.CANCELED, "testmessage");
+		JobEvent je2 = new JobEvent(237217579123412431L, JobStatus.RUNNING, null);
+
+		RecentJobEvent rce1 = new RecentJobEvent(jid, "some name", JobStatus.FAILED, false, 634563546, 546734734672572457L);
+		RecentJobEvent rce2 = new RecentJobEvent(jid, null, JobStatus.RUNNING, false, 42364716239476L, 127843618273607L);
+
+		VertexEvent ve1 = new VertexEvent(64619276017401234L, vid, "peter", 44, 13, eid, ExecutionState2.DEPLOYING, "foo");
+		VertexEvent ve2 = new VertexEvent(64619276017401234L, vid, null, 44, 13, eid, ExecutionState2.DEPLOYING, null);
+		
+		// Writable round trips
+		assertEquals(esce, CommonTestUtils.createCopyWritable(esce));
+		assertEquals(je1, CommonTestUtils.createCopyWritable(je1));
+		assertEquals(je2, CommonTestUtils.createCopyWritable(je2));
+		assertEquals(rce1, CommonTestUtils.createCopyWritable(rce1));
+		assertEquals(rce2, CommonTestUtils.createCopyWritable(rce2));
+		assertEquals(ve1, CommonTestUtils.createCopyWritable(ve1));
+		assertEquals(ve2, CommonTestUtils.createCopyWritable(ve2));
+		
+		// java.io.Serializable round trips
+		assertEquals(esce, CommonTestUtils.createCopySerializable(esce));
+		assertEquals(je1, CommonTestUtils.createCopySerializable(je1));
+		assertEquals(je2, CommonTestUtils.createCopySerializable(je2));
+		assertEquals(rce1, CommonTestUtils.createCopySerializable(rce1));
+		assertEquals(rce2, CommonTestUtils.createCopySerializable(rce2));
+		assertEquals(ve1, CommonTestUtils.createCopySerializable(ve1));
+		assertEquals(ve2, CommonTestUtils.createCopySerializable(ve2));
+	}
+}