You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/08/05 20:06:17 UTC
[2/2] tez git commit: TEZ-2633. Allow VertexManagerPlugins to receive
and report based on attempts instead of tasks (bikas)
TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts instead of tasks (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7b45e9a1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7b45e9a1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7b45e9a1
Branch: refs/heads/master
Commit: 7b45e9a142830e7dd8b0263d50dbaaef5fb0da76
Parents: cc1d89c
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Aug 5 11:05:54 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Aug 5 11:05:54 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/api/VertexManagerPlugin.java | 45 ++-
.../tez/dag/api/VertexManagerPluginContext.java | 37 ++-
.../apache/tez/runtime/api/DagIdentifier.java | 26 ++
.../tez/runtime/api/TaskAttemptIdentifier.java | 26 ++
.../apache/tez/runtime/api/TaskIdentifier.java | 26 ++
.../tez/runtime/api/VertexIdentifier.java | 28 ++
.../runtime/api/events/VertexManagerEvent.java | 22 +-
.../tez/dag/records/DagIdentifierImpl.java | 69 +++++
.../dag/records/TaskAttemptIdentifierImpl.java | 70 +++++
.../tez/dag/records/TaskIdentifierImpl.java | 70 +++++
.../tez/dag/records/VertexIdentifierImpl.java | 77 ++++++
.../java/org/apache/tez/dag/app/dag/Vertex.java | 4 +-
.../dag/impl/ImmediateStartVertexManager.java | 13 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 51 +++-
.../tez/dag/app/dag/impl/VertexManager.java | 65 ++---
.../apache/tez/dag/app/dag/impl/TestCommit.java | 5 +-
.../impl/TestImmediateStartVertexManager.java | 20 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 50 ++--
.../tez/dag/app/dag/impl/TestVertexManager.java | 5 +-
.../tez/test/VertexManagerPluginForTest.java | 6 +-
.../vertexmanager/InputReadyVertexManager.java | 36 ++-
.../vertexmanager/ShuffleVertexManager.java | 35 ++-
.../TestInputReadyVertexManager.java | 127 +++++----
.../vertexmanager/TestShuffleVertexManager.java | 272 ++++++++++---------
.../org/apache/tez/test/TestAMRecovery.java | 41 +--
.../tez/test/TestExceptionPropagation.java | 7 +-
.../apache/tez/test/dag/MultiAttemptDAG.java | 29 +-
28 files changed, 906 insertions(+), 358 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c1a4e31..708dee5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,8 @@ INCOMPATIBLE CHANGES
TEZ-2468. Change the minimum Java version to Java 7.
TEZ-2646. Add scheduling casual dependency for attempts
TEZ-2647. Add input causality dependency for attempts
+ TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts
+ instead of tasks
ALL CHANGES:
TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized.
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
index 6aa18d6..b66a66a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
@@ -18,6 +18,8 @@
package org.apache.tez.dag.api;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -25,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
/**
@@ -59,20 +62,58 @@ public abstract class VertexManagerPlugin {
*/
public abstract void initialize() throws Exception;
+ @Deprecated
/**
+ * This is replaced by {@link VertexManagerPlugin#onVertexStarted(List)}
* Notification that the vertex is ready to start running tasks
* @param completions Source vertices and all their tasks that have already completed
* @throws Exception
*/
- public abstract void onVertexStarted(Map<String, List<Integer>> completions) throws Exception;
+ public void onVertexStarted(Map<String, List<Integer>> completions) throws Exception {
+ throw new UnsupportedOperationException();
+ }
/**
+ * Notification that the vertex is ready to start running tasks
+ * @param completions All the source task attempts that have already completed
+ * @throws Exception
+ */
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception {
+ Map<String, List<Integer>> completionsMap = new HashMap<String, List<Integer>>();
+ for (TaskAttemptIdentifier attempt : completions) {
+ String vName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+ List<Integer> tasks = completionsMap.get(vName);
+ if (tasks == null) {
+ tasks = new LinkedList<Integer>();
+ completionsMap.put(vName, tasks);
+ }
+ tasks.add(attempt.getTaskIdentifier().getIdentifier());
+ }
+ onVertexStarted(completionsMap);
+ }
+
+ @Deprecated
+ /**
+ * This has been replaced by
+ * {@link VertexManagerPlugin#onSourceTaskCompleted(TaskAttemptIdentifier)}
* Notification of a source vertex completion.
* @param srcVertexName
* @param taskId Index of the task that completed
* @throws Exception
*/
- public abstract void onSourceTaskCompleted(String srcVertexName, Integer taskId) throws Exception;
+ public void onSourceTaskCompleted(String srcVertexName, Integer taskId) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Notification of a source vertex task completion.
+ * @param attempt Identifier of the task attempt that completed
+ * @throws Exception
+ */
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+ onSourceTaskCompleted(attempt.getTaskIdentifier().getVertexIdentifier().getName(),
+ attempt.getTaskIdentifier().getIdentifier());
+ }
/**
* Notification of an event directly sent to this vertex manager
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index 1b012ae..883387b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -27,7 +27,6 @@ import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.runtime.api.InputSpecUpdate;
@@ -43,11 +42,35 @@ import com.google.common.base.Preconditions;
@Public
public interface VertexManagerPluginContext {
+ public class ScheduleTaskRequest {
+ int taskIndex;
+ TaskLocationHint locationHint;
+
+ public static ScheduleTaskRequest create(int taskIndex, @Nullable TaskLocationHint locationHint) {
+ return new ScheduleTaskRequest(taskIndex, locationHint);
+ }
+
+ private ScheduleTaskRequest(int taskIndex, @Nullable TaskLocationHint locationHint) {
+ Preconditions.checkState(taskIndex >= 0);
+ this.taskIndex = taskIndex;
+ this.locationHint = locationHint;
+ }
+
+ public int getTaskIndex() {
+ return taskIndex;
+ }
+
+ public TaskLocationHint getTaskLocationHint() {
+ return locationHint;
+ }
+ }
+
+ @Deprecated
public class TaskWithLocationHint {
Integer taskIndex;
TaskLocationHint locationHint;
public TaskWithLocationHint(Integer taskIndex, @Nullable TaskLocationHint locationHint) {
- Preconditions.checkNotNull(taskIndex);
+ Preconditions.checkState(taskIndex != null);
this.taskIndex = taskIndex;
this.locationHint = locationHint;
}
@@ -152,7 +175,7 @@ public interface VertexManagerPluginContext {
* destination tasks may need to be updated to account for the new task
* parallelism. This method can be called to update the parallelism multiple
* times until any of the tasks of the vertex have been scheduled (by invoking
- * {@link #scheduleVertexTasks(List)}. If needed, the original source edge
+ * {@link #scheduleTasks(List)}. If needed, the original source edge
* properties may be obtained via {@link #getInputVertexEdgeProperties()}
*
* @param parallelism
@@ -209,13 +232,21 @@ public interface VertexManagerPluginContext {
*/
public void addRootInputEvents(String inputName, Collection<InputDataInformationEvent> events);
+ @Deprecated
/**
+ * Replaced by {@link #scheduleTasks(List)}
* Notify the vertex to start the given tasks
* @param tasks Indices of the tasks to be started
*/
public void scheduleVertexTasks(List<TaskWithLocationHint> tasks);
/**
+ * Notify the vertex to schedule the given tasks
+ * @param tasks Identifier and metadata for the tasks to schedule
+ */
+ public void scheduleTasks(List<ScheduleTaskRequest> tasks);
+
+ /**
* Get the names of the non-vertex inputs of this vertex. These are primary
* sources of data.
* @return Names of inputs to this vertex. Maybe null if there are no inputs
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-api/src/main/java/org/apache/tez/runtime/api/DagIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/DagIdentifier.java b/tez-api/src/main/java/org/apache/tez/runtime/api/DagIdentifier.java
new file mode 100644
index 0000000..dd63b4c
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/DagIdentifier.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+public interface DagIdentifier {
+
+ public String getName();
+
+ public int getIdentifier();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-api/src/main/java/org/apache/tez/runtime/api/TaskAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskAttemptIdentifier.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskAttemptIdentifier.java
new file mode 100644
index 0000000..101fa91
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskAttemptIdentifier.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+public interface TaskAttemptIdentifier {
+
+ public int getIdentifier();
+
+ public TaskIdentifier getTaskIdentifier();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-api/src/main/java/org/apache/tez/runtime/api/TaskIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskIdentifier.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskIdentifier.java
new file mode 100644
index 0000000..8ef066b
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskIdentifier.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+public interface TaskIdentifier {
+
+ public int getIdentifier();
+
+ public VertexIdentifier getVertexIdentifier();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-api/src/main/java/org/apache/tez/runtime/api/VertexIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/VertexIdentifier.java b/tez-api/src/main/java/org/apache/tez/runtime/api/VertexIdentifier.java
new file mode 100644
index 0000000..16e88ad
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/VertexIdentifier.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+public interface VertexIdentifier {
+
+ public int getIdentifier();
+
+ public String getName();
+
+ public DagIdentifier getDagIdentifier();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
index 484087e..9e73fe5 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
@@ -20,16 +20,19 @@ package org.apache.tez.runtime.api.events;
import java.nio.ByteBuffer;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import com.google.common.base.Preconditions;
/**
* Event used to send information from a Task to the VertexManager for a vertex.
* This may be used to send statistics like samples etc to the VertexManager for
- * automatic plan recofigurations based on observed statistics
+ * automatic plan reconfigurations based on observed statistics
*/
@Unstable
@Public
@@ -40,6 +43,8 @@ public class VertexManagerEvent extends Event {
*/
private final String targetVertexName;
+ private TaskAttemptIdentifier producerAttempt;
+
/**
* User payload to be sent
*/
@@ -68,4 +73,19 @@ public class VertexManagerEvent extends Event {
public ByteBuffer getUserPayload() {
return userPayload == null ? null : userPayload.asReadOnlyBuffer();
}
+
+ /**
+ * Get metadata about the task attempt that produced the event.
+ * This method will provide a valid return value only when invoked in the
+ * {@link VertexManagerPlugin}
+ * @return attempt metadata
+ */
+ public TaskAttemptIdentifier getProducerAttemptIdentifier() {
+ return producerAttempt;
+ }
+
+ @Private
+ public void setProducerAttemptIdentifier(TaskAttemptIdentifier producerAttempt) {
+ this.producerAttempt = producerAttempt;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-common/src/main/java/org/apache/tez/dag/records/DagIdentifierImpl.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/DagIdentifierImpl.java b/tez-common/src/main/java/org/apache/tez/dag/records/DagIdentifierImpl.java
new file mode 100644
index 0000000..099cb58
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/DagIdentifierImpl.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+import org.apache.tez.runtime.api.DagIdentifier;
+
+public class DagIdentifierImpl implements DagIdentifier {
+
+ private final TezDAGID dagId;
+ private final String dagName;
+
+ public DagIdentifierImpl(String dagName, TezDAGID dagId) {
+ this.dagId = dagId;
+ this.dagName = dagName;
+ }
+
+ @Override
+ public String getName() {
+ return dagName;
+ }
+
+ @Override
+ public int getIdentifier() {
+ return dagId.getId();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if(o == null) {
+ return false;
+ }
+ if (o.getClass() == this.getClass()) {
+ DagIdentifierImpl other = (DagIdentifierImpl) o;
+ return this.dagId.equals(other.dagId);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Dag: " + dagName + ":[" + getIdentifier() + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ return dagId.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptIdentifierImpl.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptIdentifierImpl.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptIdentifierImpl.java
new file mode 100644
index 0000000..b834111
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptIdentifierImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.TaskIdentifier;
+
+public class TaskAttemptIdentifierImpl implements TaskAttemptIdentifier {
+
+ private final TaskIdentifier taskIdentifier;
+ private final TezTaskAttemptID attemptId;
+
+ public TaskAttemptIdentifierImpl(String dagName, String vertexName, TezTaskAttemptID attemptId) {
+ this.attemptId = attemptId;
+ this.taskIdentifier = new TaskIdentifierImpl(dagName, vertexName, attemptId.getTaskID());
+ }
+
+ @Override
+ public int getIdentifier() {
+ return attemptId.getId();
+ }
+
+ @Override
+ public TaskIdentifier getTaskIdentifier() {
+ return taskIdentifier;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if(o == null) {
+ return false;
+ }
+ if (o.getClass() == this.getClass()) {
+ TaskAttemptIdentifierImpl other = (TaskAttemptIdentifierImpl) o;
+ return this.attemptId.equals(other.attemptId);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return taskIdentifier.toString() + " Attempt: [" + getIdentifier() + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ return attemptId.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-common/src/main/java/org/apache/tez/dag/records/TaskIdentifierImpl.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskIdentifierImpl.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskIdentifierImpl.java
new file mode 100644
index 0000000..fb0848a
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskIdentifierImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+import org.apache.tez.runtime.api.TaskIdentifier;
+import org.apache.tez.runtime.api.VertexIdentifier;
+
+public class TaskIdentifierImpl implements TaskIdentifier {
+
+ private final VertexIdentifier vertexIdentifier;
+ private final TezTaskID taskId;
+
+ public TaskIdentifierImpl(String dagName, String vertexName, TezTaskID taskId) {
+ this.taskId = taskId;
+ this.vertexIdentifier = new VertexIdentifierImpl(dagName, vertexName, taskId.getVertexID());
+ }
+
+ @Override
+ public int getIdentifier() {
+ return taskId.getId();
+ }
+
+ @Override
+ public VertexIdentifier getVertexIdentifier() {
+ return vertexIdentifier;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if(o == null) {
+ return false;
+ }
+ if (o.getClass() == this.getClass()) {
+ TaskIdentifierImpl other = (TaskIdentifierImpl) o;
+ return this.taskId.equals(other.taskId);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return vertexIdentifier.toString() + " Task [" + getIdentifier() + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ return taskId.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java b/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java
new file mode 100644
index 0000000..4480f74
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+import org.apache.tez.runtime.api.DagIdentifier;
+import org.apache.tez.runtime.api.VertexIdentifier;
+
+public class VertexIdentifierImpl implements VertexIdentifier {
+
+ private final DagIdentifier dagIdentifier;
+ private final TezVertexID vertexId;
+ private final String vertexName;
+
+ public VertexIdentifierImpl(String dagName, String vertexName, TezVertexID vertexId) {
+ this.vertexId = vertexId;
+ this.vertexName = vertexName;
+ this.dagIdentifier = new DagIdentifierImpl(dagName, vertexId.getDAGId());
+ }
+
+ @Override
+ public String getName() {
+ return vertexName;
+ }
+
+ @Override
+ public int getIdentifier() {
+ return vertexId.getId();
+ }
+
+ @Override
+ public DagIdentifier getDagIdentifier() {
+ return dagIdentifier;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if(o == null) {
+ return false;
+ }
+ if (o.getClass() == this.getClass()) {
+ VertexIdentifierImpl other = (VertexIdentifierImpl) o;
+ return this.vertexId.equals(other.vertexId);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return dagIdentifier.toString() + " Vertex: " + vertexName + ":[" + getIdentifier() + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ return vertexId.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index bb3548d..ab7941e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -37,8 +37,8 @@ import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
@@ -142,7 +142,7 @@ public interface Vertex extends Comparable<Vertex> {
int getInputVerticesCount();
int getOutputVerticesCount();
- void scheduleTasks(List<TaskWithLocationHint> tasks);
+ void scheduleTasks(List<ScheduleTaskRequest> tasks);
void scheduleSpeculativeTask(TezTaskID taskId);
Resource getTaskResource();
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
index 5e179bd..50624dd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
@@ -28,10 +28,11 @@ import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import java.util.EnumSet;
@@ -56,7 +57,7 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
}
@Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
@@ -90,14 +91,14 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
}
tasksScheduled = true;
- List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(managedTasks);
+ List<ScheduleTaskRequest> tasksToStart = Lists.newArrayListWithCapacity(managedTasks);
for (int i = 0; i < managedTasks; ++i) {
- tasksToStart.add(new TaskWithLocationHint(i, null));
+ tasksToStart.add(ScheduleTaskRequest.create(i, null));
}
if (!tasksToStart.isEmpty()) {
LOG.info("Starting " + tasksToStart.size() + " in " + getContext().getVertexName());
- getContext().scheduleVertexTasks(tasksToStart);
+ getContext().scheduleTasks(tasksToStart);
}
// all tasks scheduled. Can call vertexManagerDone().
// TODO TEZ-1714 for locking issues getContext().vertexManagerDone();
@@ -134,7 +135,7 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
}
+ // Intentionally a no-op: this manager does not react to individual source
+ // attempt completions; it schedules all of its managed tasks in one batch.
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 9519fa9..accfa62 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -77,7 +77,7 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.ProgressBuilder;
import org.apache.tez.dag.api.client.StatusGetOpts;
@@ -149,6 +149,7 @@ import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -160,6 +161,7 @@ import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.OutputStatistics;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
@@ -1489,16 +1491,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
getInputSpecList(taskIndex), getOutputSpecList(taskIndex),
getGroupInputSpecList(taskIndex));
}
-
+
@Override
- public void scheduleTasks(List<TaskWithLocationHint> tasksToSchedule) {
+ public void scheduleTasks(List<ScheduleTaskRequest> tasksToSchedule) {
try {
unsetTasksNotYetScheduled();
// update state under write lock
writeLock.lock();
try {
- for (TaskWithLocationHint task : tasksToSchedule) {
- if (numTasks <= task.getTaskIndex().intValue()) {
+ for (ScheduleTaskRequest task : tasksToSchedule) {
+ if (numTasks <= task.getTaskIndex()) {
throw new TezUncheckedException(
"Invalid taskId: " + task.getTaskIndex() + " for vertex: " + logIdentifier);
}
@@ -1507,7 +1509,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
if (taskLocationHints == null) {
taskLocationHints = new TaskLocationHint[numTasks];
}
- taskLocationHints[task.getTaskIndex().intValue()] = locationHint;
+ taskLocationHints[task.getTaskIndex()] = locationHint;
}
}
} finally {
@@ -1516,8 +1518,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
readLock.lock();
try {
- for (TaskWithLocationHint task : tasksToSchedule) {
- TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex().intValue());
+ for (ScheduleTaskRequest task : tasksToSchedule) {
+ TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex());
TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId());
eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec,
getTaskLocationHint(taskId)));
@@ -2755,8 +2757,24 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
+ /**
+ * Converts DAG-internal attempt IDs into the public TaskAttemptIdentifier form
+ * that is handed to vertex manager plugins. Each attempt's vertex name is
+ * resolved through {@code dag.getVertex(vertexId).getName()}.
+ * NOTE(review): assumes taIds is non-null (taIds.size() is called up front)
+ * and that every attempt's vertex is found by dag.getVertex(...) -- TODO
+ * confirm this also holds on the recovery path.
+ */
+ private static List<TaskAttemptIdentifier> getTaskAttemptIdentifiers(DAG dag,
+ List<TezTaskAttemptID> taIds) {
+ List<TaskAttemptIdentifier> attempts = new ArrayList<>(taIds.size());
+ // The DAG name is the same for every attempt, so read it once.
+ String dagName = dag.getName();
+ for (TezTaskAttemptID taId : taIds) {
+ String vertexName = dag.getVertex(taId.getTaskID().getVertexID()).getName();
+ attempts.add(getTaskAttemptIdentifier(dagName, vertexName, taId));
+ }
+ return attempts;
+ }
+
+ /**
+ * Wraps a single attempt ID, together with its DAG and vertex names, in a
+ * TaskAttemptIdentifierImpl.
+ */
+ private static TaskAttemptIdentifier getTaskAttemptIdentifier(String dagName, String vertexName,
+ TezTaskAttemptID taId) {
+ return new TaskAttemptIdentifierImpl(dagName, vertexName, taId);
+ }
+
private void recoveryCodeSimulatingStart() throws AMUserCodeException {
- vertexManager.onVertexStarted(pendingReportedSrcCompletions);
+ vertexManager.onVertexStarted(getTaskAttemptIdentifiers(dag, pendingReportedSrcCompletions));
// This code is duplicated from startVertex() because recovery does not follow normal
// transitions. To be removed after recovery code is fixed.
maybeSendConfiguredEvent();
@@ -3552,7 +3570,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
startedTime = clock.getTime();
try {
- vertexManager.onVertexStarted(pendingReportedSrcCompletions);
+ vertexManager.onVertexStarted(getTaskAttemptIdentifiers(dag, pendingReportedSrcCompletions));
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() +", vertex=" + logIdentifier;
LOG.error(msg, e);
@@ -3811,8 +3829,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
if (vertex.getState() == VertexState.RUNNING) {
try {
// Inform the vertex manager about the source task completing.
- vertex.vertexManager.onSourceTaskCompleted(completionEvent
- .getTaskAttemptId().getTaskID());
+ TezTaskAttemptID taId = completionEvent.getTaskAttemptId();
+ vertex.vertexManager.onSourceTaskCompleted(
+ getTaskAttemptIdentifier(vertex.dag.getName(),
+ vertex.dag.getVertex(taId.getTaskID().getVertexID()).getName(),
+ taId));
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
LOG.error(msg, e);
@@ -4317,6 +4338,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
Vertex target = getDAG().getVertex(vmEvent.getTargetVertexName());
Preconditions.checkArgument(target != null,
"Event sent to unkown vertex: " + vmEvent.getTargetVertexName());
+ TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
+ if (srcTaId.getTaskID().getVertexID().equals(vertexId)) {
+ // this is the producer tasks' vertex
+ vmEvent.setProducerAttemptIdentifier(
+ getTaskAttemptIdentifier(dag.getName(), getName(), srcTaId));
+ }
if (target == this) {
vertexManager.onVertexManagerEventReceived(vmEvent);
} else {
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 64eb80f..9476860 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -36,7 +36,6 @@ import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
@@ -54,8 +53,6 @@ import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.CallableEvent;
import org.apache.tez.dag.app.dag.event.DAGEvent;
@@ -64,10 +61,9 @@ import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -84,7 +80,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
-@SuppressWarnings("unchecked")
+@SuppressWarnings({"unchecked", "deprecation"})
public class VertexManager {
final VertexManagerPluginDescriptor pluginDesc;
final UserGroupInformation dagUgi;
@@ -201,10 +197,21 @@ public class VertexManager {
}
+ // New scheduling entry point: calls checkAndThrowIfDone() (presumably rejects
+ // use after the manager is done) and then hands the requests to the managed vertex.
@Override
- public synchronized void scheduleVertexTasks(List<TaskWithLocationHint> tasks) {
+ public synchronized void scheduleTasks(List<ScheduleTaskRequest> tasks) {
checkAndThrowIfDone();
managedVertex.scheduleTasks(tasks);
}
+
+ // Legacy TaskWithLocationHint-based entry point (apparently the deprecated API,
+ // given the class-level "deprecation" suppression). Each hint is converted to a
+ // ScheduleTaskRequest with the same task index and location hint, and the batch
+ // is delegated to scheduleTasks().
+ // NOTE(review): checkAndThrowIfDone() runs here and again inside scheduleTasks();
+ // the second call looks redundant. The nested synchronized call is safe because
+ // Java monitors are reentrant.
+ @Override
+ public synchronized void scheduleVertexTasks(List<TaskWithLocationHint> tasks) {
+ checkAndThrowIfDone();
+ List<ScheduleTaskRequest> schedTasks = new ArrayList<ScheduleTaskRequest>(tasks.size());
+ for (TaskWithLocationHint task : tasks) {
+ schedTasks.add(ScheduleTaskRequest.create(
+ task.getTaskIndex(), task.getTaskLocationHint()));
+ }
+ scheduleTasks(schedTasks);
+ }
@Nullable
@Override
@@ -441,30 +448,12 @@ public class VertexManager {
}
}
- public void onVertexStarted(List<TezTaskAttemptID> completions) throws AMUserCodeException {
- Map<String, List<Integer>> pluginCompletionsMap = Maps.newHashMap();
- if (completions != null && !completions.isEmpty()) {
- for (TezTaskAttemptID tezTaskAttemptID : completions) {
- Integer taskId = Integer.valueOf(tezTaskAttemptID.getTaskID().getId());
- String vertexName =
- appContext.getCurrentDAG().getVertex(
- tezTaskAttemptID.getTaskID().getVertexID()).getName();
- List<Integer> taskIdList = pluginCompletionsMap.get(vertexName);
- if (taskIdList == null) {
- taskIdList = Lists.newArrayList();
- pluginCompletionsMap.put(vertexName, taskIdList);
- }
- taskIdList.add(taskId);
- }
- }
- enqueueAndScheduleNextEvent(new VertexManagerEventOnVertexStarted(pluginCompletionsMap));
+ // Queues the plugin's onVertexStarted callback with the completed source
+ // attempts passed through unchanged; no per-vertex grouping happens here anymore.
+ // NOTE(review): the removed code turned a null list into an empty map, but this
+ // version forwards null as-is, so plugins must tolerate a null argument.
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws AMUserCodeException {
+ enqueueAndScheduleNextEvent(new VertexManagerEventOnVertexStarted(completions));
}
- public void onSourceTaskCompleted(TezTaskID tezTaskId) throws AMUserCodeException {
- Integer taskId = Integer.valueOf(tezTaskId.getId());
- String vertexName =
- appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
- enqueueAndScheduleNextEvent(new VertexManagerEventSourceTaskCompleted(taskId, vertexName));
+ // Queues the plugin's onSourceTaskCompleted callback for one completed source
+ // attempt. The vertex-name lookup done by the removed code is now the caller's
+ // job, when it builds the TaskAttemptIdentifier.
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws AMUserCodeException {
+ enqueueAndScheduleNextEvent(new VertexManagerEventSourceTaskCompleted(attempt));
}
public void onVertexManagerEventReceived(
@@ -562,31 +551,29 @@ public class VertexManager {
}
+ // Queued event that delivers plugin.onVertexStarted(...) when invoked.
class VertexManagerEventOnVertexStarted extends VertexManagerEvent {
- private final Map<String, List<Integer>> pluginCompletionsMap;
+ // Completed source attempts to report; stored by reference (not copied) and may be null.
+ private final List<TaskAttemptIdentifier> pluginCompletions;
- public VertexManagerEventOnVertexStarted(Map<String, List<Integer>> pluginCompletionsMap) {
- this.pluginCompletionsMap = pluginCompletionsMap;
+ public VertexManagerEventOnVertexStarted(List<TaskAttemptIdentifier> pluginCompletions) {
+ this.pluginCompletions = pluginCompletions;
}
@Override
public void invoke() throws Exception {
- plugin.onVertexStarted(pluginCompletionsMap);
+ plugin.onVertexStarted(pluginCompletions);
}
}
+ // Queued event that delivers plugin.onSourceTaskCompleted(...) for one attempt when invoked.
class VertexManagerEventSourceTaskCompleted extends VertexManagerEvent {
- private final Integer taskId;
- private final String vertexName;
+ // The completed source attempt; replaces the old (taskId, vertexName) pair.
+ private final TaskAttemptIdentifier attempt;
- public VertexManagerEventSourceTaskCompleted(Integer taskId, String vertexName) {
- this.taskId = taskId;
- this.vertexName = vertexName;
+ public VertexManagerEventSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+ this.attempt = attempt;
}
@Override
public void invoke() throws Exception {
- plugin.onSourceTaskCompleted(vertexName, taskId);
+ plugin.onSourceTaskCompleted(attempt);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index 6d23df3..83421a2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -84,7 +84,6 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
-import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -101,6 +100,7 @@ import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.*;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.OutputCommitter;
@@ -788,9 +788,10 @@ public class TestCommit {
Assert.assertEquals(VertexState.COMMITTING, v1.getState());
// reschedule task
VertexManagerEvent vmEvent = VertexManagerEvent.create("vertex1", ByteBuffer.wrap(new byte[0]));
+ TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(v1.getVertexId(), 0), 0);
TezEvent tezEvent = new TezEvent(vmEvent,
new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1",
- null, null));
+ null, taId));
v1.handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
waitUntil(dag, DAGState.FAILED);
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
index 6d071a7..a17c7c5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
@@ -28,9 +28,10 @@ import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -92,24 +93,25 @@ public class TestImmediateStartVertexManager {
public Object answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
scheduledTasks.clear();
- List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
- for (TaskWithLocationHint task : tasks) {
+ List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+ for (ScheduleTaskRequest task : tasks) {
scheduledTasks.add(task.getTaskIndex());
}
return null;
- }}).when(mockContext).scheduleVertexTasks(anyList());
+ }}).when(mockContext).scheduleTasks(anyList());
+ List<TaskAttemptIdentifier> emptyCompletions = null;
ImmediateStartVertexManager manager = new ImmediateStartVertexManager(mockContext);
manager.initialize();
- manager.onVertexStarted(null);
- verify(mockContext, times(0)).scheduleVertexTasks(anyList());
+ manager.onVertexStarted(emptyCompletions);
+ verify(mockContext, times(0)).scheduleTasks(anyList());
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- verify(mockContext, times(1)).scheduleVertexTasks(anyList());
+ verify(mockContext, times(1)).scheduleTasks(anyList());
Assert.assertEquals(4, scheduledTasks.size());
// simulate race between onVertexStarted and notifications
@@ -123,8 +125,8 @@ public class TestImmediateStartVertexManager {
return null;
}}).when(mockContext).registerForVertexStateUpdates(anyString(), anySet());
raceManager.initialize();
- raceManager.onVertexStarted(null);
- verify(mockContext, times(2)).scheduleVertexTasks(anyList());
+ raceManager.onVertexStarted(emptyCompletions);
+ verify(mockContext, times(2)).scheduleTasks(anyList());
Assert.assertEquals(4, scheduledTasks.size());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 3c0dd1e..cfc297e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -94,7 +94,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.event.VertexStateUpdate;
@@ -161,6 +161,7 @@ import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -170,6 +171,7 @@ import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
@@ -2513,10 +2515,10 @@ public class TestVertexImpl {
// verify all events have been put in pending.
// this is not necessary after legacy routing has been removed
Assert.assertEquals(5, v4.pendingTaskEvents.size());
- List<TaskWithLocationHint> taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+ List<ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
// scheduling start to trigger edge routing to begin
for (int i=0; i<v4.getTotalTasks(); ++i) {
- taskList.add(new TaskWithLocationHint(i, null));
+ taskList.add(ScheduleTaskRequest.create(i, null));
}
v4.scheduleTasks(taskList);
dispatcher.await();
@@ -2604,10 +2606,10 @@ public class TestVertexImpl {
VertexImpl v3 = vertices.get("vertex3");
VertexImpl v4 = vertices.get("vertex4");
- List<TaskWithLocationHint> taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+ List<ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
// scheduling start to trigger edge routing to begin
for (int i=0; i<v4.getTotalTasks(); ++i) {
- taskList.add(new TaskWithLocationHint(i, null));
+ taskList.add(ScheduleTaskRequest.create(i, null));
}
v4.scheduleTasks(taskList);
Assert.assertEquals(VertexState.RUNNING, v4.getState());
@@ -2662,10 +2664,10 @@ public class TestVertexImpl {
VertexImpl v3 = vertices.get("vertex3");
VertexImpl v4 = vertices.get("vertex4");
- List<TaskWithLocationHint> taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+ List<ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
// scheduling start to trigger edge routing to begin
for (int i=0; i<v4.getTotalTasks(); ++i) {
- taskList.add(new TaskWithLocationHint(i, null));
+ taskList.add(ScheduleTaskRequest.create(i, null));
}
v4.scheduleTasks(taskList);
Assert.assertEquals(VertexState.RUNNING, v4.getState());
@@ -2856,7 +2858,7 @@ public class TestVertexImpl {
startVertex(v1);
v3.reconfigureVertex(10, null, null);
checkTasks(v3, 10);
- v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
+ v3.scheduleTasks(Collections.singletonList(ScheduleTaskRequest.create(0, null)));
try {
v3.reconfigureVertex(5, null, null);
Assert.fail();
@@ -2881,7 +2883,7 @@ public class TestVertexImpl {
checkTasks(v3, 10);
taskEventDispatcher.events.clear();
TaskLocationHint mockLocation = mock(TaskLocationHint.class);
- v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), mockLocation)));
+ v3.scheduleTasks(Collections.singletonList(ScheduleTaskRequest.create(0, mockLocation)));
dispatcher.await();
Assert.assertEquals(1, taskEventDispatcher.events.size());
TaskEventScheduleTask event = (TaskEventScheduleTask) taskEventDispatcher.events.get(0);
@@ -2901,7 +2903,7 @@ public class TestVertexImpl {
VertexImpl v1 = vertices.get("vertex1");
startVertex(vertices.get("vertex2"));
startVertex(v1);
- v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
+ v3.scheduleTasks(Collections.singletonList(ScheduleTaskRequest.create(0, null)));
try {
v3.reconfigureVertex(5, null, null);
Assert.fail();
@@ -2937,7 +2939,7 @@ public class TestVertexImpl {
new VertexEventRouteEvent(v3.getVertexId(), taskEvents));
dispatcher.await();
Assert.assertEquals(2, v3.pendingTaskEvents.size());
- v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
+ v3.scheduleTasks(Collections.singletonList(ScheduleTaskRequest.create(0, null)));
dispatcher.await();
Assert.assertEquals(0, v3.pendingTaskEvents.size());
// send events and test that they are not buffered anymore
@@ -4892,10 +4894,10 @@ public class TestVertexImpl {
Assert.assertEquals(1, inputSpecs.get(0).getPhysicalEdgeCount());
}
- List<TaskWithLocationHint> taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+ List<ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
// scheduling start to trigger edge routing to begin
for (int i=0; i<v1.getTotalTasks(); ++i) {
- taskList.add(new TaskWithLocationHint(i, null));
+ taskList.add(ScheduleTaskRequest.create(i, null));
}
v1.scheduleTasks(taskList);
dispatcher.await();
@@ -4940,10 +4942,10 @@ public class TestVertexImpl {
Assert.assertEquals(true, initializerManager2.hasShutDown);
// scheduling start to trigger edge routing to begin
- taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+ taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
// scheduling start to trigger edge routing to begin
for (int i=0; i<v2.getTotalTasks(); ++i) {
- taskList.add(new TaskWithLocationHint(i, null));
+ taskList.add(ScheduleTaskRequest.create(i, null));
}
v2.scheduleTasks(taskList);
dispatcher.await();
@@ -5770,8 +5772,10 @@ public class TestVertexImpl {
RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
initializerManager1.completeInputInitialization();
- Event vmEvent = VertexManagerEvent.create(v1.getName(), ByteBuffer.wrap(new byte[0]));
- TezEvent tezEvent = new TezEvent(vmEvent, null);
+ VertexManagerEvent vmEvent = VertexManagerEvent.create(v1.getName(), ByteBuffer.wrap(new byte[0]));
+ TezTaskAttemptID taId1 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(v1.getVertexId(), 0), 0);
+ TezEvent tezEvent = new TezEvent(vmEvent, new EventMetaData(EventProducerConsumerType.OUTPUT,
+ v1.getName(), null, taId1));
dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v1.getVertexId(), Lists.newArrayList(tezEvent)));
dispatcher.await();
@@ -6043,7 +6047,7 @@ public class TestVertexImpl {
verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture());
verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1);
- v3.scheduleTasks(Lists.newArrayList(new TaskWithLocationHint(0, null)));
+ v3.scheduleTasks(Lists.newArrayList(ScheduleTaskRequest.create(0, null)));
dispatcher.await();
assertTrue(v3.pendingTaskEvents.size() == 0);
// recovery events is not only handled one time
@@ -6078,11 +6082,11 @@ public class TestVertexImpl {
}
+ // Test stub: both plugin callbacks are intentionally empty.
@Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
}
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
}
@Override
@@ -6189,15 +6193,15 @@ public class TestVertexImpl {
}
+ // Throws a RuntimeException named after the location when this plugin is configured
+ // to fail in OnSourceTaskCompleted (presumably to exercise plugin-error handling);
+ // otherwise it defers to the superclass.
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
if (this.exLocation == VMExceptionLocation.OnSourceTaskCompleted) {
throw new RuntimeException(this.exLocation.name());
}
- super.onSourceTaskCompleted(srcVertexName, attemptId);
+ super.onSourceTaskCompleted(attempt);
}
@Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
if (this.exLocation == VMExceptionLocation.OnVertexStarted) {
throw new RuntimeException(this.exLocation.name());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
index 81cb42a..9c16f5e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.CallableEvent;
import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.TezEvent;
@@ -183,11 +184,11 @@ public class TestVertexManager {
}
+ // Test stub: both plugin callbacks are intentionally empty.
@Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
}
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
index 84e060b..1cdaeca 100644
--- a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
+++ b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
@@ -25,7 +25,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.InputDescriptor;
@@ -34,6 +33,7 @@ import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
public class VertexManagerPluginForTest extends VertexManagerPlugin {
@@ -99,14 +99,14 @@ public class VertexManagerPluginForTest extends VertexManagerPlugin {
}
+ // On start, optionally reconfigures the vertex to the configured task count
+ // (no location hint or source-edge changes); the completions list is ignored.
@Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
if (pluginConfig.getReconfigureOnStart()) {
getContext().reconfigureVertex(pluginConfig.getNumTasks(), null, null);
}
}
+ // Source completions and VertexManagerEvents are ignored by this test plugin.
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {}
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {}
@Override
public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {}
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index 30e3e81..f05cd95 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -18,7 +18,6 @@
package org.apache.tez.dag.library.vertexmanager;
-import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -34,17 +33,16 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import com.google.common.base.Preconditions;
-import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
@Private
public class InputReadyVertexManager extends VertexManagerPlugin {
@@ -57,7 +55,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
TaskLocationHint oneToOneLocationHints[];
int numOneToOneEdges;
int numConfiguredSources;
- Multimap<String, Integer> pendingCompletions = LinkedListMultimap.create();
+ List<TaskAttemptIdentifier> pendingCompletions = Lists.newLinkedList();
AtomicBoolean configured;
AtomicBoolean started;
@@ -144,10 +142,8 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
private void trySchedulingPendingCompletions() {
if (readyToSchedule() && !pendingCompletions.isEmpty()) {
- for (Map.Entry<String, Collection<Integer>> entry : pendingCompletions.asMap().entrySet()) {
- for (Integer i : entry.getValue()) {
- onSourceTaskCompleted(entry.getKey(), i);
- }
+ for (TaskAttemptIdentifier attempt : pendingCompletions) {
+ onSourceTaskCompleted(attempt);
}
}
}
@@ -180,10 +176,10 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
}
@Override
- public synchronized void onVertexStarted(Map<String, List<Integer>> completions) {
- for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
- pendingCompletions.putAll(entry.getKey(), entry.getValue());
- }
+ public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions) {
+ if (completions != null) {
+ pendingCompletions.addAll(completions);
+ }
// allow scheduling
started.set(true);
@@ -192,12 +188,14 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
}
@Override
- public synchronized void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+ public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+ String srcVertexName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+ int taskId = attempt.getTaskIdentifier().getIdentifier();
if (readyToSchedule()) {
// configured and started. try to schedule
handleSourceTaskFinished(srcVertexName, taskId);
} else {
- pendingCompletions.put(srcVertexName, taskId);
+ pendingCompletions.add(attempt);
}
}
@@ -245,7 +243,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
}
// all source vertices will full dependencies are done
- List<TaskWithLocationHint> tasksToStart = null;
+ List<ScheduleTaskRequest> tasksToStart = null;
if (numOneToOneEdges == 0) {
// no 1-1 dependency. Start all tasks
int numTasks = taskIsStarted.length;
@@ -253,7 +251,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
tasksToStart = Lists.newArrayListWithCapacity(numTasks);
for (int i=0; i<numTasks; ++i) {
taskIsStarted[i] = true;
- tasksToStart.add(new TaskWithLocationHint(Integer.valueOf(i), null));
+ tasksToStart.add(ScheduleTaskRequest.create(i, null));
}
} else {
// start only the ready 1-1 tasks
@@ -268,13 +266,13 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
LOG.info("Starting task " + i + " for vertex: "
+ getContext().getVertexName() + " with location: "
+ ((locationHint != null) ? locationHint.getAffinitizedTask() : "null"));
- tasksToStart.add(new TaskWithLocationHint(Integer.valueOf(i), locationHint));
+ tasksToStart.add(ScheduleTaskRequest.create(Integer.valueOf(i), locationHint));
}
}
}
if (tasksToStart != null && !tasksToStart.isEmpty()) {
- getContext().scheduleVertexTasks(tasksToStart);
+ getContext().scheduleTasks(tasksToStart);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 6c3e3f8..d9c4941 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -24,6 +24,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -46,11 +47,13 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.TaskIdentifier;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
@@ -72,6 +75,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -143,6 +147,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
int totalTasksToSchedule = 0;
private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+ private Set<TaskIdentifier> taskWithVmEvents = Sets.newHashSet();
+
//Track source vertex and its finished tasks
private final Map<String, SourceVertexInfo> srcVertexInfo = Maps.newConcurrentMap();
boolean sourceVerticesScheduled = false;
@@ -469,7 +475,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
@Override
- public synchronized void onVertexStarted(Map<String, List<Integer>> completions) {
+ public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions) {
// examine edges after vertex started because until then these may not have been defined
Map<String, EdgeProperty> inputs = getContext().getInputVertexEdgeProperties();
for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) {
@@ -498,10 +504,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
totalTasksToSchedule + " pending tasks");
if (completions != null) {
- for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
- for (Integer taskId : entry.getValue()) {
- onSourceTaskCompleted(entry.getKey(), taskId);
- }
+ for (TaskAttemptIdentifier attempt : completions) {
+ onSourceTaskCompleted(attempt);
}
}
onVertexStartedDone.set(true);
@@ -511,7 +515,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
@Override
- public synchronized void onSourceTaskCompleted(String srcVertexName, Integer srcTaskId) {
+ public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+ String srcVertexName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+ int srcTaskId = attempt.getTaskIdentifier().getIdentifier();
updateSourceTaskCount();
SourceVertexInfo srcInfo = srcVertexInfo.get(srcVertexName);
@@ -550,7 +556,14 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
@Override
public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
- // TODO handle duplicates from retries
+ // currently events from multiple attempts of the same task can be ignored because
+ // their output will be the same. However, with pipelined events that may not hold.
+ TaskIdentifier producerTask = vmEvent.getProducerAttemptIdentifier().getTaskIdentifier();
+ if (!taskWithVmEvents.add(producerTask)) {
+ LOG.info("Ignoring vertex manager event from: " + producerTask);
+ return;
+ }
+
numVertexManagerEventsReceived++;
long sourceTaskOutputSize = 0;
@@ -758,16 +771,16 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
}
//Sort in case partition stats are available
sortPendingTasksBasedOnDataSize();
- List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasksToSchedule);
+ List<ScheduleTaskRequest> scheduledTasks = Lists.newArrayListWithCapacity(numTasksToSchedule);
while(!pendingTasks.isEmpty() && numTasksToSchedule > 0) {
numTasksToSchedule--;
Integer taskIndex = pendingTasks.get(0).index;
- scheduledTasks.add(new TaskWithLocationHint(taskIndex, null));
+ scheduledTasks.add(ScheduleTaskRequest.create(taskIndex, null));
pendingTasks.remove(0);
}
- getContext().scheduleVertexTasks(scheduledTasks);
+ getContext().scheduleTasks(scheduledTasks);
if (pendingTasks.size() == 0) {
// done scheduling all tasks
// TODO TEZ-1714 locking issues. getContext().vertexManagerDone();