You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/05 02:35:52 UTC
git commit: TEZ-416. Simplify Event class heirarchy. (hitesh)
Updated Branches:
refs/heads/TEZ-398 52ff7aca3 -> bc9cee33c
TEZ-416. Simplify Event class heirarchy. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/bc9cee33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/bc9cee33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/bc9cee33
Branch: refs/heads/TEZ-398
Commit: bc9cee33c0a9c17e1b664c9521ef6c6ab4362c1c
Parents: 52ff7ac
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Sep 4 17:35:24 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Sep 4 17:35:24 2013 -0700
----------------------------------------------------------------------
.../org/apache/tez/engine/newapi/Event.java | 14 ----
.../apache/tez/engine/newapi/SystemEvent.java | 30 -------
.../tez/engine/newapi/TezInputContext.java | 6 ++
.../tez/engine/newapi/TezOutputContext.java | 7 ++
.../tez/engine/newapi/TezTaskContext.java | 4 +-
.../org/apache/tez/engine/newapi/UserEvent.java | 77 ------------------
.../newapi/events/InputDataErrorEvent.java | 53 ++++++++++++
.../newapi/events/TaskCommunicationEvent.java | 84 ++++++++++++++++++++
.../events/system/InputDataErrorEvent.java | 53 ------------
.../tez/engine/newapi/impl/EventMetaData.java | 42 +++++++---
.../tez/engine/newapi/rpc/impl/TezEvent.java | 64 +++++++++++++++
.../engine/newapi/rpc/impl/TezUserEvent.java | 57 -------------
.../engine/newapi/impl/TezInputContextImpl.java | 15 ++--
.../newapi/impl/TezOutputContextImpl.java | 19 +++--
.../engine/newapi/impl/TezTaskContextImpl.java | 18 +++--
.../LogicalIOProcessorRuntimeTask.java | 6 +-
16 files changed, 287 insertions(+), 262 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Event.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Event.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Event.java
index f5bcdd8..20157c7 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Event.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Event.java
@@ -25,18 +25,4 @@ package org.apache.tez.engine.newapi;
*/
public abstract class Event {
- public static enum EventType {
- SYSTEM,
- USER
- }
-
- private final EventType eventType;
-
- public Event(EventType eventType) {
- this.eventType = eventType;
- }
-
- public EventType getEventType() {
- return eventType;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/SystemEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/SystemEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/SystemEvent.java
deleted file mode 100644
index be22f57..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/SystemEvent.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi;
-
-/**
- * Base class for all system events that are understood by the Tez framework.
- * SystemEvents can be generated either by the framework or the user code.
- */
-public abstract class SystemEvent extends Event {
-
- protected SystemEvent() {
- super(EventType.SYSTEM);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezInputContext.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezInputContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezInputContext.java
index 38780ab..a56c9d4 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezInputContext.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezInputContext.java
@@ -23,4 +23,10 @@ package org.apache.tez.engine.newapi;
*/
public interface TezInputContext extends TezTaskContext {
+ /**
+ * Get the Vertex Name of the Source that generated data for this Input
+ * @return Name of the Source Vertex
+ */
+ public String getSourceVertexName();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezOutputContext.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezOutputContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezOutputContext.java
index 3a0fceb..34c6028 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezOutputContext.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezOutputContext.java
@@ -23,4 +23,11 @@ package org.apache.tez.engine.newapi;
*/
public interface TezOutputContext extends TezTaskContext {
+ /**
+ * Get the Vertex Name of the Destination that is the recipient of this
+ * Output's data
+ * @return Name of the Destination Vertex
+ */
+ public String getDestinationVertexName();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
index 0724885..4cc5668 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
@@ -42,10 +42,10 @@ public interface TezTaskContext {
public int getAttemptNumber();
/**
- * Get the name of the Vertex
+ * Get the name of the Vertex in which the task is running
* @return Vertex Name
*/
- public String getVertexName();
+ public String getTaskVertexName();
public TezCounters getCounters();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/UserEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/UserEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/UserEvent.java
deleted file mode 100644
index e95eec4..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/UserEvent.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi;
-
-public class UserEvent extends Event {
-
- /**
- * Index(i) of the i-th (physical) Input or Output that generated an Event.
- * For a Processor-generated event, this is ignored.
- */
- private final int sourceIndex;
-
- /**
- * Index(i) of the i-th (physical) Input or Output that is meant to receive
- * this Event. For a Processor event, this is ignored.
- */
- private int targetIndex;
-
- /**
- * User Payload for this User Event
- */
- private final byte[] userPayload;
-
- /**
- * User Event constructor
- * @param index Index to identify the physical edge of the input/output
- * @param userPayload User Payload of the User Event
- */
- public UserEvent(int index,
- byte[] userPayload) {
- super(EventType.USER);
- this.userPayload = userPayload;
- this.sourceIndex = index;
- }
-
- /**
- * Constructor for Processor-generated User Events
- * @param userPayload
- */
- public UserEvent(byte[] userPayload) {
- this(-1, userPayload);
- }
-
- public byte[] getUserPayload() {
- return userPayload;
- }
-
- public int getSourceIndex() {
- return sourceIndex;
- }
-
- public int getTargetIndex() {
- return targetIndex;
- }
-
- void setTargetIndex(int targetIndex) {
- this.targetIndex = targetIndex;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
new file mode 100644
index 0000000..4528e4d
--- /dev/null
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.events;
+
+import org.apache.tez.engine.newapi.Event;
+
+/**
+ * Event generated by an Input to indicate error when trying to
+ * retrieve data.
+ */
+public class InputDataErrorEvent extends Event {
+
+ /**
+ * Diagnostics/trace of the error that occurred on the Input's edge.
+ */
+ private final String diagnostics;
+
+ /**
+ * Index of the physical edge on which the error occurred.
+ */
+ private final int index;
+
+ protected InputDataErrorEvent(String diagnostics, int index) {
+ super();
+ this.diagnostics = diagnostics;
+ this.index = index;
+ }
+
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java
new file mode 100644
index 0000000..9d8f01a
--- /dev/null
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.events;
+
+import org.apache.tez.engine.newapi.Event;
+
+/**
+ * Event used by user code to send information between tasks. An output can
+ * generate an Event of this type to sending information regarding output data
+ * ( such as URI for file-based output data, port info in case of
+ * streaming-based data transfers ) to the Input on the destination vertex.
+ */
+public final class TaskCommunicationEvent extends Event {
+
+ /**
+ * Index(i) of the i-th (physical) Input or Output that generated an Event.
+ * For a Processor-generated event, this is ignored.
+ */
+ private final int sourceIndex;
+
+ /**
+ * Index(i) of the i-th (physical) Input or Output that is meant to receive
+ * this Event. For a Processor event, this is ignored.
+ */
+ private int targetIndex;
+
+ /**
+ * User Payload for this Event
+ */
+ private final byte[] userPayload;
+
+ /**
+ * User Event constructor
+ * @param index Index to identify the physical edge of the input/output
+ * @param userPayload User Payload of the User Event
+ */
+ public TaskCommunicationEvent(int index,
+ byte[] userPayload) {
+ this.userPayload = userPayload;
+ this.sourceIndex = index;
+ }
+
+ /**
+ * Constructor for Processor-generated User Events
+ * @param userPayload
+ */
+ public TaskCommunicationEvent(byte[] userPayload) {
+ this(-1, userPayload);
+ }
+
+ public byte[] getUserPayload() {
+ return userPayload;
+ }
+
+ public int getSourceIndex() {
+ return sourceIndex;
+ }
+
+ public int getTargetIndex() {
+ return targetIndex;
+ }
+
+ void setTargetIndex(int targetIndex) {
+ this.targetIndex = targetIndex;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/system/InputDataErrorEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/system/InputDataErrorEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/system/InputDataErrorEvent.java
deleted file mode 100644
index 0b55878..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/system/InputDataErrorEvent.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.events.system;
-
-import org.apache.tez.engine.newapi.SystemEvent;
-
-/**
- * System event generated by an Input to indicate error when trying to
- * retrieve data.
- */
-public class InputDataErrorEvent extends SystemEvent {
-
- /**
- * Diagnostics/trace of the error that occurred on the Input's edge.
- */
- private final String diagnostics;
-
- /**
- * Index of the physical edge on which the error occurred.
- */
- private final int index;
-
- protected InputDataErrorEvent(String diagnostics, int index) {
- super();
- this.diagnostics = diagnostics;
- this.index = index;
- }
-
- public String getDiagnostics() {
- return diagnostics;
- }
-
- public int getIndex() {
- return index;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
index 1ad133b..fe06bec 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
@@ -18,6 +18,8 @@
package org.apache.tez.engine.newapi.impl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
/**
* Class that encapsulates all the information to identify the unique
* object that either generated an Event or is the recipient of an Event.
@@ -27,7 +29,8 @@ public class EventMetaData {
public static enum EventGenerator {
INPUT,
PROCESSOR,
- OUTPUT
+ OUTPUT,
+ SYSTEM
}
/**
@@ -38,20 +41,41 @@ public class EventMetaData {
/**
* Name of the vertex where the event was generated.
*/
- private final String vertexName;
+ private final String taskVertexName;
+
+ /**
+ * Name of the vertex to which the Input or Output is connected to.
+ */
+ private final String edgeVertexName;
- public EventMetaData(EventGenerator idType,
- String vertexName) {
- this.generator = idType;
- this.vertexName = vertexName;
+ /**
+ * Task Attempt ID
+ */
+ private final TezTaskAttemptID taskAttemptID;
+
+ public EventMetaData(EventGenerator generator,
+ String taskVertexName, String edgeVertexName,
+ TezTaskAttemptID taskAttemptID) {
+ this.generator = generator;
+ this.taskVertexName = taskVertexName;
+ this.edgeVertexName = edgeVertexName;
+ this.taskAttemptID = taskAttemptID;
}
- public EventGenerator getIDType() {
+ public EventGenerator getEventGenerator() {
return generator;
}
- public String getVertexName() {
- return vertexName;
+ public TezTaskAttemptID getTaskAttemptID() {
+ return taskAttemptID;
+ }
+
+ public String getTaskVertexName() {
+ return taskVertexName;
+ }
+
+ public String getEdgeVertexName() {
+ return edgeVertexName;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java
new file mode 100644
index 0000000..19174c8
--- /dev/null
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.rpc.impl;
+
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.impl.EventMetaData;
+
+public class TezEvent {
+
+ private final String eventClassName;
+
+ private final Event event;
+
+ private EventMetaData sourceInfo;
+
+ private EventMetaData targetInfo;
+
+ public TezEvent(Event event, EventMetaData sourceInfo) {
+ this.event = event;
+ this.eventClassName = event.getClass().getName();
+ this.setSourceInfo(sourceInfo);
+ }
+
+ public Event getEvent() {
+ return event;
+ }
+
+ public EventMetaData getSourceInfo() {
+ return sourceInfo;
+ }
+
+ public void setSourceInfo(EventMetaData sourceInfo) {
+ this.sourceInfo = sourceInfo;
+ }
+
+ public EventMetaData getTargetInfo() {
+ return targetInfo;
+ }
+
+ public void setTargetInfo(EventMetaData targetInfo) {
+ this.targetInfo = targetInfo;
+ }
+
+ public String getEventClassName() {
+ return eventClassName;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezUserEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezUserEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezUserEvent.java
deleted file mode 100644
index 03d5b7b..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezUserEvent.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.rpc.impl;
-
-import org.apache.tez.engine.newapi.UserEvent;
-import org.apache.tez.engine.newapi.impl.EventMetaData;
-
-public class TezUserEvent {
-
- private final UserEvent userEvent;
-
- private EventMetaData sourceInfo;
-
- private EventMetaData targetInfo;
-
- public TezUserEvent(UserEvent userEvent, EventMetaData sourceInfo) {
- this.userEvent = userEvent;
- this.setSourceInfo(sourceInfo);
- }
-
- public UserEvent getUserEvent() {
- return userEvent;
- }
-
- public EventMetaData getSourceInfo() {
- return sourceInfo;
- }
-
- public void setSourceInfo(EventMetaData sourceInfo) {
- this.sourceInfo = sourceInfo;
- }
-
- public EventMetaData getTargetInfo() {
- return targetInfo;
- }
-
- public void setTargetInfo(EventMetaData targetInfo) {
- this.targetInfo = targetInfo;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index 4b04474..567057a 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.newapi.Event;
import org.apache.tez.engine.newapi.TezInputContext;
@@ -32,13 +31,15 @@ public class TezInputContextImpl extends TezTaskContextImpl
implements TezInputContext {
private final byte[] userPayload;
+ private final String sourceVertexName;
@Private
- public TezInputContextImpl(Configuration tezConf, String vertexName,
- TezTaskAttemptID taskAttemptID, TezCounters counters,
- byte[] userPayload) {
- super(tezConf, vertexName, taskAttemptID, counters);
+ public TezInputContextImpl(Configuration conf, String taskVertexName,
+ String sourceVertexName, TezTaskAttemptID taskAttemptID,
+ TezCounters counters, byte[] userPayload) {
+ super(conf, taskVertexName, taskAttemptID, counters);
this.userPayload = userPayload;
+ this.sourceVertexName = sourceVertexName;
}
@Override
@@ -52,5 +53,9 @@ public class TezInputContextImpl extends TezTaskContextImpl
return userPayload;
}
+ @Override
+ public String getSourceVertexName() {
+ return sourceVertexName;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index 9e488e2..a2ce60b 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -27,16 +27,20 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.newapi.Event;
import org.apache.tez.engine.newapi.TezOutputContext;
-public class TezOutputContextImpl extends TezTaskContextImpl implements
- TezOutputContext {
+public class TezOutputContextImpl extends TezTaskContextImpl
+ implements TezOutputContext {
private final byte[] userPayload;
+ private final String destinationVertexName;
@Private
- public TezOutputContextImpl(Configuration tezConf, String vertexName,
- TezTaskAttemptID taskAttemptID, TezCounters counters, byte[] userPayload) {
- super(tezConf, vertexName, taskAttemptID, counters);
+ public TezOutputContextImpl(Configuration conf, String taskVertexName,
+ String destinationVertexName,
+ TezTaskAttemptID taskAttemptID, TezCounters counters,
+ byte[] userPayload) {
+ super(conf, taskVertexName, taskAttemptID, counters);
this.userPayload = userPayload;
+ this.destinationVertexName = destinationVertexName;
}
@Override
@@ -50,4 +54,9 @@ public class TezOutputContextImpl extends TezTaskContextImpl implements
return userPayload;
}
+ @Override
+ public String getDestinationVertexName() {
+ return destinationVertexName;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
index 862b6d2..c89003e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
@@ -26,17 +26,17 @@ import org.apache.tez.engine.newapi.TezTaskContext;
public abstract class TezTaskContextImpl implements TezTaskContext {
- private final Configuration tezConf;
- private final String vertexName;
+ private final Configuration conf;
+ private final String taskVertexName;
private final TezTaskAttemptID taskAttemptID;
private final TezCounters counters;
@Private
- public TezTaskContextImpl(Configuration tezConf,
- String vertexName, TezTaskAttemptID taskAttemptID,
+ public TezTaskContextImpl(Configuration conf,
+ String taskVertexName, TezTaskAttemptID taskAttemptID,
TezCounters counters) {
- this.tezConf = tezConf;
- this.vertexName = vertexName;
+ this.conf = conf;
+ this.taskVertexName = taskVertexName;
this.taskAttemptID = taskAttemptID;
this.counters = counters;
}
@@ -52,10 +52,12 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
}
@Override
- public String getVertexName() {
- return vertexName;
+ public String getTaskVertexName() {
+ // TODO Auto-generated method stub
+ return taskVertexName;
}
+
@Override
public TezCounters getCounters() {
return counters;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc9cee33/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index a31346c..7cec1f9 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -215,14 +215,16 @@ public class LogicalIOProcessorRuntimeTask {
private TezInputContext createInputContext(InputSpec inputSpec) {
TezInputContext inputContext = new TezInputContextImpl(tezConf,
- taskSpec.getVertexName(), taskSpec.getTaskAttemptID(), tezCounters,
+ taskSpec.getVertexName(), inputSpec.getSourceVertexName(),
+ taskSpec.getTaskAttemptID(), tezCounters,
inputSpec.getInputDescriptor().getUserPayload());
return inputContext;
}
private TezOutputContext createOutputContext(OutputSpec outputSpec) {
TezOutputContext outputContext = new TezOutputContextImpl(tezConf,
- taskSpec.getVertexName(), taskSpec.getTaskAttemptID(), tezCounters,
+ taskSpec.getVertexName(), outputSpec.getDestinationVertexName(),
+ taskSpec.getTaskAttemptID(), tezCounters,
outputSpec.getOutputDescriptor().getUserPayload());
return outputContext;
}