You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/22 20:07:49 UTC
git commit: TEZ-574. Avoid creating copies of payloads when sending
*Specs over the wire. (sseth)
Updated Branches:
refs/heads/master 142910630 -> a6908afa8
TEZ-574. Avoid creating copies of payloads when sending *Specs over the
wire. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a6908afa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a6908afa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a6908afa
Branch: refs/heads/master
Commit: a6908afa8c2832e9d3fd4420bb55462f848eb00c
Parents: 1429106
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Oct 22 11:07:26 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Oct 22 11:07:26 2013 -0700
----------------------------------------------------------------------
.../org/apache/tez/dag/api/InputDescriptor.java | 7 ++++
.../apache/tez/dag/api/OutputDescriptor.java | 7 ++++
.../apache/tez/dag/api/ProcessorDescriptor.java | 7 ++++
.../apache/tez/dag/api/TezEntityDescriptor.java | 35 +++++++++++++++++++-
.../org/apache/tez/dag/app/DAGAppMaster.java | 4 +++
.../apache/tez/runtime/api/impl/InputSpec.java | 17 +++-------
.../apache/tez/runtime/api/impl/OutputSpec.java | 17 +++-------
.../apache/tez/runtime/api/impl/TaskSpec.java | 18 +++-------
8 files changed, 71 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
index dea9001..24b542a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
@@ -18,8 +18,15 @@
package org.apache.tez.dag.api;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
public class InputDescriptor extends TezEntityDescriptor {
+ @Private // for Writable
+ public InputDescriptor() {
+ super();
+ }
+
public InputDescriptor(String inputClassName) {
super(inputClassName);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
index 16fb9b1..d6bde35 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
@@ -18,8 +18,15 @@
package org.apache.tez.dag.api;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
public class OutputDescriptor extends TezEntityDescriptor {
+ @Private // for Writable
+ public OutputDescriptor() {
+ super();
+ }
+
public OutputDescriptor(String outputClassName) {
super(outputClassName);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
index 092147d..60afb4d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
@@ -18,8 +18,15 @@
package org.apache.tez.dag.api;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
public class ProcessorDescriptor extends TezEntityDescriptor {
+ @Private // for Writable
+ public ProcessorDescriptor() {
+ super();
+ }
+
public ProcessorDescriptor(String processorClassName) {
super(processorClassName);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
index 9d4b2c4..cf305eb 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
@@ -18,11 +18,23 @@
package org.apache.tez.dag.api;
-public abstract class TezEntityDescriptor {
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+public abstract class TezEntityDescriptor implements Writable {
protected byte[] userPayload;
private String className;
+ @Private // for Writable
+ public TezEntityDescriptor() {
+ }
+
public TezEntityDescriptor(String className) {
this.className = className;
}
@@ -39,4 +51,25 @@ public abstract class TezEntityDescriptor {
public String getClassName() {
return this.className;
}
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, className);
+ if (userPayload == null) {
+ out.writeInt(-1);
+ } else {
+ out.writeInt(userPayload.length);
+ out.write(userPayload);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.className = Text.readString(in);
+ int payloadLength = in.readInt();
+ if (payloadLength != -1) {
+ userPayload = new byte[payloadLength];
+ in.readFully(userPayload);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 29a29a0..4e4e82e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -775,6 +775,10 @@ public class DAGAppMaster extends AbstractService {
throw new TezException("No running dag at present");
}
if(!dagId.equals(currentDAG.getID())) {
+ LOG.warn("Current DAGID : "
+ + (currentDAG.getID() == null ? "NULL" : currentDAG.getID())
+ + ", Looking for string (not found): " + dagIdStr + ", dagIdObj: "
+ + dagId);
throw new TezException("Unknown dagId: " + dagIdStr);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
index 2610b66..6e5e7b5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
@@ -23,9 +23,7 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
public class InputSpec implements Writable {
@@ -57,25 +55,18 @@ public class InputSpec implements Writable {
@Override
public void write(DataOutput out) throws IOException {
- // TODONEWTEZ convert to PB
+ // TODO TEZ-305 convert this to PB
out.writeUTF(sourceVertexName);
out.writeInt(physicalEdgeCount);
- byte[] inputDescBytes =
- DagTypeConverters.convertToDAGPlan(inputDescriptor).toByteArray();
- out.writeInt(inputDescBytes.length);
- out.write(inputDescBytes);
+ inputDescriptor.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
sourceVertexName = in.readUTF();
physicalEdgeCount = in.readInt();
- int inputDescLen = in.readInt();
- byte[] inputDescBytes = new byte[inputDescLen];
- in.readFully(inputDescBytes);
- inputDescriptor =
- DagTypeConverters.convertInputDescriptorFromDAGPlan(
- TezEntityDescriptorProto.parseFrom(inputDescBytes));
+ inputDescriptor = new InputDescriptor();
+ inputDescriptor.readFields(in);
}
public String toString() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java
index 96a407f..c45a117 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java
@@ -23,9 +23,7 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
public class OutputSpec implements Writable {
@@ -57,25 +55,18 @@ public class OutputSpec implements Writable {
@Override
public void write(DataOutput out) throws IOException {
- // TODONEWTEZ convert to PB
+ // TODO TEZ-305 convert this to PB
out.writeUTF(destinationVertexName);
out.writeInt(physicalEdgeCount);
- byte[] inputDescBytes =
- DagTypeConverters.convertToDAGPlan(outputDescriptor).toByteArray();
- out.writeInt(inputDescBytes.length);
- out.write(inputDescBytes);
+ outputDescriptor.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
destinationVertexName = in.readUTF();
physicalEdgeCount = in.readInt();
- int inputDescLen = in.readInt();
- byte[] inputDescBytes = new byte[inputDescLen];
- in.readFully(inputDescBytes);
- outputDescriptor =
- DagTypeConverters.convertOutputDescriptorFromDAGPlan(
- TezEntityDescriptorProto.parseFrom(inputDescBytes));
+ outputDescriptor = new OutputDescriptor();
+ outputDescriptor.readFields(in);
}
public String toString() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a6908afa/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
index 534de99..e76dbe4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
@@ -24,9 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskSpec implements Writable {
@@ -81,10 +79,7 @@ public class TaskSpec implements Writable {
public void write(DataOutput out) throws IOException {
taskAttemptId.write(out);
out.writeUTF(vertexName);
- byte[] procDesc =
- DagTypeConverters.convertToDAGPlan(processorDescriptor).toByteArray();
- out.writeInt(procDesc.length);
- out.write(procDesc);
+ processorDescriptor.write(out);
out.writeInt(inputSpecList.size());
for (InputSpec inputSpec : inputSpecList) {
inputSpec.write(out);
@@ -100,14 +95,9 @@ public class TaskSpec implements Writable {
taskAttemptId = new TezTaskAttemptID();
taskAttemptId.readFields(in);
vertexName = in.readUTF();
- int procDescLength = in.readInt();
- // TODO at least 3 buffer copies here. Need to convert this to full PB
- // TEZ-305
- byte[] procDescBytes = new byte[procDescLength];
- in.readFully(procDescBytes);
- processorDescriptor =
- DagTypeConverters.convertProcessorDescriptorFromDAGPlan(
- TezEntityDescriptorProto.parseFrom(procDescBytes));
+ // TODO TEZ-305 convert this to PB
+ processorDescriptor = new ProcessorDescriptor();
+ processorDescriptor.readFields(in);
int numInputSpecs = in.readInt();
inputSpecList = new ArrayList<InputSpec>(numInputSpecs);
for (int i = 0; i < numInputSpecs; i++) {