You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/09/08 05:24:03 UTC
[1/2] git commit: TEZ-1447. Provide a mechanism for InputInitializers
to know about Vertex state changes. (sseth) (cherry picked from commit
9085e7bb4d3d9a5c1a91aac48048da8127cee17a)
Repository: tez
Updated Branches:
refs/heads/branch-0.5 ca86b6e36 -> d9ea43537
TEZ-1447. Provide a mechanism for InputInitializers to know about
Vertex state changes. (sseth)
(cherry picked from commit 9085e7bb4d3d9a5c1a91aac48048da8127cee17a)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/88acd71c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/88acd71c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/88acd71c
Branch: refs/heads/branch-0.5
Commit: 88acd71c49d02eabd746ffc3a554e10ccd65fc02
Parents: ca86b6e
Author: Siddharth Seth <ss...@apache.org>
Authored: Sun Sep 7 20:21:45 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sun Sep 7 20:23:43 2014 -0700
----------------------------------------------------------------------
.../apache/tez/dag/api/event/VertexState.java | 51 ++++
.../tez/dag/api/event/VertexStateUpdate.java | 60 +++++
.../VertexStateUpdateParallelismUpdated.java | 50 ++++
.../tez/runtime/api/InputInitializer.java | 17 +-
.../runtime/api/InputInitializerContext.java | 19 +-
.../app/dag/RootInputInitializerManager.java | 59 ++++-
.../tez/dag/app/dag/StateChangeNotifier.java | 168 ++++++++++++
.../dag/app/dag/VertexStateUpdateListener.java | 30 +++
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 5 +-
.../TezRootInputInitializerContextImpl.java | 16 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 83 +++++-
.../tez/state/OnStateChangedCallback.java | 24 ++
.../org/apache/tez/state/StateMachineTez.java | 66 +++++
.../dag/app/dag/TestStateChangeNotifier.java | 263 +++++++++++++++++++
.../tez/dag/app/dag/impl/TestVertexImpl.java | 102 +++++--
.../common/TestMRInputSplitDistributor.java | 9 +
16 files changed, 991 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
new file mode 100644
index 0000000..ab296a5
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.event;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Vertex state information.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public enum VertexState {
+ /**
+ * Indicates that the Vertex had entered the SUCCEEDED state. A vertex could go back into RUNNING state after SUCCEEDING
+ */
+ SUCCEEDED,
+ /**
+ * Indicates that the Vertex had entered the RUNNING state. This state can be reached after SUCCEEDED, if some
+ * tasks belonging to the vertex are restarted due to errors
+ */
+ RUNNING,
+ /**
+ * Indicates that the Vertex has FAILED
+ */
+ FAILED,
+ /**
+ * Indicates that the Vertex has been KILLED
+ */
+ KILLED,
+ /**
+ * Indicates that the parallelism for the vertex had changed.
+ */
+ PARALLELISM_UPDATED
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdate.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdate.java b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdate.java
new file mode 100644
index 0000000..5b7ca40
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdate.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.event;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Updates that are sent to user code running within the AM, on Vertex state changes.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class VertexStateUpdate {
+
+ private final String vertexName;
+ private final VertexState vertexState;
+
+
+ public VertexStateUpdate(String vertexName, VertexState vertexState) {
+ this.vertexName = vertexName;
+ this.vertexState = vertexState;
+ }
+
+ /**
+ * Get the name of the vertex for which the state has changed
+ * @return the name of the vertex
+ */
+ public String getVertexName() {
+ return vertexName;
+ }
+
+ /**
+ * Get the updated state
+ * @return the updated state
+ */
+ public VertexState getVertexState() {
+ return vertexState;
+ }
+
+ @Override
+ public String toString() {
+ return "VertexStateUpdate: vertexName=" + vertexName + ", State=" + vertexState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdateParallelismUpdated.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdateParallelismUpdated.java b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdateParallelismUpdated.java
new file mode 100644
index 0000000..54fa87c
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexStateUpdateParallelismUpdated.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.event;
+
+/**
+ * An event that is sent out when the parallelism of a vertex changes.
+ */
+public class VertexStateUpdateParallelismUpdated extends VertexStateUpdate {
+
+ private final int parallelism;
+ private final int previousParallelism;
+ public VertexStateUpdateParallelismUpdated(String vertexName,
+ int updatedParallelism, int previousParallelism) {
+ super(vertexName, VertexState.PARALLELISM_UPDATED);
+ this.parallelism = updatedParallelism;
+ this.previousParallelism = previousParallelism;
+ }
+
+ /**
+ * Returns the new parallelism for the vertex
+ * @return the new parallelism
+ */
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ /**
+ * Returns the previous value of the parallelism
+ * @return the previous parallelism
+ */
+ public int getPreviousParallelism() {
+ return previousParallelism;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
index 09268d9..3ab5cdb 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
/**
@@ -85,5 +86,19 @@ public abstract class InputInitializer {
public final InputInitializerContext getContext() {
return this.initializerContext;
}
-
+
+ /**
+ * Receive notifications on vertex state changes.
+ * <p/>
+ * State changes will be received based on the registration via {@link
+ * org.apache.tez.runtime.api.InputInitializerContext#registerForVertexStatusUpdates(String,
+ * java.util.Set)}. Notifications will be received for all registered state changes, and not just
+ * for the latest state update. They will be in order in which the state change occurred.
+ *
+ * @param stateUpdate an event indicating the name of the vertex, and it's updated state.
+ * Additional information may be available for specific events, Look at the
+ * type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}
+ */
+ public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
index e4e15ef..fe82b54 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
@@ -18,11 +18,17 @@
package org.apache.tez.runtime.api;
+import javax.annotation.Nullable;
+import java.util.EnumSet;
+import java.util.Set;
+
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
/**
* A context that provides information to the {@link InputInitializer}
@@ -98,6 +104,17 @@ public interface InputInitializerContext {
* @param vertexName
* @return Total number of tasks in this vertex
*/
- public int getVertexNumTasks(String vertexName);
+ int getVertexNumTasks(String vertexName);
+
+ /**
+ * Register to get notifications on updates to the specified vertex. Notifications will be sent
+ * via {@link org.apache.tez.runtime.api.InputInitializer#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)} </p>
+ *
+ * This method can only be invoked once. Duplicate invocations will result in an error.
+ *
+ * @param vertexName the vertex name for which notifications are required.
+ * @param stateSet the set of states for which notifications are required. null implies all
+ */
+ void registerForVertexStatusUpdates(String vertexName, @Nullable Set<VertexState> stateSet);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index f8e68bd..770761e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -18,10 +18,12 @@
package org.apache.tez.dag.app.dag;
+import javax.annotation.Nullable;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -39,6 +41,8 @@ import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.*;
+import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
@@ -68,6 +72,7 @@ public class RootInputInitializerManager {
private final EventHandler eventHandler;
private volatile boolean isStopped = false;
private final UserGroupInformation dagUgi;
+ private final StateChangeNotifier entityStateTracker;
private final Vertex vertex;
private final AppContext appContext;
@@ -75,7 +80,7 @@ public class RootInputInitializerManager {
private final Map<String, InitializerWrapper> initializerMap = new HashMap<String, InitializerWrapper>();
public RootInputInitializerManager(Vertex vertex, AppContext appContext,
- UserGroupInformation dagUgi) {
+ UserGroupInformation dagUgi, StateChangeNotifier stateTracker) {
this.appContext = appContext;
this.vertex = vertex;
this.eventHandler = appContext.getEventHandler();
@@ -83,6 +88,7 @@ public class RootInputInitializerManager {
.setDaemon(true).setNameFormat("InputInitializer [" + this.vertex.getName() + "] #%d").build());
this.executor = MoreExecutors.listeningDecorator(rawExecutor);
this.dagUgi = dagUgi;
+ this.entityStateTracker = stateTracker;
}
public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
@@ -90,10 +96,11 @@ public class RootInputInitializerManager {
for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : inputs) {
InputInitializerContext context =
- new TezRootInputInitializerContextImpl(input, vertex, appContext);
+ new TezRootInputInitializerContextImpl(input, vertex, appContext, this);
InputInitializer initializer = createInitializer(input, context);
- InitializerWrapper initializerWrapper = new InitializerWrapper(input, initializer, context, vertex);
+ InitializerWrapper initializerWrapper =
+ new InitializerWrapper(input, initializer, context, vertex, entityStateTracker);
initializerMap.put(input.getName(), initializerWrapper);
ListenableFuture<List<Event>> future = executor
.submit(new InputInitializerCallable(initializerWrapper, dagUgi));
@@ -101,7 +108,6 @@ public class RootInputInitializerManager {
}
}
-
@VisibleForTesting
protected InputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
input, InputInitializerContext context) {
@@ -139,6 +145,14 @@ public class RootInputInitializerManager {
}
}
+ public void registerForVertexUpdates(String vertexName, String inputName,
+ @Nullable Set<org.apache.tez.dag.api.event.VertexState> stateSet) {
+ Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName);
+ Preconditions.checkNotNull(inputName, "InputName cannot be null");
+ InitializerWrapper initializer = initializerMap.get(inputName);
+ initializer.registerForVertexStateUpdates(vertexName, stateSet);
+ }
+
@VisibleForTesting
protected InputInitializerCallback createInputInitializerCallback(InitializerWrapper initializer) {
return new InputInitializerCallback(initializer, eventHandler, vertex.getVertexId());
@@ -218,7 +232,7 @@ public class RootInputInitializerManager {
}
}
- private static class InitializerWrapper {
+ private static class InitializerWrapper implements VertexStateUpdateListener {
private final RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input;
@@ -226,14 +240,17 @@ public class RootInputInitializerManager {
private final InputInitializerContext context;
private final AtomicBoolean isComplete = new AtomicBoolean(false);
private final String vertexLogIdentifier;
+ private final StateChangeNotifier stateChangeNotifier;
+ private final List<String> notificationRegisteredVertices = Lists.newArrayList();
InitializerWrapper(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
InputInitializer initializer, InputInitializerContext context,
- Vertex vertex) {
+ Vertex vertex, StateChangeNotifier stateChangeNotifier) {
this.input = input;
this.initializer = initializer;
this.context = context;
this.vertexLogIdentifier = vertex.getLogIdentifier();
+ this.stateChangeNotifier = stateChangeNotifier;
}
public RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> getInput() {
@@ -254,6 +271,36 @@ public class RootInputInitializerManager {
public void setComplete() {
this.isComplete.set(true);
+ unregisterForVertexStatusUpdates();
+ }
+
+ public void registerForVertexStateUpdates(String vertexName, Set<VertexState> stateSet) {
+ synchronized(notificationRegisteredVertices) {
+ notificationRegisteredVertices.add(vertexName);
+ }
+ stateChangeNotifier.registerForVertexUpdates(vertexName, stateSet, this);
+ }
+
+ private void unregisterForVertexStatusUpdates() {
+ synchronized (notificationRegisteredVertices) {
+ for (String vertexName : notificationRegisteredVertices) {
+ stateChangeNotifier.unregisterForVertexUpdates(vertexName, this);
+ }
+
+ }
+ }
+
+ @Override
+ public void onStateUpdated(VertexStateUpdate event) {
+ if (isComplete()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dropping state update for vertex=" + event.getVertexName() + ", state=" +
+ event.getVertexState() +
+ " since initializer " + input.getName() + " is already complete.");
+ }
+ } else {
+ initializer.onVertexStateUpdated(event);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
new file mode 100644
index 0000000..558fc61
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TezVertexID;
+
+/**
+ * Tracks status updates from various components, and informs registered components about updates.
+ */
+@InterfaceAudience.Private
+public class StateChangeNotifier {
+
+ private final DAG dag;
+ private final SetMultimap<TezVertexID, ListenerContainer> vertexListeners;
+ private final ListMultimap<TezVertexID, VertexStateUpdate> lastKnowStatesMap;
+ private final ReentrantReadWriteLock listenersLock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock readLock = listenersLock.readLock();
+ private final ReentrantReadWriteLock.WriteLock writeLock = listenersLock.writeLock();
+
+ public StateChangeNotifier(DAG dag) {
+ this.dag = dag;
+ this.vertexListeners = Multimaps.synchronizedSetMultimap(
+ HashMultimap.<TezVertexID, ListenerContainer>create());
+ this.lastKnowStatesMap = LinkedListMultimap.create();
+ }
+
+ public void registerForVertexUpdates(String vertexName,
+ Set<org.apache.tez.dag.api.event.VertexState> stateSet,
+ VertexStateUpdateListener listener) {
+ Preconditions.checkNotNull(vertexName, "VertexName cannot be null");
+ Vertex vertex = dag.getVertex(vertexName);
+ Preconditions.checkNotNull(vertex, "Vertex does not exist: " + vertexName);
+ TezVertexID vertexId = vertex.getVertexId();
+ writeLock.lock();
+ // Read within the lock, to ensure a consistent view is seen.
+ List<VertexStateUpdate> previousUpdates = lastKnowStatesMap.get(vertexId);
+ try {
+ ListenerContainer listenerContainer = new ListenerContainer(listener, stateSet);
+ Set<ListenerContainer> listenerContainers = vertexListeners.get(vertexId);
+ if (listenerContainers == null || !listenerContainers.contains(listenerContainer)) {
+ vertexListeners.put(vertexId, listenerContainer);
+ // Send the last known state immediately, if it isn't null.
+ // Sent from within the lock to avoid duplicate events, and out of order events.
+ if (previousUpdates != null && !previousUpdates.isEmpty()) {
+ for (VertexStateUpdate update : previousUpdates) {
+ listenerContainer.sendStateUpdate(update);
+ }
+ }
+ } else {
+ // Disallow multiple register calls.
+ throw new TezUncheckedException(
+ "Only allowed to register once for a listener. CurrentContext: vertexName=" +
+ vertexName + ", Listener: " + listener);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ // KKK Send out current state.
+
+
+ public void unregisterForVertexUpdates(String vertexName, VertexStateUpdateListener listener) {
+ Preconditions.checkNotNull(vertexName, "VertexName cannot be null");
+ Vertex vertex = dag.getVertex(vertexName);
+ Preconditions.checkNotNull(vertex, "Vertex does not exist: " + vertexName);
+ TezVertexID vertexId = vertex.getVertexId();
+ writeLock.lock();
+ try {
+ ListenerContainer listenerContainer = new ListenerContainer(listener, null);
+ vertexListeners.remove(vertexId, listenerContainer);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public void stateChanged(TezVertexID vertexId, VertexStateUpdate vertexStateUpdate) {
+ readLock.lock();
+ try {
+ lastKnowStatesMap.put(vertexId, vertexStateUpdate);
+ if (vertexListeners.containsKey(vertexId)) {
+ sendStateUpdate(vertexId, vertexStateUpdate);
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private void sendStateUpdate(TezVertexID vertexId,
+ VertexStateUpdate event) {
+ for (ListenerContainer listenerContainer : vertexListeners.get(vertexId)) {
+ listenerContainer.sendStateUpdate(event);
+ }
+
+ }
+
+ private static final class ListenerContainer {
+ final VertexStateUpdateListener listener;
+ final Set<org.apache.tez.dag.api.event.VertexState> states;
+
+ private ListenerContainer(VertexStateUpdateListener listener,
+ Set<org.apache.tez.dag.api.event.VertexState> states) {
+ this.listener = listener;
+ if (states == null) {
+ this.states = EnumSet.allOf(org.apache.tez.dag.api.event.VertexState.class);
+ } else {
+ this.states = states;
+ }
+ }
+
+ private void sendStateUpdate(VertexStateUpdate stateUpdate) {
+ if (states.contains(stateUpdate.getVertexState())) {
+ listener.onStateUpdated(stateUpdate);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ListenerContainer that = (ListenerContainer) o;
+
+ // Explicit reference comparison
+ return listener == that.listener;
+ }
+
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(listener);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexStateUpdateListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexStateUpdateListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexStateUpdateListener.java
new file mode 100644
index 0000000..f636c5d
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexStateUpdateListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+
+@InterfaceAudience.Private
+/**
+ * This class should not be implemented by user facing APIs such as InputInitializer
+ */
+public interface VertexStateUpdateListener {
+ public void onStateUpdated(VertexStateUpdate event);
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 24b41a5..680e31a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -76,6 +76,7 @@ import org.apache.tez.dag.app.dag.DAGReport;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.DAGTerminationCause;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
@@ -153,6 +154,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private final AppContext appContext;
private final UserGroupInformation dagUGI;
private final ACLManager aclManager;
+ private final StateChangeNotifier entityUpdateTracker;
volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
private Map<String, Edge> edges = new HashMap<String, Edge>();
@@ -434,6 +436,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
+ this.entityUpdateTracker = new StateChangeNotifier(this);
}
protected StateMachine<DAGState, DAGEventType, DAGEvent> getStateMachine() {
@@ -1243,7 +1246,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
dag.eventHandler, dag.taskAttemptListener,
dag.clock, dag.taskHeartbeatHandler,
!dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
- dag.vertexGroups, dag.taskSpecificLaunchCmdOption);
+ dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker);
return v;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
index a9b9bca..846d208 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
@@ -20,13 +20,17 @@ package org.apache.tez.dag.app.dag.impl;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Set;
+
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.RootInputInitializerManager;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.runtime.api.InputInitializerContext;
@@ -36,18 +40,23 @@ public class TezRootInputInitializerContextImpl implements
private RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input;
private final Vertex vertex;
private final AppContext appContext;
+ private final RootInputInitializerManager manager;
+
// TODO Add support for counters - merged with the Vertex counters.
public TezRootInputInitializerContextImpl(
RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
- Vertex vertex, AppContext appContext) {
+ Vertex vertex, AppContext appContext,
+ RootInputInitializerManager manager) {
checkNotNull(input, "input is null");
checkNotNull(vertex, "vertex is null");
checkNotNull(appContext, "appContext is null");
+ checkNotNull(manager, "initializerManager is null");
this.input = input;
this.vertex = vertex;
this.appContext = appContext;
+ this.manager = manager;
}
@Override
@@ -105,4 +114,9 @@ public class TezRootInputInitializerContextImpl implements
return appContext.getCurrentDAG().getVertex(vertexName).getTotalTasks();
}
+ @Override
+ public void registerForVertexStatusUpdates(String vertexName, Set<VertexState> stateSet) {
+ manager.registerForVertexUpdates(vertexName, input.getName(), stateSet);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 5787a11..ff556ba 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -77,6 +77,8 @@ import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.VertexStatus.State;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
@@ -86,6 +88,7 @@ import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.RootInputInitializerManager;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.TaskTerminationCause;
@@ -161,6 +164,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
+import org.apache.tez.state.OnStateChangedCallback;
+import org.apache.tez.state.StateMachineTez;
/** Implementation of Vertex interface. Maintains the state machines of Vertex.
* The read and write calls use ReadWriteLock for concurrency.
@@ -207,6 +212,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private int distanceFromRoot = 0;
private final List<String> diagnostics = new ArrayList<String>();
+
+ protected final StateChangeNotifier stateChangeNotifier;
//task/attempt related datastructures
@VisibleForTesting
@@ -227,6 +234,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private static final SourceTaskAttemptCompletedEventTransition
SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
new SourceTaskAttemptCompletedEventTransition();
+ private static final VertexStateChangedCallback STATE_CHANGED_CALLBACK =
+ new VertexStateChangedCallback();
private VertexState recoveredState = VertexState.NEW;
private List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
@@ -527,8 +536,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// create the topology tables
.installTopology();
- private final StateMachine<VertexState, VertexEventType, VertexEvent>
- stateMachine;
+ private void augmentStateMachine() {
+ stateMachine
+ .registerStateEnteredCallback(VertexState.SUCCEEDED,
+ STATE_CHANGED_CALLBACK)
+ .registerStateEnteredCallback(VertexState.FAILED,
+ STATE_CHANGED_CALLBACK)
+ .registerStateEnteredCallback(VertexState.KILLED,
+ STATE_CHANGED_CALLBACK)
+ .registerStateEnteredCallback(VertexState.RUNNING,
+ STATE_CHANGED_CALLBACK);
+ }
+
+ private final StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl> stateMachine;
//changing fields while the vertex is running
private int numTasks;
@@ -617,7 +637,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
TaskAttemptListener taskAttemptListener, Clock clock,
TaskHeartbeatHandler thh, boolean commitVertexOutputs,
AppContext appContext, VertexLocationHint vertexLocationHint,
- Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption) {
+ Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption,
+ StateChangeNotifier entityStatusTracker) {
this.vertexId = vertexId;
this.vertexPlan = vertexPlan;
this.vertexName = StringInterner.weakIntern(vertexName);
@@ -663,17 +684,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (vertexPlan.getOutputsCount() > 0) {
setAdditionalOutputs(vertexPlan.getOutputsList());
}
+ this.stateChangeNotifier = entityStatusTracker;
// Setup the initial parallelism early. This may be changed after
// initialization or on a setParallelism call.
this.numTasks = vertexPlan.getTaskConfig().getNumTasks();
+ // Not sending the notifier a parallelism update since this is the initial parallelism
this.dagVertexGroups = dagVertexGroups;
logIdentifier = this.getVertexId() + " [" + this.getName() + "]";
// This "this leak" is okay because the retained pointer is in an
// instance variable.
- stateMachine = stateMachineFactory.make(this);
+
+ stateMachine = new StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl>(
+ stateMachineFactory.make(this), this);
+ augmentStateMachine();
}
protected StateMachine<VertexState, VertexEventType, VertexEvent> getStateMachine() {
@@ -1003,7 +1029,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (updatedEvent.getVertexLocationHint() != null) {
setTaskLocationHints(updatedEvent.getVertexLocationHint());
}
+ int oldNumTasks = numTasks;
numTasks = updatedEvent.getNumTasks();
+ stateChangeNotifier.stateChanged(vertexId,
+ new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
handleParallelismUpdate(numTasks, updatedEvent.getSourceEdgeManagers(),
updatedEvent.getRootInputSpecUpdates());
if (LOG.isDebugEnabled()) {
@@ -1189,7 +1218,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
this.rootInputSpecs.putAll(rootInputSpecUpdates);
}
+ int oldNumTasks = numTasks;
this.numTasks = parallelism;
+ stateChangeNotifier.stateChanged(vertexId,
+ new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
this.createTasks();
LOG.info("Vertex " + getVertexId() +
" parallelism set to " + parallelism);
@@ -1251,7 +1283,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
LOG.info("Vertex " + logIdentifier +
" parallelism set to " + parallelism + " from " + numTasks);
+ int oldNumTasks = numTasks;
this.numTasks = parallelism;
+ stateChangeNotifier.stateChanged(vertexId,
+ new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
assert tasks.size() == numTasks;
// set new edge managers
@@ -1816,9 +1851,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// For VertexManagers setting parallelism, the setParallelism call needs
// to be inline.
if (event != null) {
+ int oldNumTasks = numTasks;
numTasks = event.getNumTasks();
+ stateChangeNotifier.stateChanged(vertexId,
+ new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
} else {
numTasks = getVertexPlan().getTaskConfig().getNumTasks();
+ // Not sending a parallelism update notification since this is from the original plan
}
if (!(numTasks == -1 || numTasks >= 0)) {
@@ -2670,7 +2709,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
String dagName, String vertexName, TezVertexID vertexID,
EventHandler eventHandler, int numTasks, int numNodes,
Resource vertexTaskResource, Resource totalResource) {
- return new RootInputInitializerManager(this, appContext, this.dagUgi);
+ return new RootInputInitializerManager(this, appContext, this.dagUgi, this.stateChangeNotifier);
}
private boolean initializeVertexInInitializingState() {
@@ -3456,6 +3495,40 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
+ private static class VertexStateChangedCallback
+ implements OnStateChangedCallback<VertexState, VertexImpl> {
+
+ @Override
+ public void onStateChanged(VertexImpl vertex, VertexState vertexState) {
+ vertex.stateChangeNotifier.stateChanged(vertex.getVertexId(),
+ new VertexStateUpdate(vertex.getName(), convertInternalState(
+ vertexState, vertex.getVertexId())));
+ }
+
+ private org.apache.tez.dag.api.event.VertexState convertInternalState(VertexState vertexState,
+ TezVertexID vertexId) {
+ switch (vertexState) {
+ case RUNNING:
+ return org.apache.tez.dag.api.event.VertexState.RUNNING;
+ case SUCCEEDED:
+ return org.apache.tez.dag.api.event.VertexState.SUCCEEDED;
+ case FAILED:
+ return org.apache.tez.dag.api.event.VertexState.FAILED;
+ case KILLED:
+ return org.apache.tez.dag.api.event.VertexState.KILLED;
+ case NEW:
+ case INITIALIZING:
+ case INITED:
+ case ERROR:
+ case TERMINATING:
+ case RECOVERING:
+ default:
+ throw new TezUncheckedException(
+ "Not expecting state updates for state: " + vertexState + ", VertexID: " + vertexId);
+ }
+ }
+ }
+
@Override
public void setInputVertices(Map<Vertex, Edge> inVertices) {
this.sourceVertices = inVertices;
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/state/OnStateChangedCallback.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/state/OnStateChangedCallback.java b/tez-dag/src/main/java/org/apache/tez/state/OnStateChangedCallback.java
new file mode 100644
index 0000000..53767fc
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/state/OnStateChangedCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.state;
+
+
+public interface OnStateChangedCallback<STATE extends Enum<STATE>, OPERAND> {
+ public void onStateChanged(OPERAND operand, STATE state);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/main/java/org/apache/tez/state/StateMachineTez.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/state/StateMachineTez.java b/tez-dag/src/main/java/org/apache/tez/state/StateMachineTez.java
new file mode 100644
index 0000000..0fd3c0e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/state/StateMachineTez.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.state;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.tez.dag.records.TezID;
+
+public class StateMachineTez<STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT, OPERAND>
+ implements StateMachine<STATE, EVENTTYPE, EVENT> {
+
+ private final Map<STATE, OnStateChangedCallback> callbackMap =
+ new HashMap<STATE, OnStateChangedCallback>();
+ private final OPERAND operand;
+
+ private final StateMachine<STATE, EVENTTYPE, EVENT> realStatemachine;
+
+ public StateMachineTez(StateMachine sm, OPERAND operand) {
+ this.realStatemachine = sm;
+ this.operand = operand;
+ }
+
+ public StateMachineTez registerStateEnteredCallback(STATE state,
+ OnStateChangedCallback callback) {
+ callbackMap.put(state, callback);
+ return this;
+ }
+
+ @Override
+ public STATE getCurrentState() {
+ return realStatemachine.getCurrentState();
+ }
+
+ @Override
+ public STATE doTransition(EVENTTYPE eventType, EVENT event) throws
+ InvalidStateTransitonException {
+ STATE oldState = realStatemachine.getCurrentState();
+ STATE newState = realStatemachine.doTransition(eventType, event);
+ if (newState != oldState) {
+ OnStateChangedCallback callback = callbackMap.get(newState);
+ if (callback != null) {
+ callback.onStateChanged(operand, newState);
+ }
+ }
+ return newState;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
new file mode 100644
index 0000000..6a505ef
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestStateChangeNotifier {
+
+ @Test(timeout = 5000)
+ public void testEventsOnRegistration() {
+ TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
+ Vertex v1 = createMockVertex(dagId, 1);
+ Vertex v2 = createMockVertex(dagId, 2);
+ Vertex v3 = createMockVertex(dagId, 3);
+ DAG dag = createMockDag(dagId, v1, v2, v3);
+
+ StateChangeNotifier tracker = new StateChangeNotifier(dag);
+
+ // Vertex has sent one event
+ notifyTracker(tracker, v1, VertexState.RUNNING);
+ VertexStateUpdateListener mockListener11 = mock(VertexStateUpdateListener.class);
+ VertexStateUpdateListener mockListener12 = mock(VertexStateUpdateListener.class);
+ VertexStateUpdateListener mockListener13 = mock(VertexStateUpdateListener.class);
+ VertexStateUpdateListener mockListener14 = mock(VertexStateUpdateListener.class);
+ // Register for all states
+ tracker.registerForVertexUpdates(v1.getName(), null, mockListener11);
+ // Register for all states
+ tracker.registerForVertexUpdates(v1.getName(), EnumSet.allOf(
+ VertexState.class), mockListener12);
+ // Register for specific state, event generated
+ tracker.registerForVertexUpdates(v1.getName(), EnumSet.of(
+ VertexState.RUNNING), mockListener13);
+ // Register for specific state, event not generated
+ tracker.registerForVertexUpdates(v1.getName(), EnumSet.of(
+ VertexState.SUCCEEDED), mockListener14);
+ ArgumentCaptor<VertexStateUpdate> argumentCaptor =
+ ArgumentCaptor.forClass(VertexStateUpdate.class);
+
+ verify(mockListener11, times(1)).onStateUpdated(argumentCaptor.capture());
+ assertEquals(VertexState.RUNNING,
+ argumentCaptor.getValue().getVertexState());
+ verify(mockListener12, times(1)).onStateUpdated(argumentCaptor.capture());
+ assertEquals(VertexState.RUNNING,
+ argumentCaptor.getValue().getVertexState());
+ verify(mockListener13, times(1)).onStateUpdated(argumentCaptor.capture());
+ assertEquals(VertexState.RUNNING,
+ argumentCaptor.getValue().getVertexState());
+ verify(mockListener14, never()).onStateUpdated(any(VertexStateUpdate.class));
+
+ // Vertex has not notified of state
+ VertexStateUpdateListener mockListener2 = mock(VertexStateUpdateListener.class);
+ tracker.registerForVertexUpdates(v2.getName(), null, mockListener2);
+ verify(mockListener2, never()).onStateUpdated(any(VertexStateUpdate.class));
+
+ // Vertex has notified about parallelism update only
+ tracker.stateChanged(v3.getVertexId(), new VertexStateUpdateParallelismUpdated(v3.getName(), 23, -1));
+ VertexStateUpdateListener mockListener3 = mock(VertexStateUpdateListener.class);
+ tracker.registerForVertexUpdates(v3.getName(), null, mockListener3);
+ verify(mockListener3, times(1)).onStateUpdated(argumentCaptor.capture());
+ assertEquals(VertexState.PARALLELISM_UPDATED,
+ argumentCaptor.getValue().getVertexState());
+ }
+
+ @Test(timeout = 5000)
+ public void testSimpleStateUpdates() {
+ TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
+ Vertex v1 = createMockVertex(dagId, 1);
+ DAG dag = createMockDag(dagId, v1);
+
+ StateChangeNotifier tracker = new StateChangeNotifier(dag);
+
+ VertexStateUpdateListener mockListener = mock(VertexStateUpdateListener.class);
+ tracker.registerForVertexUpdates(v1.getName(), null, mockListener);
+
+ List<VertexState> expectedStates = Lists.newArrayList(
+ VertexState.RUNNING,
+ VertexState.SUCCEEDED,
+ VertexState.FAILED,
+ VertexState.KILLED,
+ VertexState.RUNNING,
+ VertexState.SUCCEEDED);
+
+ for (VertexState state : expectedStates) {
+ notifyTracker(tracker, v1, state);
+ }
+
+ ArgumentCaptor<VertexStateUpdate> argumentCaptor =
+ ArgumentCaptor.forClass(VertexStateUpdate.class);
+ verify(mockListener, times(expectedStates.size())).onStateUpdated(argumentCaptor.capture());
+ List<VertexStateUpdate> stateUpdatesSent = argumentCaptor.getAllValues();
+
+ Iterator<VertexState> expectedStateIter =
+ expectedStates.iterator();
+ for (int i = 0; i < expectedStates.size(); i++) {
+ assertEquals(expectedStateIter.next(), stateUpdatesSent.get(i).getVertexState());
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testDuplicateRegistration() {
+ TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
+ Vertex v1 = createMockVertex(dagId, 1);
+ DAG dag = createMockDag(dagId, v1);
+
+ StateChangeNotifier tracker = new StateChangeNotifier(dag);
+ VertexStateUpdateListener mockListener = mock(VertexStateUpdateListener.class);
+
+ tracker.registerForVertexUpdates(v1.getName(), null, mockListener);
+ try {
+ tracker.registerForVertexUpdates(v1.getName(), null, mockListener);
+ fail("Expecting an error from duplicate registrations of the same listener");
+ } catch (TezUncheckedException e) {
+ // Expected, ignore
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testSpecificStateUpdates() {
+ TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
+ Vertex v1 = createMockVertex(dagId, 1);
+ DAG dag = createMockDag(dagId, v1);
+
+ StateChangeNotifier tracker = new StateChangeNotifier(dag);
+
+ VertexStateUpdateListener mockListener = mock(VertexStateUpdateListener.class);
+ tracker.registerForVertexUpdates(v1.getName(), EnumSet.of(
+ VertexState.RUNNING,
+ VertexState.SUCCEEDED), mockListener);
+
+ List<VertexState> states = Lists.newArrayList(
+ VertexState.RUNNING,
+ VertexState.SUCCEEDED,
+ VertexState.FAILED,
+ VertexState.KILLED,
+ VertexState.RUNNING,
+ VertexState.SUCCEEDED);
+ List<VertexState> expectedStates = Lists.newArrayList(
+ VertexState.RUNNING,
+ VertexState.SUCCEEDED,
+ VertexState.RUNNING,
+ VertexState.SUCCEEDED);
+
+ for (VertexState state : states) {
+ notifyTracker(tracker, v1, state);
+ }
+
+ ArgumentCaptor<VertexStateUpdate> argumentCaptor =
+ ArgumentCaptor.forClass(VertexStateUpdate.class);
+ verify(mockListener, times(expectedStates.size())).onStateUpdated(argumentCaptor.capture());
+ List<VertexStateUpdate> stateUpdatesSent = argumentCaptor.getAllValues();
+
+ Iterator<VertexState> expectedStateIter =
+ expectedStates.iterator();
+ for (int i = 0; i < expectedStates.size(); i++) {
+ assertEquals(expectedStateIter.next(), stateUpdatesSent.get(i).getVertexState());
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testUnregister() {
+ TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
+ Vertex v1 = createMockVertex(dagId, 1);
+ DAG dag = createMockDag(dagId, v1);
+
+ StateChangeNotifier tracker = new StateChangeNotifier(dag);
+
+ VertexStateUpdateListener mockListener = mock(VertexStateUpdateListener.class);
+ tracker.registerForVertexUpdates(v1.getName(), null, mockListener);
+
+ List<VertexState> expectedStates = Lists.newArrayList(
+ VertexState.RUNNING,
+ VertexState.SUCCEEDED,
+ VertexState.FAILED,
+ VertexState.KILLED,
+ VertexState.RUNNING,
+ VertexState.SUCCEEDED);
+
+ int count = 0;
+ int numExpectedEvents = 3;
+ for (VertexState state : expectedStates) {
+ if (count == numExpectedEvents) {
+ tracker.unregisterForVertexUpdates(v1.getName(), mockListener);
+ }
+ notifyTracker(tracker, v1, state);
+ count++;
+ }
+
+ ArgumentCaptor<VertexStateUpdate> argumentCaptor =
+ ArgumentCaptor.forClass(VertexStateUpdate.class);
+ verify(mockListener, times(numExpectedEvents)).onStateUpdated(argumentCaptor.capture());
+ List<VertexStateUpdate> stateUpdatesSent = argumentCaptor.getAllValues();
+
+ Iterator<VertexState> expectedStateIter =
+ expectedStates.iterator();
+ for (int i = 0; i < numExpectedEvents; i++) {
+ assertEquals(expectedStateIter.next(), stateUpdatesSent.get(i).getVertexState());
+ }
+ }
+
+ private DAG createMockDag(TezDAGID dagId, Vertex... vertices) {
+ DAG dag = mock(DAG.class);
+ doReturn(dagId).when(dag).getID();
+ for (Vertex v : vertices) {
+ String vertexName = v.getName();
+ TezVertexID vertexId = v.getVertexId();
+
+ doReturn(v).when(dag).getVertex(vertexName);
+ doReturn(v).when(dag).getVertex(vertexId);
+ }
+ return dag;
+ }
+
+ private Vertex createMockVertex(TezDAGID dagId, int id) {
+ TezVertexID vertexId = TezVertexID.getInstance(dagId, id);
+ String vertexName = "vertex" + id;
+ Vertex v = mock(Vertex.class);
+ doReturn(vertexId).when(v).getVertexId();
+ doReturn(vertexName).when(v).getName();
+ return v;
+ }
+
+ private void notifyTracker(StateChangeNotifier notifier, Vertex v,
+ VertexState state) {
+ notifier.stateChanged(v.getVertexId(), new VertexStateUpdate(v.getName(), state));
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 6e2bd9d..31aaf6f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -83,6 +83,7 @@ import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
@@ -103,6 +104,7 @@ import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.RootInputInitializerManager;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
@@ -195,6 +197,7 @@ public class TestVertexImpl {
private VertexEventDispatcher vertexEventDispatcher;
private DagEventDispatcher dagEventDispatcher;
private HistoryEventHandler historyEventHandler;
+ private StateChangeNotifier updateTracker;
private static TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption;
public static class CountingOutputCommitter extends OutputCommitter {
@@ -1633,16 +1636,17 @@ public class TestVertexImpl {
if (customInitializer == null) {
v = new VertexImplWithControlledInitializerManager(vertexId, vPlan, vPlan.getName(), conf,
dispatcher.getEventHandler(), taskAttemptListener,
- clock, thh, appContext, locationHint, dispatcher);
+ clock, thh, appContext, locationHint, dispatcher, updateTracker);
} else {
v = new VertexImplWithRunningInputInitializer(vertexId, vPlan, vPlan.getName(), conf,
dispatcher.getEventHandler(), taskAttemptListener,
- clock, thh, appContext, locationHint, dispatcher, customInitializer);
+ clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker);
}
} else {
v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
dispatcher.getEventHandler(), taskAttemptListener,
- clock, thh, true, appContext, locationHint, vertexGroups, taskSpecificLaunchCmdOption);
+ clock, thh, true, appContext, locationHint, vertexGroups, taskSpecificLaunchCmdOption,
+ updateTracker);
}
vertices.put(vName, v);
vertexIdMap.put(vertexId, v);
@@ -1693,6 +1697,7 @@ public class TestVertexImpl {
}
public void setupPreDagCreation() {
+ LOG.info("____________ RESETTING CURRENT DAG ____________");
conf = new Configuration();
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
appAttemptId = ApplicationAttemptId.newInstance(
@@ -1736,6 +1741,7 @@ public class TestVertexImpl {
for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) {
vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
}
+ updateTracker = new StateChangeNotifier(dag);
setupVertices();
when(dag.getVertex(any(TezVertexID.class))).thenAnswer(new Answer<Vertex>() {
@Override
@@ -1773,7 +1779,7 @@ public class TestVertexImpl {
for (Edge edge : edges.values()) {
edge.initialize();
}
-
+
taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
dispatcher.register(TaskAttemptEventType.class, taskAttemptEventDispatcher);
taskEventDispatcher = new TaskEventDispatcher();
@@ -2926,6 +2932,42 @@ public class TestVertexImpl {
Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
}
+ @Test(timeout = 10000)
+ public void testInputInitializerVertexStateUpdates() throws Exception {
+ // v2 running an Input initializer, which is subscribed to events on v1.
+ useCustomInitializer = true;
+ customInitializer = new EventHandlingRootInputInitializer(null);
+ // Using the EventHandlingRootInputInitializer since it keeps the initializer alive till signalled,
+ // which is required to track events that it receives.
+ EventHandlingRootInputInitializer initializer =
+ (EventHandlingRootInputInitializer) customInitializer;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithRunningInitializer();
+ setupPostDagCreation();
+
+ VertexImplWithRunningInputInitializer v1 =
+ (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+
+ initVertex(v1);
+ startVertex(v1);
+ Assert.assertEquals(VertexState.RUNNING, v1.getState());
+
+ // Make v1 succeed
+ for (TezTaskID taskId : v1.getTasks().keySet()) {
+ v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+ }
+ dispatcher.await();
+
+ Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+
+ // At this point, 2 events should have been received - since the dispatcher is complete.
+ Assert.assertEquals(2, initializer.stateUpdateEvents.size());
+ Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING,
+ initializer.stateUpdateEvents.get(0).getVertexState());
+ Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.SUCCEEDED,
+ initializer.stateUpdateEvents.get(1).getVertexState());
+ }
+
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
public void testRootInputInitializerEvent() throws Exception {
@@ -3171,7 +3213,8 @@ public class TestVertexImpl {
VertexPlan vPlan = invalidDagPlan.getVertex(0);
VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
dispatcher.getEventHandler(), taskAttemptListener,
- clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption);
+ clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
+ updateTracker);
vertexIdMap.put(vId, v);
vertices.put(v.getName(), v);
v.handle(new VertexEvent(vId, VertexEventType.V_INIT));
@@ -3202,10 +3245,12 @@ public class TestVertexImpl {
AppContext appContext,
VertexLocationHint vertexLocationHint,
DrainDispatcher dispatcher,
- InputInitializer presetInitializer) {
+ InputInitializer presetInitializer,
+ StateChangeNotifier updateTracker) {
super(vertexId, vertexPlan, vertexName, conf, eventHandler,
taskAttemptListener, clock, thh, true,
- appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption);
+ appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
+ updateTracker);
this.presetInitializer = presetInitializer;
}
@@ -3217,7 +3262,7 @@ public class TestVertexImpl {
try {
rootInputInitializerManager =
new RootInputInitializerManagerWithRunningInitializer(this, this.getAppContext(),
- presetInitializer);
+ presetInitializer, stateChangeNotifier);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -3239,10 +3284,12 @@ public class TestVertexImpl {
Clock clock, TaskHeartbeatHandler thh,
AppContext appContext,
VertexLocationHint vertexLocationHint,
- DrainDispatcher dispatcher) {
+ DrainDispatcher dispatcher,
+ StateChangeNotifier updateTracker) {
super(vertexId, vertexPlan, vertexName, conf, eventHandler,
taskAttemptListener, clock, thh, true,
- appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption);
+ appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
+ updateTracker);
this.dispatcher = dispatcher;
}
@@ -3254,7 +3301,7 @@ public class TestVertexImpl {
try {
rootInputInitializerManager =
new RootInputInitializerManagerControlled(this, this.getAppContext(), eventHandler,
- dispatcher);
+ dispatcher, stateChangeNotifier);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -3273,9 +3320,10 @@ public class TestVertexImpl {
private final InputInitializer presetInitializer;
public RootInputInitializerManagerWithRunningInitializer(Vertex vertex, AppContext appContext,
- InputInitializer presetInitializer) throws
+ InputInitializer presetInitializer,
+ StateChangeNotifier tracker) throws
IOException {
- super(vertex, appContext, UserGroupInformation.getCurrentUser());
+ super(vertex, appContext, UserGroupInformation.getCurrentUser(), tracker);
this.presetInitializer = presetInitializer;
}
@@ -3284,6 +3332,9 @@ public class TestVertexImpl {
protected InputInitializer createInitializer(
RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input,
InputInitializerContext context) {
+ if (presetInitializer instanceof ContextSettableInputInitialzier) {
+ ((ContextSettableInputInitialzier)presetInitializer).setContext(context);
+ }
return presetInitializer;
}
}
@@ -3300,9 +3351,10 @@ public class TestVertexImpl {
public RootInputInitializerManagerControlled(Vertex vertex, AppContext appContext,
EventHandler eventHandler,
- DrainDispatcher dispatcher
+ DrainDispatcher dispatcher,
+ StateChangeNotifier tracker
) throws IOException {
- super(vertex, appContext, UserGroupInformation.getCurrentUser());
+ super(vertex, appContext, UserGroupInformation.getCurrentUser(), tracker);
this.eventHandler = eventHandler;
this.dispatcher = dispatcher;
this.vertexID = vertex.getVertexId();
@@ -3590,7 +3642,8 @@ public class TestVertexImpl {
}
@InterfaceAudience.Private
- public static class EventHandlingRootInputInitializer extends InputInitializer {
+ public static class EventHandlingRootInputInitializer extends InputInitializer
+ implements ContextSettableInputInitialzier {
final AtomicBoolean initStarted = new AtomicBoolean(false);
final AtomicBoolean eventReceived = new AtomicBoolean(false);
@@ -3599,6 +3652,9 @@ public class TestVertexImpl {
private final ReentrantLock lock = new ReentrantLock();
private final Condition eventCondition = lock.newCondition();
+ private final List<VertexStateUpdate> stateUpdateEvents = new LinkedList<VertexStateUpdate>();
+ private volatile InputInitializerContext context;
+
public EventHandlingRootInputInitializer(
InputInitializerContext initializerContext) {
super(initializerContext);
@@ -3606,6 +3662,7 @@ public class TestVertexImpl {
@Override
public List<Event> initialize() throws Exception {
+ context.registerForVertexStatusUpdates("vertex1", null);
initStarted.set(true);
lock.lock();
try {
@@ -3632,5 +3689,18 @@ public class TestVertexImpl {
lock.unlock();
}
}
+
+ @Override
+ public void setContext(InputInitializerContext context) {
+ this.context = context;
+ }
+
+ public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+ stateUpdateEvents.add(stateUpdate);
+ }
+ }
+
+ private interface ContextSettableInputInitialzier {
+ void setContext(InputInitializerContext context);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/88acd71c/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
index b1a0880..55f9b11 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
@@ -29,13 +29,17 @@ import static org.junit.Assert.assertTrue;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.lib.MRInputUtils;
@@ -208,6 +212,11 @@ public class TestMRInputSplitDistributor {
}
@Override
+ public void registerForVertexStatusUpdates(String vertexName, Set<VertexState> stateSet) {
+ throw new UnsupportedOperationException("getVertexNumTasks not implemented in this mock");
+ }
+
+ @Override
public UserPayload getUserPayload() {
throw new UnsupportedOperationException("getUserPayload not implemented in this mock");
}
[2/2] git commit: Update CHANGES.txt for TEZ-1447 (cherry picked from
commit 44e6a77cc5e6e09e97c5b1525428bf67967c1d25)
Posted by ss...@apache.org.
Update CHANGES.txt for TEZ-1447
(cherry picked from commit 44e6a77cc5e6e09e97c5b1525428bf67967c1d25)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d9ea4353
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d9ea4353
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d9ea4353
Branch: refs/heads/branch-0.5
Commit: d9ea43537bb9333de5213e56d4367bf0c36cb27a
Parents: 88acd71
Author: Siddharth Seth <ss...@apache.org>
Authored: Sun Sep 7 20:23:05 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sun Sep 7 20:23:51 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/d9ea4353/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c2c075c..0bdb9d9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ ALL CHANGES
TEZ-1527. Fix indentation of Vertex status in DAGClient output.
TEZ-1536. Fix spelling typo "configurartion" in TezClientUtils.
TEZ-1310. Update website documentation framework
+ TEZ-1447. Provide a mechanism for InputInitializers to know about Vertex state changes.
Release 0.5.0: Unreleased