You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/22 20:07:49 UTC

git commit: TEZ-574. Avoid creating copies of payloads when sending *Specs over the wire. (sseth)

Updated Branches:
  refs/heads/master 142910630 -> a6908afa8


TEZ-574. Avoid creating copies of payloads when sending *Specs over the
wire. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a6908afa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a6908afa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a6908afa

Branch: refs/heads/master
Commit: a6908afa8c2832e9d3fd4420bb55462f848eb00c
Parents: 1429106
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Oct 22 11:07:26 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Oct 22 11:07:26 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/api/InputDescriptor.java |  7 ++++
 .../apache/tez/dag/api/OutputDescriptor.java    |  7 ++++
 .../apache/tez/dag/api/ProcessorDescriptor.java |  7 ++++
 .../apache/tez/dag/api/TezEntityDescriptor.java | 35 +++++++++++++++++++-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  4 +++
 .../apache/tez/runtime/api/impl/InputSpec.java  | 17 +++-------
 .../apache/tez/runtime/api/impl/OutputSpec.java | 17 +++-------
 .../apache/tez/runtime/api/impl/TaskSpec.java   | 18 +++-------
 8 files changed, 71 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
index dea9001..24b542a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
@@ -18,8 +18,15 @@
 
 package org.apache.tez.dag.api;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
 public class InputDescriptor extends TezEntityDescriptor {
 
+  @Private // for Writable
+  public InputDescriptor() {
+    super();
+  }
+  
   public InputDescriptor(String inputClassName) {
     super(inputClassName);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
index 16fb9b1..d6bde35 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
@@ -18,8 +18,15 @@
 
 package org.apache.tez.dag.api;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
 public class OutputDescriptor extends TezEntityDescriptor {
 
+  @Private // for Writable
+  public OutputDescriptor() {
+    super();
+  }
+  
   public OutputDescriptor(String outputClassName) {
     super(outputClassName);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
index 092147d..60afb4d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
@@ -18,8 +18,15 @@
 
 package org.apache.tez.dag.api;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
 public class ProcessorDescriptor extends TezEntityDescriptor {
 
+  @Private // for Writable
+  public ProcessorDescriptor() {
+    super();
+  }
+  
   public ProcessorDescriptor(String processorClassName) {
     super(processorClassName);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
index 9d4b2c4..cf305eb 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
@@ -18,11 +18,23 @@
 
 package org.apache.tez.dag.api;
 
-public abstract class TezEntityDescriptor {
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+public abstract class TezEntityDescriptor implements Writable {
 
   protected byte[] userPayload;
   private String className;
 
+  @Private // for Writable
+  public TezEntityDescriptor() {
+  }
+  
   public TezEntityDescriptor(String className) {
     this.className = className;
   }
@@ -39,4 +51,25 @@ public abstract class TezEntityDescriptor {
   public String getClassName() {
     return this.className;
   }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, className);
+    if (userPayload == null) {
+      out.writeInt(-1);
+    } else {
+      out.writeInt(userPayload.length);
+      out.write(userPayload);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException { // inverse of write()
+    this.className = Text.readString(in);
+    int payloadLength = in.readInt(); // -1 is what write() emits for a null payload
+    userPayload = (payloadLength == -1) ? null : new byte[payloadLength]; // never keep a stale payload
+    if (userPayload != null) {
+      in.readFully(userPayload);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 29a29a0..4e4e82e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -775,6 +775,10 @@ public class DAGAppMaster extends AbstractService {
         throw new TezException("No running dag at present");
       }
       if(!dagId.equals(currentDAG.getID())) {
+        LOG.warn("Current DAGID : "
+            + (currentDAG.getID() == null ? "NULL" : currentDAG.getID())
+            + ", Looking for string (not found): " + dagIdStr + ", dagIdObj: "
+            + dagId);
         throw new TezException("Unknown dagId: " + dagIdStr);
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
index 2610b66..6e5e7b5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
@@ -23,9 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 
 public class InputSpec implements Writable {
 
@@ -57,25 +55,18 @@ public class InputSpec implements Writable {
 
   @Override
   public void write(DataOutput out) throws IOException {
-    // TODONEWTEZ convert to PB
+    // TODO TEZ-305 convert this to PB
     out.writeUTF(sourceVertexName);
     out.writeInt(physicalEdgeCount);
-    byte[] inputDescBytes =
-        DagTypeConverters.convertToDAGPlan(inputDescriptor).toByteArray();
-    out.writeInt(inputDescBytes.length);
-    out.write(inputDescBytes);
+    inputDescriptor.write(out);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     sourceVertexName = in.readUTF();
     physicalEdgeCount = in.readInt();
-    int inputDescLen = in.readInt();
-    byte[] inputDescBytes = new byte[inputDescLen];
-    in.readFully(inputDescBytes);
-    inputDescriptor =
-        DagTypeConverters.convertInputDescriptorFromDAGPlan(
-            TezEntityDescriptorProto.parseFrom(inputDescBytes));
+    inputDescriptor = new InputDescriptor();
+    inputDescriptor.readFields(in);
   }
 
   public String toString() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java
index 96a407f..c45a117 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java
@@ -23,9 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 
 public class OutputSpec implements Writable {
 
@@ -57,25 +55,18 @@ public class OutputSpec implements Writable {
 
   @Override
   public void write(DataOutput out) throws IOException {
-    // TODONEWTEZ convert to PB
+    // TODO TEZ-305 convert this to PB
     out.writeUTF(destinationVertexName);
     out.writeInt(physicalEdgeCount);
-    byte[] inputDescBytes =
-        DagTypeConverters.convertToDAGPlan(outputDescriptor).toByteArray();
-    out.writeInt(inputDescBytes.length);
-    out.write(inputDescBytes);
+    outputDescriptor.write(out);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     destinationVertexName = in.readUTF();
     physicalEdgeCount = in.readInt();
-    int inputDescLen = in.readInt();
-    byte[] inputDescBytes = new byte[inputDescLen];
-    in.readFully(inputDescBytes);
-    outputDescriptor =
-        DagTypeConverters.convertOutputDescriptorFromDAGPlan(
-            TezEntityDescriptorProto.parseFrom(inputDescBytes));
+    outputDescriptor = new OutputDescriptor();
+    outputDescriptor.readFields(in);
   }
 
   public String toString() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
index 534de99..e76dbe4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
@@ -24,9 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskSpec implements Writable {
@@ -81,10 +79,7 @@ public class TaskSpec implements Writable {
   public void write(DataOutput out) throws IOException {
     taskAttemptId.write(out);
     out.writeUTF(vertexName);
-    byte[] procDesc =
-        DagTypeConverters.convertToDAGPlan(processorDescriptor).toByteArray();
-    out.writeInt(procDesc.length);
-    out.write(procDesc);
+    processorDescriptor.write(out);
     out.writeInt(inputSpecList.size());
     for (InputSpec inputSpec : inputSpecList) {
       inputSpec.write(out);
@@ -100,14 +95,9 @@ public class TaskSpec implements Writable {
     taskAttemptId = new TezTaskAttemptID();
     taskAttemptId.readFields(in);
     vertexName = in.readUTF();
-    int procDescLength = in.readInt();
-    // TODO at least 3 buffer copies here. Need to convert this to full PB
-    // TEZ-305
-    byte[] procDescBytes = new byte[procDescLength];
-    in.readFully(procDescBytes);
-    processorDescriptor =
-        DagTypeConverters.convertProcessorDescriptorFromDAGPlan(
-            TezEntityDescriptorProto.parseFrom(procDescBytes));
+    // TODO TEZ-305 convert this to PB
+    processorDescriptor = new ProcessorDescriptor();
+    processorDescriptor.readFields(in);
     int numInputSpecs = in.readInt();
     inputSpecList = new ArrayList<InputSpec>(numInputSpecs);
     for (int i = 0; i < numInputSpecs; i++) {