You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/11 21:04:08 UTC

git commit: TEZ-434. Rename EventMetaData Generator to enforce clarity on both producer/consumer semantics. (hitesh)

Updated Branches:
  refs/heads/TEZ-398 1cf7f197d -> 0f5298a5e


TEZ-434. Rename EventMetaData Generator to enforce clarity on both producer/consumer semantics. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/0f5298a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/0f5298a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/0f5298a5

Branch: refs/heads/TEZ-398
Commit: 0f5298a5e6892285d2438f6bf00855c40251bb10
Parents: 1cf7f19
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Sep 11 12:03:49 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Sep 11 12:03:49 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java    | 13 +++++--------
 .../tez/engine/newapi/impl/EventMetaData.java    | 19 ++++++++++---------
 .../apache/tez/engine/newapi/impl/TezEvent.java  |  2 ++
 .../engine/newapi/impl/TezInputContextImpl.java  |  4 ++--
 .../engine/newapi/impl/TezOutputContextImpl.java |  6 +++---
 .../newapi/impl/TezProcessorContextImpl.java     |  4 ++--
 6 files changed, 24 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0f5298a5/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 6a0fff4..1106d7c 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -83,7 +83,7 @@ import org.apache.tez.engine.newapi.impl.TezEvent;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
 import org.apache.tez.engine.newapi.impl.TezUmbilical;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventGenerator;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.mapreduce.input.SimpleInput;
 import org.apache.tez.mapreduce.output.SimpleOutput;
@@ -176,11 +176,6 @@ public class YarnTezDagChild {
               continue;
             }
             // TODO TODONEWTEZ
-            if (!e.getDestinationInfo().getTaskAttemptID().equals(
-                currentTaskAttemptID)) {
-              // error? or block?
-              continue;
-            }
             try {
               taskLock.readLock().lock();
               if (currentTask != null) {
@@ -233,7 +228,8 @@ public class YarnTezDagChild {
   }
 
   public static void main(String[] args) throws Throwable {
-    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+    Thread.setDefaultUncaughtExceptionHandler(
+        new YarnUncaughtExceptionHandler());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Child starting");
     }
@@ -386,7 +382,8 @@ public class YarnTezDagChild {
         childUGI.doAs(new PrivilegedExceptionAction<Object>() {
           @Override
           public Object run() throws Exception {
-            EventMetaData sourceInfo = new EventMetaData(EventGenerator.SYSTEM,
+            EventMetaData sourceInfo = new EventMetaData(
+                EventProducerConsumerType.SYSTEM,
                 taskSpec.getVertexName(), "", currentTaskAttemptID);
             try {
               currentTask.initialize();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0f5298a5/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
index 3ef271f..9ad71e6 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
@@ -31,7 +31,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
  */
 public class EventMetaData implements Writable {
 
-  public static enum EventGenerator {
+  public static enum EventProducerConsumerType {
     INPUT,
     PROCESSOR,
     OUTPUT,
@@ -39,9 +39,10 @@ public class EventMetaData implements Writable {
   }
 
   /**
-   * Source Type ( one of Input/Output/Processor ) that generated the Event.
+   * Producer Type ( one of Input/Output/Processor ) that generated the Event
+   * or Consumer Type that will consume the Event.
    */
-  private EventGenerator generator;
+  private EventProducerConsumerType producerConsumerType;
 
   /**
    * Name of the vertex where the event was generated.
@@ -66,17 +67,17 @@ public class EventMetaData implements Writable {
   public EventMetaData() {
   }
 
-  public EventMetaData(EventGenerator generator,
+  public EventMetaData(EventProducerConsumerType generator,
       String taskVertexName, String edgeVertexName,
       TezTaskAttemptID taskAttemptID) {
-    this.generator = generator;
+    this.producerConsumerType = generator;
     this.taskVertexName = taskVertexName;
     this.edgeVertexName = edgeVertexName;
     this.taskAttemptID = taskAttemptID;
   }
 
-  public EventGenerator getEventGenerator() {
-    return generator;
+  public EventProducerConsumerType getEventGenerator() {
+    return producerConsumerType;
   }
 
   public TezTaskAttemptID getTaskAttemptID() {
@@ -93,7 +94,7 @@ public class EventMetaData implements Writable {
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeInt(generator.ordinal());
+    out.writeInt(producerConsumerType.ordinal());
     if (taskVertexName != null) {
       out.writeBoolean(true);
       out.writeUTF(taskVertexName);
@@ -112,7 +113,7 @@ public class EventMetaData implements Writable {
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    generator = EventGenerator.values()[in.readInt()];
+    producerConsumerType = EventProducerConsumerType.values()[in.readInt()];
     if (in.readBoolean()) {
       taskVertexName = in.readUTF();
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0f5298a5/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
index f8cc3ed..fdb8754 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
@@ -220,9 +220,11 @@ public class TezEvent implements Writable {
   public void readFields(DataInput in) throws IOException {
     deserializeEvent(in);
     if (in.readBoolean()) {
+      sourceInfo = new EventMetaData();
       sourceInfo.readFields(in);
     }
     if (in.readBoolean()) {
+      destinationInfo = new EventMetaData();
       destinationInfo.readFields(in);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0f5298a5/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index 1d76d86..604cbff 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -27,7 +27,7 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.TezInputContext;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventGenerator;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
 
 public class TezInputContextImpl extends TezTaskContextImpl
     implements TezInputContext {
@@ -47,7 +47,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
     this.userPayload = userPayload;
     this.sourceVertexName = sourceVertexName;
     this.sourceInfo = new EventMetaData(
-        EventGenerator.INPUT, taskVertexName, sourceVertexName,
+        EventProducerConsumerType.INPUT, taskVertexName, sourceVertexName,
         taskAttemptID);
     this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID
         .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0f5298a5/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index e5b81d0..c4a069e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -27,7 +27,7 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.TezOutputContext;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventGenerator;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
 
 public class TezOutputContextImpl extends TezTaskContextImpl
     implements TezOutputContext {
@@ -47,8 +47,8 @@ public class TezOutputContextImpl extends TezTaskContextImpl
     this.userPayload = userPayload;
     this.destinationVertexName = destinationVertexName;
     this.tezUmbilical = tezUmbilical;
-    this.sourceInfo = new EventMetaData(EventGenerator.OUTPUT, taskVertexName,
-        destinationVertexName, taskAttemptID);
+    this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
+        taskVertexName, destinationVertexName, taskAttemptID);
     this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID
         .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
         getTaskIndex(), getAttemptNumber(), destinationVertexName);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0f5298a5/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index 73c4a54..6f61102 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -26,7 +26,7 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.TezProcessorContext;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventGenerator;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
 
 public class TezProcessorContextImpl extends TezTaskContextImpl
   implements TezProcessorContext {
@@ -42,7 +42,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
     super(tezConf, vertexName, taskAttemptID, counters);
     this.userPayload = userPayload;
     this.tezUmbilical = tezUmbilical;
-    this.sourceInfo = new EventMetaData(EventGenerator.PROCESSOR,
+    this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
         taskVertexName, "", taskAttemptID);
     this.uniqueIdentifier = String.format("%s_%s_%6d_%2d", taskAttemptID
         .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,