You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/05 02:35:52 UTC

git commit: TEZ-416. Simplify Event class hierarchy. (hitesh)

Updated Branches:
  refs/heads/TEZ-398 52ff7aca3 -> bc9cee33c


TEZ-416. Simplify Event class hierarchy. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/bc9cee33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/bc9cee33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/bc9cee33

Branch: refs/heads/TEZ-398
Commit: bc9cee33c0a9c17e1b664c9521ef6c6ab4362c1c
Parents: 52ff7ac
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Sep 4 17:35:24 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Sep 4 17:35:24 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/engine/newapi/Event.java     | 14 ----
 .../apache/tez/engine/newapi/SystemEvent.java   | 30 -------
 .../tez/engine/newapi/TezInputContext.java      |  6 ++
 .../tez/engine/newapi/TezOutputContext.java     |  7 ++
 .../tez/engine/newapi/TezTaskContext.java       |  4 +-
 .../org/apache/tez/engine/newapi/UserEvent.java | 77 ------------------
 .../newapi/events/InputDataErrorEvent.java      | 53 ++++++++++++
 .../newapi/events/TaskCommunicationEvent.java   | 84 ++++++++++++++++++++
 .../events/system/InputDataErrorEvent.java      | 53 ------------
 .../tez/engine/newapi/impl/EventMetaData.java   | 42 +++++++---
 .../tez/engine/newapi/rpc/impl/TezEvent.java    | 64 +++++++++++++++
 .../engine/newapi/rpc/impl/TezUserEvent.java    | 57 -------------
 .../engine/newapi/impl/TezInputContextImpl.java | 15 ++--
 .../newapi/impl/TezOutputContextImpl.java       | 19 +++--
 .../engine/newapi/impl/TezTaskContextImpl.java  | 18 +++--
 .../LogicalIOProcessorRuntimeTask.java          |  6 +-
 16 files changed, 287 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Event.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Event.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Event.java
index f5bcdd8..20157c7 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Event.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Event.java
@@ -25,18 +25,4 @@ package org.apache.tez.engine.newapi;
  */
 public abstract class Event {
 
-  public static enum EventType {
-    SYSTEM,
-    USER
-  }
-
-  private final EventType eventType;
-
-  public Event(EventType eventType) {
-    this.eventType = eventType;
-  }
-
-  public EventType getEventType() {
-    return eventType;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/SystemEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/SystemEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/SystemEvent.java
deleted file mode 100644
index be22f57..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/SystemEvent.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi;
-
-/**
- * Base class for all system events that are understood by the Tez framework.
- * SystemEvents can be generated either by the framework or the user code.
- */
-public abstract class SystemEvent extends Event {
-
-  protected SystemEvent() {
-    super(EventType.SYSTEM);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezInputContext.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezInputContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezInputContext.java
index 38780ab..a56c9d4 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezInputContext.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezInputContext.java
@@ -23,4 +23,10 @@ package org.apache.tez.engine.newapi;
  */
 public interface TezInputContext extends TezTaskContext {
 
+  /**
+   * Get the Vertex Name of the Source that generated data for this Input
+   * @return Name of the Source Vertex
+   */
+  public String getSourceVertexName();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezOutputContext.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezOutputContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezOutputContext.java
index 3a0fceb..34c6028 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezOutputContext.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezOutputContext.java
@@ -23,4 +23,11 @@ package org.apache.tez.engine.newapi;
  */
 public interface TezOutputContext extends TezTaskContext {
 
+  /**
+   * Get the Vertex Name of the Destination that is the recipient of this
+   * Output's data
+   * @return Name of the Destination Vertex
+   */
+  public String getDestinationVertexName();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
index 0724885..4cc5668 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
@@ -42,10 +42,10 @@ public interface TezTaskContext {
   public int getAttemptNumber();
 
   /**
-   * Get the name of the Vertex
+   * Get the name of the Vertex in which the task is running
    * @return Vertex Name
    */
-  public String getVertexName();
+  public String getTaskVertexName();
 
 
   public TezCounters getCounters();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/UserEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/UserEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/UserEvent.java
deleted file mode 100644
index e95eec4..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/UserEvent.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi;
-
-public class UserEvent extends Event {
-
-  /**
-   * Index(i) of the i-th (physical) Input or Output that generated an Event.
-   * For a Processor-generated event, this is ignored.
-   */
-  private final int sourceIndex;
-
-  /**
-   * Index(i) of the i-th (physical) Input or Output that is meant to receive
-   * this Event. For a Processor event, this is ignored.
-   */
-  private int targetIndex;
-
-  /**
-   * User Payload for this User Event
-   */
-  private final byte[] userPayload;
-
-  /**
-   * User Event constructor
-   * @param index Index to identify the physical edge of the input/output
-   * @param userPayload User Payload of the User Event
-   */
-  public UserEvent(int index,
-      byte[] userPayload) {
-    super(EventType.USER);
-    this.userPayload = userPayload;
-    this.sourceIndex = index;
-  }
-
-  /**
-   * Constructor for Processor-generated User Events
-   * @param userPayload
-   */
-  public UserEvent(byte[] userPayload) {
-    this(-1, userPayload);
-  }
-
-  public byte[] getUserPayload() {
-    return userPayload;
-  }
-
-  public int getSourceIndex() {
-    return sourceIndex;
-  }
-
-  public int getTargetIndex() {
-    return targetIndex;
-  }
-
-  void setTargetIndex(int targetIndex) {
-    this.targetIndex = targetIndex;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
new file mode 100644
index 0000000..4528e4d
--- /dev/null
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.events;
+
+import org.apache.tez.engine.newapi.Event;
+
+/**
+ * Event generated by an Input to indicate error when trying to
+ * retrieve data.
+ */
+public class InputDataErrorEvent extends Event {
+
+  /**
+   * Diagnostics/trace of the error that occurred on the Input's edge.
+   */
+  private final String diagnostics;
+
+  /**
+   * Index of the physical edge on which the error occurred.
+   */
+  private final int index;
+
+  protected InputDataErrorEvent(String diagnostics, int index) {
+    super();
+    this.diagnostics = diagnostics;
+    this.index = index;
+  }
+
+  public String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public int getIndex() {
+    return index;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java
new file mode 100644
index 0000000..9d8f01a
--- /dev/null
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.events;
+
+import org.apache.tez.engine.newapi.Event;
+
+/**
+ * Event used by user code to send information between tasks. An output can
+ * generate an Event of this type to send information regarding output data
+ * ( such as URI for file-based output data, port info in case of
+ * streaming-based data transfers ) to the Input on the destination vertex.
+ */
+public final class TaskCommunicationEvent extends Event {
+
+  /**
+   * Index(i) of the i-th (physical) Input or Output that generated an Event.
+   * For a Processor-generated event, this is ignored.
+   */
+  private final int sourceIndex;
+
+  /**
+   * Index(i) of the i-th (physical) Input or Output that is meant to receive
+   * this Event. For a Processor event, this is ignored.
+   */
+  private int targetIndex;
+
+  /**
+   * User Payload for this Event
+   */
+  private final byte[] userPayload;
+
+  /**
+   * User Event constructor
+   * @param index Index to identify the physical edge of the input/output
+   * @param userPayload User Payload of the User Event
+   */
+  public TaskCommunicationEvent(int index,
+      byte[] userPayload) {
+    this.userPayload = userPayload;
+    this.sourceIndex = index;
+  }
+
+  /**
+   * Constructor for Processor-generated User Events
+   * @param userPayload
+   */
+  public TaskCommunicationEvent(byte[] userPayload) {
+    this(-1, userPayload);
+  }
+
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  public int getSourceIndex() {
+    return sourceIndex;
+  }
+
+  public int getTargetIndex() {
+    return targetIndex;
+  }
+
+  void setTargetIndex(int targetIndex) {
+    this.targetIndex = targetIndex;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/system/InputDataErrorEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/system/InputDataErrorEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/system/InputDataErrorEvent.java
deleted file mode 100644
index 0b55878..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/system/InputDataErrorEvent.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.events.system;
-
-import org.apache.tez.engine.newapi.SystemEvent;
-
-/**
- * System event generated by an Input to indicate error when trying to
- * retrieve data.
- */
-public class InputDataErrorEvent extends SystemEvent {
-
-  /**
-   * Diagnostics/trace of the error that occurred on the Input's edge.
-   */
-  private final String diagnostics;
-
-  /**
-   * Index of the physical edge on which the error occurred.
-   */
-  private final int index;
-
-  protected InputDataErrorEvent(String diagnostics, int index) {
-    super();
-    this.diagnostics = diagnostics;
-    this.index = index;
-  }
-
-  public String getDiagnostics() {
-    return diagnostics;
-  }
-
-  public int getIndex() {
-    return index;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
index 1ad133b..fe06bec 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.engine.newapi.impl;
 
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
 /**
  * Class that encapsulates all the information to identify the unique
  * object that either generated an Event or is the recipient of an Event.
@@ -27,7 +29,8 @@ public class EventMetaData {
   public static enum EventGenerator {
     INPUT,
     PROCESSOR,
-    OUTPUT
+    OUTPUT,
+    SYSTEM
   }
 
   /**
@@ -38,20 +41,41 @@ public class EventMetaData {
   /**
    * Name of the vertex where the event was generated.
    */
-  private final String vertexName;
+  private final String taskVertexName;
+
+  /**
+   * Name of the vertex to which the Input or Output is connected.
+   */
+  private final String edgeVertexName;
 
-  public EventMetaData(EventGenerator idType,
-      String vertexName) {
-    this.generator = idType;
-    this.vertexName = vertexName;
+  /**
+   * Task Attempt ID
+   */
+  private final TezTaskAttemptID taskAttemptID;
+
+  public EventMetaData(EventGenerator generator,
+      String taskVertexName, String edgeVertexName,
+      TezTaskAttemptID taskAttemptID) {
+    this.generator = generator;
+    this.taskVertexName = taskVertexName;
+    this.edgeVertexName = edgeVertexName;
+    this.taskAttemptID = taskAttemptID;
   }
 
-  public EventGenerator getIDType() {
+  public EventGenerator getEventGenerator() {
     return generator;
   }
 
-  public String getVertexName() {
-    return vertexName;
+  public TezTaskAttemptID getTaskAttemptID() {
+    return taskAttemptID;
+  }
+
+  public String getTaskVertexName() {
+    return taskVertexName;
+  }
+
+  public String getEdgeVertexName() {
+    return edgeVertexName;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java
new file mode 100644
index 0000000..19174c8
--- /dev/null
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.rpc.impl;
+
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.impl.EventMetaData;
+
+public class TezEvent {
+
+  private final String eventClassName;
+
+  private final Event event;
+
+  private EventMetaData sourceInfo;
+
+  private EventMetaData targetInfo;
+
+  public TezEvent(Event event, EventMetaData sourceInfo) {
+    this.event = event;
+    this.eventClassName = event.getClass().getName();
+    this.setSourceInfo(sourceInfo);
+  }
+
+  public Event getEvent() {
+    return event;
+  }
+
+  public EventMetaData getSourceInfo() {
+    return sourceInfo;
+  }
+
+  public void setSourceInfo(EventMetaData sourceInfo) {
+    this.sourceInfo = sourceInfo;
+  }
+
+  public EventMetaData getTargetInfo() {
+    return targetInfo;
+  }
+
+  public void setTargetInfo(EventMetaData targetInfo) {
+    this.targetInfo = targetInfo;
+  }
+
+  public String getEventClassName() {
+    return eventClassName;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezUserEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezUserEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezUserEvent.java
deleted file mode 100644
index 03d5b7b..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezUserEvent.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.rpc.impl;
-
-import org.apache.tez.engine.newapi.UserEvent;
-import org.apache.tez.engine.newapi.impl.EventMetaData;
-
-public class TezUserEvent {
-
-  private final UserEvent userEvent;
-
-  private EventMetaData sourceInfo;
-
-  private EventMetaData targetInfo;
-
-  public TezUserEvent(UserEvent userEvent, EventMetaData sourceInfo) {
-    this.userEvent = userEvent;
-    this.setSourceInfo(sourceInfo);
-  }
-
-  public UserEvent getUserEvent() {
-    return userEvent;
-  }
-
-  public EventMetaData getSourceInfo() {
-    return sourceInfo;
-  }
-
-  public void setSourceInfo(EventMetaData sourceInfo) {
-    this.sourceInfo = sourceInfo;
-  }
-
-  public EventMetaData getTargetInfo() {
-    return targetInfo;
-  }
-
-  public void setTargetInfo(EventMetaData targetInfo) {
-    this.targetInfo = targetInfo;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index 4b04474..567057a 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.TezInputContext;
@@ -32,13 +31,15 @@ public class TezInputContextImpl extends TezTaskContextImpl
     implements TezInputContext {
 
   private final byte[] userPayload;
+  private final String sourceVertexName;
 
   @Private
-  public TezInputContextImpl(Configuration tezConf, String vertexName,
-      TezTaskAttemptID taskAttemptID, TezCounters counters,
-      byte[] userPayload) {
-    super(tezConf, vertexName, taskAttemptID, counters);
+  public TezInputContextImpl(Configuration conf, String taskVertexName,
+      String sourceVertexName, TezTaskAttemptID taskAttemptID,
+      TezCounters counters, byte[] userPayload) {
+    super(conf, taskVertexName, taskAttemptID, counters);
     this.userPayload = userPayload;
+    this.sourceVertexName = sourceVertexName;
   }
 
   @Override
@@ -52,5 +53,9 @@ public class TezInputContextImpl extends TezTaskContextImpl
     return userPayload;
   }
 
+  @Override
+  public String getSourceVertexName() {
+    return sourceVertexName;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index 9e488e2..a2ce60b 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -27,16 +27,20 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.TezOutputContext;
 
-public class TezOutputContextImpl extends TezTaskContextImpl implements
-    TezOutputContext {
+public class TezOutputContextImpl extends TezTaskContextImpl
+    implements TezOutputContext {
 
   private final byte[] userPayload;
+  private final String destinationVertexName;
 
   @Private
-  public TezOutputContextImpl(Configuration tezConf, String vertexName,
-      TezTaskAttemptID taskAttemptID, TezCounters counters, byte[] userPayload) {
-    super(tezConf, vertexName, taskAttemptID, counters);
+  public TezOutputContextImpl(Configuration conf, String taskVertexName,
+      String destinationVertexName,
+      TezTaskAttemptID taskAttemptID, TezCounters counters,
+      byte[] userPayload) {
+    super(conf, taskVertexName, taskAttemptID, counters);
     this.userPayload = userPayload;
+    this.destinationVertexName = destinationVertexName;
   }
 
   @Override
@@ -50,4 +54,9 @@ public class TezOutputContextImpl extends TezTaskContextImpl implements
     return userPayload;
   }
 
+  @Override
+  public String getDestinationVertexName() {
+    return destinationVertexName;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
index 862b6d2..c89003e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
@@ -26,17 +26,17 @@ import org.apache.tez.engine.newapi.TezTaskContext;
 
 public abstract class TezTaskContextImpl implements TezTaskContext {
 
-  private final Configuration tezConf;
-  private final String vertexName;
+  private final Configuration conf;
+  private final String taskVertexName;
   private final TezTaskAttemptID taskAttemptID;
   private final TezCounters counters;
 
   @Private
-  public TezTaskContextImpl(Configuration tezConf,
-      String vertexName, TezTaskAttemptID taskAttemptID,
+  public TezTaskContextImpl(Configuration conf,
+      String taskVertexName, TezTaskAttemptID taskAttemptID,
       TezCounters counters) {
-    this.tezConf = tezConf;
-    this.vertexName = vertexName;
+    this.conf = conf;
+    this.taskVertexName = taskVertexName;
     this.taskAttemptID = taskAttemptID;
     this.counters = counters;
   }
@@ -52,10 +52,12 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
   }
 
   @Override
-  public String getVertexName() {
-    return vertexName;
+  public String getTaskVertexName() {
+    // TODO Auto-generated method stub
+    return taskVertexName;
   }
 
+
   @Override
   public TezCounters getCounters() {
     return counters;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index a31346c..7cec1f9 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -215,14 +215,16 @@ public class LogicalIOProcessorRuntimeTask {
 
   private TezInputContext createInputContext(InputSpec inputSpec) {
     TezInputContext inputContext = new TezInputContextImpl(tezConf,
-        taskSpec.getVertexName(), taskSpec.getTaskAttemptID(), tezCounters,
+        taskSpec.getVertexName(), inputSpec.getSourceVertexName(),
+        taskSpec.getTaskAttemptID(), tezCounters,
         inputSpec.getInputDescriptor().getUserPayload());
     return inputContext;
   }
 
   private TezOutputContext createOutputContext(OutputSpec outputSpec) {
     TezOutputContext outputContext = new TezOutputContextImpl(tezConf,
-        taskSpec.getVertexName(), taskSpec.getTaskAttemptID(), tezCounters,
+        taskSpec.getVertexName(), outputSpec.getDestinationVertexName(),
+        taskSpec.getTaskAttemptID(), tezCounters,
         outputSpec.getOutputDescriptor().getUserPayload());
     return outputContext;
   }