You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/09/27 22:00:12 UTC
git commit: TEZ-481. Fix Reduce auto-parallelism after TEZ-398 (bikas)
Updated Branches:
refs/heads/master 034ca0a95 -> 04d5e3771
TEZ-481. Fix Reduce auto-parallelism after TEZ-398 (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/04d5e377
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/04d5e377
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/04d5e377
Branch: refs/heads/master
Commit: 04d5e3771587009e9a6f0e22cd1582e9eda78ef4
Parents: 034ca0a
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Sep 27 12:58:14 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Sep 27 12:58:14 2013 -0700
----------------------------------------------------------------------
.../org/apache/tez/common/TezJobConfig.java | 6 --
.../api/events/InputInformationEvent.java | 41 ----------
.../runtime/api/events/VertexManagerEvent.java | 61 +++++++++++++++
tez-api/src/main/proto/Events.proto | 7 +-
.../apache/tez/dag/app/dag/VertexScheduler.java | 2 +
.../app/dag/event/DAGEventVertexCompleted.java | 4 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 11 ++-
.../dag/impl/ImmediateStartVertexScheduler.java | 5 ++
.../dag/app/dag/impl/ShuffleVertexManager.java | 77 ++++++++++---------
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 29 ++++++-
.../tez/dag/app/dag/impl/TestDAGImpl.java | 11 ++-
.../dag/app/dag/impl/TestVertexScheduler.java | 27 ++++---
.../apache/tez/runtime/api/impl/EventType.java | 4 +-
.../apache/tez/runtime/api/impl/TezEvent.java | 33 ++++----
.../library/common/shuffle/impl/Fetcher.java | 20 +++--
.../library/common/shuffle/impl/Shuffle.java | 31 +-------
.../shuffle/impl/ShuffleInputEventHandler.java | 26 +------
.../common/shuffle/impl/ShuffleScheduler.java | 80 +++++++-------------
.../library/output/LocalOnFileSorterOutput.java | 2 +-
.../library/output/OnFileSortedOutput.java | 18 ++++-
.../src/main/proto/ShufflePayloads.proto | 4 +
21 files changed, 255 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index ace87ca..07602ea 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -171,12 +171,6 @@ public class TezJobConfig {
"tez.runtime.shuffle.use.in-memory";
public static final boolean DEFAULT_TEZ_RUNTIME_SHUFFLE_USE_IN_MEMORY = false;
- // TODO NEWTEZ Remove these config parameters. Will be part of an event.
- @Private
- public static final String TEZ_RUNTIME_SHUFFLE_PARTITION_RANGE =
- "tez.runtime.shuffle.partition-range";
- public static int TEZ_RUNTIME_SHUFFLE_PARTITION_RANGE_DEFAULT = 1;
-
/**
*
*/
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInformationEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInformationEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInformationEvent.java
deleted file mode 100644
index 0322b75..0000000
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInformationEvent.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.api.events;
-
-import org.apache.tez.runtime.api.Event;
-
-/**
- * Event used to send user specific data from the user
- * code in the AM to the task input
- */
-public class InputInformationEvent extends Event {
-
- /**
- * User Payload for this Event
- */
- private final byte[] userPayload;
- public InputInformationEvent(byte[] userPayload) {
- this.userPayload = userPayload;
- }
-
- public byte[] getUserPayload() {
- return userPayload;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
new file mode 100644
index 0000000..84431bc
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.events;
+
+import org.apache.tez.runtime.api.Event;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Event used to send information from a Task to the VertexManager for a vertex.
+ * This may be used to send statistics, such as samples, to the VertexManager
+ * for automatic plan reconfiguration based on observed statistics.
+ */
+public class VertexManagerEvent extends Event {
+
+ /**
+ * Vertex to which the event should be sent
+ */
+ private final String targetVertexName;
+
+ /**
+ * User payload to be sent
+ */
+ private final byte[] userPayload;
+
+ /**
+ * Create a new VertexManagerEvent
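+ * @param vertexName Name of the vertex whose VertexManager should receive this event; must not be null
+ * @param userPayload Payload for the VertexManager; must not be null. It should not be modified after this call since a reference is kept (no copy is made)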
+ */
+ public VertexManagerEvent(String vertexName, byte[] userPayload) {
+ Preconditions.checkArgument(vertexName != null);
+ Preconditions.checkArgument(userPayload != null);
+ this.targetVertexName = vertexName;
+ this.userPayload = userPayload;
+ }
+
+ public String getTargetVertexName() {
+ return targetVertexName;
+ }
+
+ public byte[] getUserPayload() {
+ return userPayload;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto
index b91125d..d76708a 100644
--- a/tez-api/src/main/proto/Events.proto
+++ b/tez-api/src/main/proto/Events.proto
@@ -39,6 +39,7 @@ message InputFailedEventProto {
optional int32 version = 4;
}
-message InputInformationEventProto {
- optional bytes user_payload = 1;
-}
+message VertexManagerEventProto {
+ optional string target_vertex_name = 1;
+ optional bytes user_payload = 2;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
index 3789702..4656efd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
@@ -20,10 +20,12 @@ package org.apache.tez.dag.app.dag;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
// Rename to VertexManager TEZ-364 and move to DAG API. Make abstract class.
public interface VertexScheduler {
void initialize(Configuration conf);
void onVertexStarted();
void onSourceTaskCompleted(TezTaskAttemptID attemptId);
+ void onVertexManagerEventReceived(VertexManagerEvent vmEvent);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
index 80c7f42..3d4367d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
@@ -23,8 +23,8 @@ import org.apache.tez.dag.records.TezVertexID;
public class DAGEventVertexCompleted extends DAGEvent {
- private TezVertexID vertexId;
- private VertexState vertexState;
+ private final TezVertexID vertexId;
+ private final VertexState vertexState;
public DAGEventVertexCompleted(TezVertexID vertexId, VertexState vertexState) {
super(vertexId.getDAGId(), DAGEventType.DAG_VERTEX_COMPLETED);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index b4c9981..c81b835 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -24,8 +24,10 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -136,6 +138,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private TezCounters dagCounters = new TezCounters();
private Object fullCountersLock = new Object();
private TezCounters fullCounters = null;
+ private Set<TezVertexID> reRunningVertices = new HashSet<TezVertexID>();
public final Configuration conf;
private final DAGPlan jobPlan;
@@ -1132,8 +1135,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
job.numCompletedVertices++;
if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
+ if (!job.reRunningVertices.contains(vertex.getVertexId())) {
+ // vertex succeeded for the first time
+ job.dagScheduler.vertexCompleted(vertex);
+ }
job.vertexSucceeded(vertex);
- job.dagScheduler.vertexCompleted(vertex);
}
else if (vertexEvent.getVertexState() == VertexState.FAILED) {
job.enactKill(DAGTerminationCause.VERTEX_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
@@ -1144,6 +1150,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
job.vertexKilled(vertex);
forceTransitionToKillWait = true;
}
+
+ job.reRunningVertices.remove(vertex.getVertexId());
LOG.info("Vertex " + vertex.getVertexId() + " completed."
+ ", numCompletedVertices=" + job.numCompletedVertices
@@ -1190,6 +1198,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
private void vertexReRunning(Vertex vertex) {
+ reRunningVertices.add(vertex.getVertexId());
numSuccessfulVertices--;
addDiagnostic("Vertex re-running " + vertex.getVertexId());
// TODO: Metrics
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
index b79a426..2d94006 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexScheduler;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
/**
* Starts all tasks immediately on vertex start
@@ -46,4 +47,8 @@ public class ImmediateStartVertexScheduler implements VertexScheduler {
public void initialize(Configuration conf) {
}
+ @Override
+ public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
index e039c72..d633d0c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
@@ -18,7 +18,6 @@
package org.apache.tez.dag.app.dag.impl;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -29,7 +28,6 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezJobConfig;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -39,10 +37,13 @@ import org.apache.tez.dag.app.dag.VertexScheduler;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
+
+import com.google.protobuf.InvalidProtocolBufferException;
/**
* Starts scheduling tasks when number of completed source tasks crosses
@@ -64,6 +65,7 @@ public class ShuffleVertexManager implements VertexScheduler {
int numSourceTasks = 0;
int numSourceTasksCompleted = 0;
+ int numVertexManagerEventsReceived = 0;
ArrayList<TezTaskID> pendingTasks;
int totalTasksToSchedule = 0;
HashMap<TezVertexID, Vertex> bipartiteSources =
@@ -196,21 +198,34 @@ public class ShuffleVertexManager implements VertexScheduler {
if (completedSourceTasks.add(srcTaskId)) {
// source task has completed
++numSourceTasksCompleted;
- if (enableAutoParallelism) {
- // save output size
- // TODO TEZ-481
- long sourceTaskOutputSize = 100000000l;//sourceTaskAttempt.getDataSize();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Source task: " + srcAttemptId
- + " finished with output size: " + sourceTaskOutputSize);
- }
- completedSourceTasksOutputSize += sourceTaskOutputSize;
- }
}
schedulePendingTasks();
}
}
+ @Override
+ public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+ // TODO handle duplicates from retries
+ if (enableAutoParallelism) {
+ // save output size
+ VertexManagerEventPayloadProto proto;
+ try {
+ proto = VertexManagerEventPayloadProto.parseFrom(vmEvent.getUserPayload());
+ } catch (InvalidProtocolBufferException e) {
+ throw new TezUncheckedException(e);
+ }
+ long sourceTaskOutputSize = proto.getOutputSize();
+ numVertexManagerEventsReceived++;
+ completedSourceTasksOutputSize += sourceTaskOutputSize;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received info of output size: " + sourceTaskOutputSize
+ + " numInfoReceived: " + numVertexManagerEventsReceived
+ + " total output size: " + completedSourceTasksOutputSize);
+ }
+ }
+
+ }
+
void updatePendingTasks() {
pendingTasks.clear();
pendingTasks.addAll(managedVertex.getTasks().keySet());
@@ -230,9 +245,14 @@ public class ShuffleVertexManager implements VertexScheduler {
if(numSourceTasksCompleted == 0) {
return;
}
+
+ if(numVertexManagerEventsReceived == 0) {
+ return;
+ }
+
int currentParallelism = pendingTasks.size();
long expectedTotalSourceTasksOutputSize =
- (numSourceTasks*completedSourceTasksOutputSize)/numSourceTasksCompleted;
+ (numSourceTasks*completedSourceTasksOutputSize)/numVertexManagerEventsReceived;
int desiredTaskParallelism =
(int)(expectedTotalSourceTasksOutputSize/desiredTaskInputDataSize);
if(desiredTaskParallelism < minTaskParallelism) {
@@ -256,42 +276,25 @@ public class ShuffleVertexManager implements VertexScheduler {
int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ?
(numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange);
-
+
if(finalTaskParallelism < currentParallelism) {
// final parallelism is less than actual parallelism
LOG.info("Reducing parallelism for vertex: " + managedVertex.getVertexId()
+ " to " + finalTaskParallelism + " from " + pendingTasks.size()
+ " . Expected output: " + expectedTotalSourceTasksOutputSize
+ " based on actual output: " + completedSourceTasksOutputSize
- + " from " + numSourceTasksCompleted + " completed source tasks. "
+ + " from " + numVertexManagerEventsReceived + " vertex manager events. "
+ " desiredTaskInputSize: " + desiredTaskInputDataSize);
-
- List<byte[]> taskConfs = new ArrayList<byte[]>(finalTaskParallelism);
- try {
- Configuration taskConf = new Configuration(false);
- taskConf.setInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARTITION_RANGE,
- basePartitionRange);
- // create event user payload to inform the task
- for (int i = 0; i < numShufflersWithBaseRange; ++i) {
- taskConfs.add(MRHelpers.createUserPayloadFromConf(taskConf));
- }
- if(finalTaskParallelism > numShufflersWithBaseRange) {
- taskConf.setInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARTITION_RANGE,
- remainderRangeForLastShuffler);
- taskConfs.add(MRHelpers.createUserPayloadFromConf(taskConf));
- }
- } catch (IOException e) {
- throw new TezUncheckedException(e);
- }
Map<Vertex, EdgeManager> edgeManagers = new HashMap<Vertex, EdgeManager>(
bipartiteSources.size());
for(Vertex vertex : bipartiteSources.values()) {
// use currentParallelism for numSourceTasks to maintain original state
// for the source tasks
- edgeManagers.put(vertex, new CustomShuffleEdgeManager(currentParallelism,
- finalTaskParallelism, basePartitionRange,
- remainderRangeForLastShuffler));
+ edgeManagers.put(vertex, new CustomShuffleEdgeManager(
+ currentParallelism, finalTaskParallelism, basePartitionRange,
+ ((remainderRangeForLastShuffler > 0) ?
+ remainderRangeForLastShuffler : basePartitionRange)));
}
managedVertex.setParallelism(finalTaskParallelism, edgeManagers);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index bf10322..96f0785 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -114,6 +114,7 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
@@ -211,6 +212,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
.addTransition(VertexState.INITED, VertexState.RUNNING,
VertexEventType.V_START,
new StartTransition())
+ .addTransition(VertexState.INITED,
+ VertexState.INITED, VertexEventType.V_ROUTE_EVENT,
+ ROUTE_EVENT_TRANSITION)
+
.addTransition(VertexState.INITED, VertexState.KILLED,
VertexEventType.V_TERMINATE,
@@ -260,6 +265,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Ignore-able events
.addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
EnumSet.of(VertexEventType.V_TERMINATE,
+ VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_RESCHEDULED))
@@ -278,6 +284,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
.addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
EnumSet.of(VertexEventType.V_TERMINATE,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+ VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_TASK_COMPLETED))
// Transitions from FAILED state
@@ -289,6 +296,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
.addTransition(VertexState.FAILED, VertexState.FAILED,
EnumSet.of(VertexEventType.V_TERMINATE,
VertexEventType.V_TASK_RESCHEDULED,
+ VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED))
@@ -302,6 +310,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EnumSet.of(VertexEventType.V_TERMINATE,
VertexEventType.V_SOURCE_VERTEX_STARTED,
VertexEventType.V_START,
+ VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_TASK_RESCHEDULED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED))
@@ -313,6 +322,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EnumSet.of(VertexEventType.V_INIT,
VertexEventType.V_SOURCE_VERTEX_STARTED,
VertexEventType.V_START,
+ VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_TERMINATE,
VertexEventType.V_TASK_COMPLETED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
@@ -1391,10 +1401,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
LOG.info("Vertex: " + vertex.getName() + " routing event: "
+ tezEvent.getEventType());
EventMetaData sourceMeta = tezEvent.getSourceInfo();
- checkEventSourceMetadata(vertex, sourceMeta);
switch(tezEvent.getEventType()) {
case DATA_MOVEMENT_EVENT:
{
+ checkEventSourceMetadata(vertex, sourceMeta);
TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
dmEvent.setVersion(srcTaId.getId());
@@ -1403,8 +1413,21 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
destEdge.sendTezEventToDestinationTasks(tezEvent);
}
break;
+ case VERTEX_MANAGER_EVENT:
+ {
+ VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent();
+ Vertex target = vertex.getDAG().getVertex(vmEvent.getTargetVertexName());
+ if (target == vertex) {
+ vertex.vertexScheduler.onVertexManagerEventReceived(vmEvent);
+ } else {
+ vertex.eventHandler.handle(new VertexEventRouteEvent(target
+ .getVertexId(), Collections.singletonList(tezEvent)));
+ }
+ }
+ break;
case INPUT_FAILED_EVENT:
{
+ checkEventSourceMetadata(vertex, sourceMeta);
TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
InputFailedEvent ifEvent = (InputFailedEvent) tezEvent.getEvent();
ifEvent.setVersion(srcTaId.getId());
@@ -1415,6 +1438,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
break;
case INPUT_READ_ERROR_EVENT:
{
+ checkEventSourceMetadata(vertex, sourceMeta);
Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
sourceMeta.getEdgeVertexName()));
srcEdge.sendTezEventToSourceTasks(tezEvent);
@@ -1422,6 +1446,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
break;
case TASK_STATUS_UPDATE_EVENT:
{
+ checkEventSourceMetadata(vertex, sourceMeta);
TaskStatusUpdateEvent sEvent =
(TaskStatusUpdateEvent) tezEvent.getEvent();
vertex.getEventHandler().handle(
@@ -1431,6 +1456,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
break;
case TASK_ATTEMPT_COMPLETED_EVENT:
{
+ checkEventSourceMetadata(vertex, sourceMeta);
vertex.getEventHandler().handle(
new TaskAttemptEvent(sourceMeta.getTaskAttemptID(),
TaskAttemptEventType.TA_DONE));
@@ -1438,6 +1464,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
break;
case TASK_ATTEMPT_FAILED_EVENT:
{
+ checkEventSourceMetadata(vertex, sourceMeta);
TaskAttemptFailedEvent taskFailedEvent =
(TaskAttemptFailedEvent) tezEvent.getEvent();
vertex.getEventHandler().handle(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 1a36c26..55b846d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -18,8 +18,7 @@
package org.apache.tez.dag.app.dag.impl;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,6 +45,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Task;
@@ -526,7 +526,7 @@ public class TestDAGImpl {
dispatcher.init(conf);
dispatcher.start();
}
-
+
@After
public void teardown() {
dispatcher.await();
@@ -606,6 +606,7 @@ public class TestDAGImpl {
@Test
public void testVertexReRunning() {
initDAG(dag);
+ dag.dagScheduler = mock(DAGScheduler.class);
startDAG(dag);
dispatcher.await();
@@ -620,6 +621,7 @@ public class TestDAGImpl {
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(1, dag.getSuccessfulVertices());
Assert.assertEquals(1, dag.numCompletedVertices);
+ verify(dag.dagScheduler, times(1)).vertexCompleted(v);
dispatcher.getEventHandler().handle(
new VertexEventTaskReschedule(new TezTaskID(vId, 0)));
@@ -635,6 +637,9 @@ public class TestDAGImpl {
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(1, dag.getSuccessfulVertices());
Assert.assertEquals(1, dag.numCompletedVertices);
+
+ // re-completion is not notified again
+ verify(dag.dagScheduler, times(1)).vertexCompleted(v);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
index b2e13e2..528d03b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
@@ -40,9 +39,9 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -53,7 +52,6 @@ public class TestVertexScheduler {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test(timeout = 5000)
- @Ignore // TODO TEZ-481
public void testShuffleVertexManagerAutoParallelism() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(
@@ -98,7 +96,6 @@ public class TestVertexScheduler {
when(mockManagedVertex.getVertexId()).thenReturn(mockManagedVertexId);
when(mockManagedVertex.getInputVertices()).thenReturn(mockInputVertices);
-
mockInputVertices.put(mockSrcVertex1, new Edge(eProp1, mockEventHandler));
mockInputVertices.put(mockSrcVertex2, new Edge(eProp2, mockEventHandler));
mockInputVertices.put(mockSrcVertex3, new Edge(eProp3, mockEventHandler));
@@ -162,27 +159,29 @@ public class TestVertexScheduler {
TezTaskAttemptID mockSrcAttemptId31 =
new TezTaskAttemptID(new TezTaskID(mockSrcVertexId3, 0), 0);
+ byte[] payload =
+ VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteArray();
+ VertexManagerEvent vmEvent = new VertexManagerEvent("Vertex", payload);
// parallelism not change due to large data size
- //when(mockEvent.getDataSize()).thenReturn(5000L);
scheduler = createScheduler(conf, mockManagedVertex, 0.1f, 0.1f);
scheduler.onVertexStarted();
Assert.assertTrue(scheduler.pendingTasks.size() == 4); // no tasks scheduled
Assert.assertTrue(scheduler.numSourceTasks == 4);
+ scheduler.onVertexManagerEventReceived(vmEvent);
scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
// managedVertex tasks reduced
verify(mockManagedVertex, times(0)).setParallelism(anyInt(), anyMap());
Assert.assertEquals(0, scheduler.pendingTasks.size()); // all tasks scheduled
Assert.assertEquals(4, scheduledTasks.size());
- Assert.assertEquals(1, scheduler.numSourceTasksCompleted);
+ Assert.assertEquals(1, scheduler.numSourceTasksCompleted); // TODO
Assert.assertEquals(5000L, scheduler.completedSourceTasksOutputSize);
+
// parallelism changed due to small data size
- //when(mockEvent.getDataSize()).thenReturn(500L);
scheduledTasks.clear();
- Configuration procConf = new Configuration();
- ProcessorDescriptor procDesc = new ProcessorDescriptor("REDUCE");
- procDesc.setUserPayload(MRHelpers.createUserPayloadFromConf(procConf));
- when(mockManagedVertex.getProcessorDescriptor()).thenReturn(procDesc);
+ payload =
+ VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteArray();
+ vmEvent = new VertexManagerEvent("Vertex", payload);
scheduler = createScheduler(conf, mockManagedVertex, 0.5f, 0.5f);
scheduler.onVertexStarted();
@@ -193,10 +192,12 @@ public class TestVertexScheduler {
Assert.assertEquals(4, scheduler.pendingTasks.size()); // no tasks scheduled
Assert.assertEquals(4, scheduler.numSourceTasks);
Assert.assertEquals(0, scheduler.numSourceTasksCompleted);
+ scheduler.onVertexManagerEventReceived(vmEvent);
scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
Assert.assertEquals(4, scheduler.pendingTasks.size());
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
Assert.assertEquals(1, scheduler.numSourceTasksCompleted);
+ Assert.assertEquals(1, scheduler.numVertexManagerEventsReceived);
Assert.assertEquals(500L, scheduler.completedSourceTasksOutputSize);
// ignore duplicate completion
scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
@@ -205,6 +206,7 @@ public class TestVertexScheduler {
Assert.assertEquals(1, scheduler.numSourceTasksCompleted);
Assert.assertEquals(500L, scheduler.completedSourceTasksOutputSize);
+ scheduler.onVertexManagerEventReceived(vmEvent);
scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
// managedVertex tasks reduced
verify(mockManagedVertex).setParallelism(eq(2), anyMap());
@@ -215,6 +217,7 @@ public class TestVertexScheduler {
Assert.assertTrue(scheduledTasks.contains(mockTaskId1));
Assert.assertTrue(scheduledTasks.contains(mockTaskId2));
Assert.assertEquals(2, scheduler.numSourceTasksCompleted);
+ Assert.assertEquals(2, scheduler.numVertexManagerEventsReceived);
Assert.assertEquals(1000L, scheduler.completedSourceTasksOutputSize);
// more completions dont cause recalculation of parallelism
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
index 81ff5fc..b42096c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
@@ -24,6 +24,6 @@ public enum EventType {
DATA_MOVEMENT_EVENT,
INPUT_READ_ERROR_EVENT,
INPUT_FAILED_EVENT,
- INTPUT_INFORMATION_EVENT,
- TASK_STATUS_UPDATE_EVENT
+ TASK_STATUS_UPDATE_EVENT,
+ VERTEX_MANAGER_EVENT
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index e195cf9..03e41f4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -28,14 +28,14 @@ import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.EventProtos.DataMovementEventProto;
import org.apache.tez.runtime.api.events.EventProtos.InputFailedEventProto;
-import org.apache.tez.runtime.api.events.EventProtos.InputInformationEventProto;
import org.apache.tez.runtime.api.events.EventProtos.InputReadErrorEventProto;
+import org.apache.tez.runtime.api.events.EventProtos.VertexManagerEventProto;
import org.apache.tez.runtime.api.events.InputFailedEvent;
-import org.apache.tez.runtime.api.events.InputInformationEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
@@ -59,14 +59,14 @@ public class TezEvent implements Writable {
this.setSourceInfo(sourceInfo);
if (event instanceof DataMovementEvent) {
eventType = EventType.DATA_MOVEMENT_EVENT;
+ } else if (event instanceof VertexManagerEvent) {
+ eventType = EventType.VERTEX_MANAGER_EVENT;
} else if (event instanceof InputReadErrorEvent) {
eventType = EventType.INPUT_READ_ERROR_EVENT;
} else if (event instanceof TaskAttemptFailedEvent) {
eventType = EventType.TASK_ATTEMPT_FAILED_EVENT;
} else if (event instanceof TaskAttemptCompletedEvent) {
eventType = EventType.TASK_ATTEMPT_COMPLETED_EVENT;
- } else if (event instanceof InputInformationEvent) {
- eventType = EventType.INTPUT_INFORMATION_EVENT;
} else if (event instanceof InputFailedEvent) {
eventType = EventType.INPUT_FAILED_EVENT;
} else if (event instanceof TaskStatusUpdateEvent) {
@@ -123,6 +123,13 @@ public class TezEvent implements Writable {
.setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload()))
.build().toByteArray();
break;
+ case VERTEX_MANAGER_EVENT:
+ VertexManagerEvent vmEvt = (VertexManagerEvent) event;
+ eventBytes = VertexManagerEventProto.newBuilder()
+ .setTargetVertexName(vmEvt.getTargetVertexName())
+ .setUserPayload(ByteString.copyFrom(vmEvt.getUserPayload()))
+ .build().toByteArray();
+ break;
case INPUT_READ_ERROR_EVENT:
InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
eventBytes = InputReadErrorEventProto.newBuilder()
@@ -146,11 +153,7 @@ public class TezEvent implements Writable {
.setSourceIndex(ifEvt.getSourceIndex())
.setTargetIndex(ifEvt.getTargetIndex())
.setVersion(ifEvt.getVersion()).build().toByteArray();
- case INTPUT_INFORMATION_EVENT:
- InputInformationEvent iEvt = (InputInformationEvent) event;
- eventBytes = InputInformationEventProto.newBuilder()
- .setUserPayload(ByteString.copyFrom(iEvt.getUserPayload()))
- .build().toByteArray();
+ break;
default:
throw new TezUncheckedException("Unknown TezEvent"
+ ", type=" + eventType);
@@ -182,6 +185,12 @@ public class TezEvent implements Writable {
dmProto.getTargetIndex(),
dmProto.getUserPayload().toByteArray());
break;
+ case VERTEX_MANAGER_EVENT:
+ VertexManagerEventProto vmProto =
+ VertexManagerEventProto.parseFrom(eventBytes);
+ event = new VertexManagerEvent(vmProto.getTargetVertexName(),
+ vmProto.getUserPayload().toByteArray());
+ break;
case INPUT_READ_ERROR_EVENT:
InputReadErrorEventProto ideProto =
InputReadErrorEventProto.parseFrom(eventBytes);
@@ -202,12 +211,6 @@ public class TezEvent implements Writable {
event = new InputFailedEvent(ifProto.getSourceIndex(),
ifProto.getTargetIndex(), ifProto.getVersion());
break;
- case INTPUT_INFORMATION_EVENT:
- InputInformationEventProto infoProto =
- InputInformationEventProto.parseFrom(eventBytes);
- event = new InputInformationEvent(
- infoProto.getUserPayload().toByteArray());
- break;
default:
throw new TezUncheckedException("Unknown TezEvent"
+ ", type=" + eventType);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index f5d1802..7741122 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -359,8 +359,8 @@ class Fetcher extends Thread {
try {
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
- String pathComponent = header.mapId;
- srcAttemptId = scheduler.getIdentifierForPathComponent(pathComponent);
+ srcAttemptId =
+ scheduler.getIdentifierForFetchedOutput(header.mapId, header.forReduce);
compressedLength = header.compressedLength;
decompressedLength = header.uncompressedLength;
forReduce = header.forReduce;
@@ -456,15 +456,13 @@ class Fetcher extends Thread {
return false;
}
- int reduceStartId = shuffle.getReduceStartId();
- int reduceRange = shuffle.getReduceRange();
- if (forReduce < reduceStartId || forReduce >= reduceStartId+reduceRange) {
- wrongReduceErrs.increment(1);
- LOG.warn(getName() + " data for the wrong reduce map: " +
- srcAttemptId + " len: " + compressedLength + " decomp len: " +
- decompressedLength + " for reduce " + forReduce);
- return false;
- }
+// if (forReduce < reduceStartId || forReduce >= reduceStartId+reduceRange) {
+// wrongReduceErrs.increment(1);
+// LOG.warn(getName() + " data for the wrong reduce map: " +
+// srcAttemptId + " len: " + compressedLength + " decomp len: " +
+// decompressedLength + " for reduce " + forReduce);
+// return false;
+// }
// Sanity check
if (!remaining.contains(srcAttemptId)) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 8689d11..acf987a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -22,7 +22,6 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.SecretKey;
@@ -65,10 +64,7 @@ public class Shuffle implements ExceptionReporter {
private Throwable throwable = null;
private String throwingThreadName = null;
private final int numInputs;
- private final AtomicInteger reduceStartId;
private final SecretKey jobTokenSecret;
- private AtomicInteger reduceRange = new AtomicInteger(
- TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARTITION_RANGE_DEFAULT);
private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
@@ -105,9 +101,7 @@ public class Shuffle implements ExceptionReporter {
TezCounter mergedMapOutputsCounter =
inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
- reduceStartId = new AtomicInteger(inputContext.getTaskIndex());
- LOG.info("Shuffle assigned reduce start id: " + reduceStartId.get()
- + " with default reduce range: " + reduceRange.get());
+ LOG.info("Shuffle assigned with " + numInputs + " inputs");
scheduler = new ShuffleScheduler(
this.inputContext,
@@ -119,7 +113,6 @@ public class Shuffle implements ExceptionReporter {
failedShuffleCounter);
eventHandler= new ShuffleInputEventHandler(
inputContext,
- this,
scheduler);
merger = new MergeManager(
this.conf,
@@ -233,14 +226,6 @@ public class Shuffle implements ExceptionReporter {
}
}
- public int getReduceStartId() {
- return reduceStartId.get();
- }
-
- public int getReduceRange() {
- return reduceRange.get();
- }
-
public synchronized void reportException(Throwable t) {
if (throwable == null) {
throwable = t;
@@ -261,18 +246,4 @@ public class Shuffle implements ExceptionReporter {
}
}
- public void setPartitionRange(int range) {
- if (range == reduceRange.get()) {
- return;
- }
- if (reduceRange.compareAndSet(
- TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARTITION_RANGE_DEFAULT, range)) {
- LOG.info("Reduce range set to: " + range);
- } else {
- TezUncheckedException e =
- new TezUncheckedException("Reduce range can be set only once.");
- reportException(e);
- throw e;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index 8b323b5..d731a46 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -28,12 +28,9 @@ import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
-import org.apache.tez.runtime.api.events.InputInformationEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.InputInformationEventPayloadProto;
-import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
public class ShuffleInputEventHandler {
@@ -42,15 +39,12 @@ public class ShuffleInputEventHandler {
private final ShuffleScheduler scheduler;
private final TezInputContext inputContext;
- private final Shuffle shuffle;
private int maxMapRuntime = 0;
- private boolean shuffleRangeSet = false;
public ShuffleInputEventHandler(TezInputContext inputContext,
- Shuffle shuffle, ShuffleScheduler scheduler) {
+ ShuffleScheduler scheduler) {
this.inputContext = inputContext;
- this.shuffle = shuffle;
this.scheduler = scheduler;
}
@@ -62,28 +56,13 @@ public class ShuffleInputEventHandler {
private void handleEvent(Event event) {
- if (event instanceof InputInformationEvent) {
- processInputInformationEvent((InputInformationEvent) event);
- }
- else if (event instanceof DataMovementEvent) {
+ if (event instanceof DataMovementEvent) {
processDataMovementEvent((DataMovementEvent) event);
} else if (event instanceof InputFailedEvent) {
processTaskFailedEvent((InputFailedEvent) event);
}
}
- private void processInputInformationEvent(InputInformationEvent iiEvent) {
- InputInformationEventPayloadProto inputInfoPayload;
- try {
- inputInfoPayload = InputInformationEventPayloadProto.parseFrom(iiEvent.getUserPayload());
- } catch (InvalidProtocolBufferException e) {
- throw new TezUncheckedException("Unable to parse InputInformationEvent payload", e);
- }
- int partitionRange = inputInfoPayload.getPartitionRange();
- shuffle.setPartitionRange(partitionRange);
- this.shuffleRangeSet = true;
- }
-
private void processDataMovementEvent(DataMovementEvent dmEvent) {
// FIXME TODO NEWTEZ
// Preconditions.checkState(shuffleRangeSet == true, "Shuffle Range must be set before a DataMovementEvent is processed");
@@ -95,6 +74,7 @@ public class ShuffleInputEventHandler {
}
int partitionId = dmEvent.getSourceIndex();
URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
+ LOG.info("Data movement event baseUri:" + baseUri);
InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index dcf8b6d..33da660 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -33,7 +33,6 @@ import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -61,7 +60,7 @@ class ShuffleScheduler {
private static final float PENALTY_GROWTH_RATE = 1.3f;
// TODO NEWTEZ May need to be a string if attempting to fetch from multiple inputs.
- private final Map<Integer, MutableInt> finishedMaps;
+ private boolean[] finishedMaps;
private final int numInputs;
private int remainingMaps;
private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
@@ -98,17 +97,16 @@ class ShuffleScheduler {
public ShuffleScheduler(TezInputContext inputContext,
Configuration conf,
- int tasksInDegree,
+ int numberOfInputs,
Shuffle shuffle,
TezCounter shuffledMapsCounter,
TezCounter reduceShuffleBytes,
TezCounter failedShuffleCounter) {
this.inputContext = inputContext;
- this.numInputs = tasksInDegree;
- abortFailureLimit = Math.max(30, tasksInDegree / 10);
- remainingMaps = tasksInDegree;
- //TODO NEWTEZ May need to be a string or a more usable construct if attempting to fetch from multiple inputs. Define a taskId / taskAttemptId pair
- finishedMaps = new HashMap<Integer, MutableInt>(remainingMaps);
+ this.numInputs = numberOfInputs;
+ abortFailureLimit = Math.max(30, numberOfInputs / 10);
+ remainingMaps = numberOfInputs;
+ finishedMaps = new boolean[remainingMaps]; // default init to false
this.shuffle = shuffle;
this.shuffledMapsCounter = shuffledMapsCounter;
this.reduceShuffleBytes = reduceShuffleBytes;
@@ -116,7 +114,7 @@ class ShuffleScheduler {
this.startTime = System.currentTimeMillis();
this.lastProgressTime = startTime;
referee.start();
- this.maxFailedUniqueFetches = Math.min(tasksInDegree,
+ this.maxFailedUniqueFetches = Math.min(numberOfInputs,
this.maxFailedUniqueFetches);
this.maxFetchFailuresBeforeReporting =
conf.getInt(
@@ -138,13 +136,12 @@ class ShuffleScheduler {
failureCounts.remove(taskIdentifier);
hostFailures.remove(host.getHostName());
- if (!isFinishedTaskTrue(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex())) {
+ if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex())) {
output.commit();
- if(incrementTaskCopyAndCheckCompletion(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex())) {
- shuffledMapsCounter.increment(1);
- if (--remainingMaps == 0) {
- notifyAll();
- }
+ setInputFinished(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex());
+ shuffledMapsCounter.increment(1);
+ if (--remainingMaps == 0) {
+ notifyAll();
}
// update the status
@@ -288,16 +285,6 @@ class ShuffleScheduler {
}
- public synchronized void tipFailed(int srcTaskIndex) {
- if (!isFinishedTaskTrue(srcTaskIndex)) {
- setFinishedTaskTrue(srcTaskIndex);
- if (--remainingMaps == 0) {
- notifyAll();
- }
- logProgress();
- }
- }
-
public synchronized void addKnownMapOutput(String hostName,
int partitionId,
String hostUrl,
@@ -310,7 +297,8 @@ class ShuffleScheduler {
mapLocations.put(identifier, host);
}
host.addKnownMap(srcAttempt);
- pathToIdentifierMap.put(srcAttempt.getPathComponent(), srcAttempt);
+ pathToIdentifierMap.put(
+ getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(), partitionId), srcAttempt);
// Mark the host as pending
if (host.getState() == MapHost.State.PENDING) {
@@ -351,8 +339,9 @@ class ShuffleScheduler {
return host;
}
- public InputAttemptIdentifier getIdentifierForPathComponent(String pathComponent) {
- return pathToIdentifierMap.get(pathComponent);
+ public InputAttemptIdentifier getIdentifierForFetchedOutput(
+ String path, int reduceId) {
+ return pathToIdentifierMap.get(getIdentifierFromPathAndReduceId(path, reduceId));
}
public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
@@ -364,7 +353,7 @@ class ShuffleScheduler {
// find the maps that we still need, up to the limit
while (itr.hasNext()) {
InputAttemptIdentifier id = itr.next();
- if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getInputIdentifier().getSrcTaskIndex())) {
+ if (!obsoleteMaps.contains(id) && !isInputFinished(id.getInputIdentifier().getSrcTaskIndex())) {
result.add(id);
if (++includedMaps >= MAX_MAPS_AT_ONCE) {
break;
@@ -374,7 +363,7 @@ class ShuffleScheduler {
// put back the maps left after the limit
while (itr.hasNext()) {
InputAttemptIdentifier id = itr.next();
- if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getInputIdentifier().getSrcTaskIndex())) {
+ if (!obsoleteMaps.contains(id) && !isInputFinished(id.getInputIdentifier().getSrcTaskIndex())) {
host.addKnownMap(id);
}
}
@@ -448,6 +437,10 @@ class ShuffleScheduler {
}
+ private String getIdentifierFromPathAndReduceId(String path, int reduceId) {
+ return path + "_" + reduceId;
+ }
+
/**
* A thread that takes hosts off of the penalty list when the timer expires.
*/
@@ -488,34 +481,15 @@ class ShuffleScheduler {
}
}
- void setFinishedTaskTrue(int srcTaskIndex) {
+ void setInputFinished(int inputIndex) {
synchronized(finishedMaps) {
- finishedMaps.put(srcTaskIndex, new MutableInt(shuffle.getReduceRange()));
+ finishedMaps[inputIndex] = true;
}
}
- boolean incrementTaskCopyAndCheckCompletion(int srcTaskIndex) {
- synchronized(finishedMaps) {
- MutableInt result = finishedMaps.get(srcTaskIndex);
- if(result == null) {
- result = new MutableInt(0);
- finishedMaps.put(srcTaskIndex, result);
- }
- result.increment();
- return isFinishedTaskTrue(srcTaskIndex);
- }
- }
-
- boolean isFinishedTaskTrue(int srcTaskIndex) {
+ boolean isInputFinished(int inputIndex) {
synchronized (finishedMaps) {
- MutableInt result = finishedMaps.get(srcTaskIndex);
- if(result == null) {
- return false;
- }
- if (result.intValue() == shuffle.getReduceRange()) {
- return true;
- }
- return false;
+ return finishedMaps[inputIndex];
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
index a19d5e1..c8a8233 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
@@ -57,7 +57,7 @@ public class LocalOnFileSorterOutput extends OnFileSortedOutput {
}
@Override
- protected List<Event> generateDataMovementEventsOnClose() throws IOException {
+ protected List<Event> generateEventsOnClose() throws IOException {
return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 5415053..8acc258 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -26,15 +26,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
import com.google.common.collect.Lists;
@@ -93,10 +96,10 @@ public class OnFileSortedOutput implements LogicalOutput {
sorter.close();
this.endTime = System.nanoTime();
- return generateDataMovementEventsOnClose();
+ return generateEventsOnClose();
}
- protected List<Event> generateDataMovementEventsOnClose() throws IOException {
+ protected List<Event> generateEventsOnClose() throws IOException {
String host = System.getenv(ApplicationConstants.Environment.NM_HOST
.toString());
ByteBuffer shuffleMetadata = outputContext
@@ -112,8 +115,17 @@ public class OnFileSortedOutput implements LogicalOutput {
DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
byte[] payloadBytes = payloadProto.toByteArray();
- List<Event> events = Lists.newArrayListWithCapacity(numOutputs);
+ long outputSize = outputContext.getCounters()
+ .findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue();
+ VertexManagerEventPayloadProto.Builder vmBuilder = VertexManagerEventPayloadProto
+ .newBuilder();
+ vmBuilder.setOutputSize(outputSize);
+ VertexManagerEvent vmEvent = new VertexManagerEvent(
+ outputContext.getDestinationVertexName(), vmBuilder.build().toByteArray());
+ List<Event> events = Lists.newArrayListWithCapacity(numOutputs+1);
+ events.add(vmEvent);
+
for (int i = 0; i < numOutputs; i++) {
DataMovementEvent event = new DataMovementEvent(i, payloadBytes);
events.add(event);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-library/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index 1fb000f..0396eec 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -31,3 +31,7 @@ message DataMovementEventPayloadProto {
message InputInformationEventPayloadProto {
optional int32 partition_range = 1;
}
+
+message VertexManagerEventPayloadProto {
+ optional int64 output_size = 1;
+}
\ No newline at end of file