You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/11/27 19:43:27 UTC
[2/2] git commit: TEZ-645. Re-use ID instances in the AM,
intern vertex names etc where possible. (sseth)
TEZ-645. Re-use ID instances in the AM, intern vertex names etc where
possible. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b3665f41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b3665f41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b3665f41
Branch: refs/heads/master
Commit: b3665f4118cfe888e90f54c1d5e08ba969dbf972
Parents: fc3a948
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Nov 27 10:43:04 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Nov 27 10:43:04 2013 -0800
----------------------------------------------------------------------
.../apache/tez/runtime/api/LogicalOutput.java | 2 +-
.../java/org/apache/tez/common/IDUtils.java | 14 ++--
.../org/apache/tez/dag/records/TezDAGID.java | 69 +++++++++++++----
.../java/org/apache/tez/dag/records/TezID.java | 4 +
.../tez/dag/records/TezTaskAttemptID.java | 39 +++++++---
.../org/apache/tez/dag/records/TezTaskID.java | 40 ++++++++--
.../org/apache/tez/dag/records/TezVertexID.java | 37 ++++++++--
.../org/apache/tez/dag/records/TestTezIds.java | 8 +-
.../apache/tez/dag/app/ContainerContext.java | 2 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 2 +-
.../org/apache/tez/dag/app/dag/impl/Edge.java | 2 +-
.../dag/impl/RootInputLeafOutputDescriptor.java | 2 +-
.../app/dag/impl/RootInputVertexManager.java | 2 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 16 ++--
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 2 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 3 +-
.../tez/dag/app/rm/ContainerContextMatcher.java | 2 +-
.../apache/tez/dag/app/rm/TaskScheduler.java | 2 +-
.../apache/tez/dag/utils/TezBuilderUtils.java | 6 +-
.../tez/dag/app/dag/impl/TestDAGImpl.java | 72 +++++++++---------
.../tez/dag/app/dag/impl/TestDAGScheduler.java | 8 +-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 48 ++++++------
.../tez/dag/app/dag/impl/TestTaskImpl.java | 6 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 78 ++++++++++----------
.../dag/app/dag/impl/TestVertexScheduler.java | 52 ++++++-------
.../tez/dag/app/rm/TestContainerReuse.java | 62 ++++++++--------
.../dag/app/rm/container/TestAMContainer.java | 24 +++---
.../processor/FilterByWordInputProcessor.java | 2 +-
.../processor/FilterByWordOutputProcessor.java | 2 +-
.../hadoop/mapred/split/TezGroupedSplit.java | 2 +-
.../hadoop/mapreduce/split/TezGroupedSplit.java | 2 +-
.../tez/mapreduce/hadoop/IDConverter.java | 8 +-
.../tez/mapreduce/hadoop/InputSplitInfo.java | 2 +-
.../tez/mapreduce/hadoop/MRTaskStatus.java | 4 +-
.../org/apache/tez/mapreduce/TezTestUtils.java | 18 ++---
.../tez/mapreduce/processor/MapUtils.java | 1 -
.../processor/reduce/TestReduceProcessor.java | 1 -
.../tez/runtime/api/impl/EventMetaData.java | 12 +--
.../apache/tez/runtime/api/impl/InputSpec.java | 5 +-
.../apache/tez/runtime/api/impl/OutputSpec.java | 5 +-
.../apache/tez/runtime/api/impl/TaskSpec.java | 18 ++---
.../runtime/api/impl/TezHeartbeatRequest.java | 3 +-
.../broadcast/output/FileBasedKVWriter.java | 2 +-
.../library/common/sort/impl/TestIFile.java | 2 +-
.../output/TestOnFileUnorderedKVOutput.java | 10 +--
.../runtime/library/testutils/KVDataGen.java | 2 +-
46 files changed, 406 insertions(+), 299 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java
index 475eaef..a8d0ce3 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java
@@ -33,4 +33,4 @@ public interface LogicalOutput extends Output {
* the number of physical outputs
*/
public void setNumPhysicalOutputs(int numOutputs);
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/IDUtils.java b/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
index 1270e5a..d15b0d3 100644
--- a/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
@@ -42,9 +42,9 @@ public class IDUtils {
if(parts[0].equals(TezTaskID.TASK)) {
ApplicationId appId = ApplicationId.newInstance(
Long.valueOf(parts[1]), Integer.parseInt(parts[2]));
- TezDAGID dagId = new TezDAGID(appId, Integer.parseInt(parts[3]));
- TezVertexID vId = new TezVertexID(dagId, Integer.parseInt(parts[4]));
- return new TezTaskID(vId, Integer.parseInt(parts[5]));
+ TezDAGID dagId = TezDAGID.getInstance(appId, Integer.parseInt(parts[3]));
+ TezVertexID vId = TezVertexID.getInstance(dagId, Integer.parseInt(parts[4]));
+ return TezTaskID.getInstance(vId, Integer.parseInt(parts[5]));
} else
exceptionMsg = "Bad TaskType identifier. TaskId string : " + str
+ " is not properly formed.";
@@ -72,10 +72,10 @@ public class IDUtils {
if(parts[0].equals(TezTaskAttemptID.ATTEMPT)) {
ApplicationId appId = ApplicationId.newInstance(
Long.valueOf(parts[1]), Integer.parseInt(parts[2]));
- TezDAGID dagId = new TezDAGID(appId, Integer.parseInt(parts[3]));
- TezVertexID vId = new TezVertexID(dagId, Integer.parseInt(parts[4]));
- TezTaskID tId = new TezTaskID(vId, Integer.parseInt(parts[5]));
- return new TezTaskAttemptID(tId, Integer.parseInt(parts[6]));
+ TezDAGID dagId = TezDAGID.getInstance(appId, Integer.parseInt(parts[3]));
+ TezVertexID vId = TezVertexID.getInstance(dagId, Integer.parseInt(parts[4]));
+ TezTaskID tId = TezTaskID.getInstance(vId, Integer.parseInt(parts[5]));
+ return TezTaskAttemptID.getInstance(tId, Integer.parseInt(parts[6]));
} else
exceptionMsg = "Bad TaskType identifier. TaskAttemptId string : "
+ str + " is not properly formed.";
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
index 227207a..54ca8b0 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
@@ -25,6 +25,11 @@ import java.text.NumberFormat;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
/**
* TezDAGID represents the immutable and unique identifier for
* a Tez DAG.
@@ -37,31 +42,56 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
*/
public class TezDAGID extends TezID {
+ private static LoadingCache<TezDAGID, TezDAGID> dagIdCache = CacheBuilder.newBuilder().softValues().
+ build(
+ new CacheLoader<TezDAGID, TezDAGID>() {
+ @Override
+ public TezDAGID load(TezDAGID key) throws Exception {
+ return key;
+ }
+ }
+ );
+
private ApplicationId applicationId;
- public TezDAGID() {
- }
-
/**
- * Constructs a DAGID object from given {@link ApplicationId}.
+ * Get a DAGID object from given {@link ApplicationId}.
* @param applicationId Application that this dag belongs to
* @param id the dag number
*/
- public TezDAGID(ApplicationId applicationId, int id) {
- super(id);
- if(applicationId == null) {
- throw new IllegalArgumentException("applicationId cannot be null");
- }
- this.applicationId = applicationId;
+ public static TezDAGID getInstance(ApplicationId applicationId, int id) {
+ // The newly created TezDAGIds are primarily for their hashCode method, and
+ // will be short-lived.
+ // Alternately the cache can be keyed by the hash of the incoming parameters.
+ Preconditions.checkArgument(applicationId != null, "ApplicationID cannot be null");
+ return dagIdCache.getUnchecked(new TezDAGID(applicationId, id));
}
-
+
/**
- * Constructs a DAGID object from given parts.
+ * Get a DAGID object from given parts.
* @param yarnRMIdentifier YARN RM identifier
* @param applicationId application number
* @param id the dag number
*/
- public TezDAGID(String yarnRMIdentifier, int appId, int id) {
+ public static TezDAGID getInstance(String yarnRMIdentifier, int appId, int id) {
+ // The newly created TezDAGIds are primarily for their hashCode method, and
+ // will be short-lived.
+ // Alternately the cache can be keyed by the hash of the incoming parameters.
+ Preconditions.checkArgument(yarnRMIdentifier != null, "yarnRMIdentifier cannot be null");
+ return dagIdCache.getUnchecked(new TezDAGID(yarnRMIdentifier, appId, id));
+ }
+
+ // Public for Writable serialization. Verify if this is actually required.
+ public TezDAGID() {
+ }
+
+ private TezDAGID(ApplicationId applicationId, int id) {
+ super(id);
+ this.applicationId = applicationId;
+ }
+
+
+ private TezDAGID(String yarnRMIdentifier, int appId, int id) {
this(ApplicationId.newInstance(Long.valueOf(yarnRMIdentifier),
appId), id);
}
@@ -89,11 +119,22 @@ public class TezDAGID extends TezID {
@Override
+ // Can't do much about this instance if used via the RPC layer. Any downstream
+ // users can however avoid using this method.
public void readFields(DataInput in) throws IOException {
+ // ApplicationId could be cached in this case. All of this will change for Protobuf RPC.
applicationId = ApplicationId.newInstance(in.readLong(), in.readInt());
super.readFields(in);
}
+ public static TezDAGID readTezDAGID(DataInput in) throws IOException {
+ long clusterId = in.readLong();
+ int appId = in.readInt();
+ int dagIdInt = TezID.readID(in);
+ TezDAGID dagID = getInstance(ApplicationId.newInstance(clusterId, appId), dagIdInt);
+ return dagID;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(applicationId.getClusterTimestamp());
@@ -135,7 +176,7 @@ public class TezDAGID extends TezID {
int appId = tezAppIdFormat.get().parse(split[2]).intValue();
int id;
id = tezDagIdFormat.get().parse(split[3]).intValue();
- return new TezDAGID(rmId, appId, id);
+ return TezDAGID.getInstance(rmId, appId, id);
} catch (Exception e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
index 7fe5d8c..4cb37ac 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
@@ -88,6 +88,10 @@ public abstract class TezID implements WritableComparable<TezID> {
public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
}
+
+ public static int readID(DataInput in) throws IOException {
+ return in.readInt();
+ }
@Override
public void write(DataOutput out) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
index 4b2f424..022d6a2 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
@@ -26,6 +26,10 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.tez.dag.records.TezTaskID;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
/**
* TezTaskAttemptID represents the immutable and unique identifier for
* a task attempt. Each task attempt is one particular instance of a Tez Task
@@ -47,8 +51,18 @@ public class TezTaskAttemptID extends TezID {
public static final String ATTEMPT = "attempt";
private TezTaskID taskId;
+ private static LoadingCache<TezTaskAttemptID, TezTaskAttemptID> taskAttemptIDCache = CacheBuilder.newBuilder().softValues().
+ build(
+ new CacheLoader<TezTaskAttemptID, TezTaskAttemptID>() {
+ @Override
+ public TezTaskAttemptID load(TezTaskAttemptID key) throws Exception {
+ return key;
+ }
+ }
+ );
+
+ // Public for Writable serialization. Verify if this is actually required.
public TezTaskAttemptID() {
- taskId = new TezTaskID();
}
/**
@@ -56,7 +70,11 @@ public class TezTaskAttemptID extends TezID {
* @param taskId TaskID that this task belongs to
* @param id the task attempt number
*/
- public TezTaskAttemptID(TezTaskID taskId, int id) {
+ public static TezTaskAttemptID getInstance(TezTaskID taskID, int id) {
+ return taskAttemptIDCache.getUnchecked(new TezTaskAttemptID(taskID, id));
+ }
+
+ private TezTaskAttemptID(TezTaskID taskId, int id) {
super(id);
if(taskId == null) {
throw new IllegalArgumentException("taskId cannot be null");
@@ -108,21 +126,22 @@ public class TezTaskAttemptID extends TezID {
}
@Override
+ // Can't do much about this instance if used via the RPC layer. Any downstream
+ // users can however avoid using this method.
public void readFields(DataInput in) throws IOException {
- taskId.readFields(in);
+ taskId = TezTaskID.readTezTaskID(in);
super.readFields(in);
}
+
+ public static TezTaskAttemptID readTezTaskAttemptID(DataInput in) throws IOException {
+ TezTaskID taskID = TezTaskID.readTezTaskID(in);
+ int attemptIdInt = TezID.readID(in);
+ return getInstance(taskID, attemptIdInt);
+ }
@Override
public void write(DataOutput out) throws IOException {
taskId.write(out);
super.write(out);
}
-
- // FIXME TEZ DAG needs to be removed
- public static TezTaskAttemptID read(DataInput in) throws IOException {
- TezTaskAttemptID tId = new TezTaskAttemptID();
- tId.readFields(in);
- return tId;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
index 6b2fc2e..1223ec1 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
@@ -27,6 +27,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.tez.dag.records.TezVertexID;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
/**
* TaskID represents the immutable and unique identifier for
@@ -51,10 +56,20 @@ public class TezTaskID extends TezID {
}
};
+ private static LoadingCache<TezTaskID, TezTaskID> taskIDCache = CacheBuilder.newBuilder().softValues().
+ build(
+ new CacheLoader<TezTaskID, TezTaskID>() {
+ @Override
+ public TezTaskID load(TezTaskID key) throws Exception {
+ return key;
+ }
+ }
+ );
+
private TezVertexID vertexId;
+ // Public for Writable serialization. Verify if this is actually required.
public TezTaskID() {
- vertexId = new TezVertexID();
}
/**
@@ -63,12 +78,15 @@ public class TezTaskID extends TezID {
* @param type the {@link TezTaskType} of the task
* @param id the tip number
*/
- public TezTaskID(TezVertexID vertexId, int id) {
+ public static TezTaskID getInstance(TezVertexID vertexID, int id) {
+ Preconditions.checkArgument(vertexID != null, "vertexID cannot be null");
+ return taskIDCache.getUnchecked(new TezTaskID(vertexID, id));
+ }
+
+ private TezTaskID(TezVertexID vertexID, int id) {
super(id);
- if(vertexId == null) {
- throw new IllegalArgumentException("vertexId cannot be null");
- }
- this.vertexId = vertexId;
+ Preconditions.checkArgument(vertexID != null, "vertexID cannot be null");
+ this.vertexId = vertexID;
}
/** Returns the {@link TezVertexID} object that this task belongs to */
@@ -117,10 +135,18 @@ public class TezTaskID extends TezID {
}
@Override
+ // Can't do much about this instance if used via the RPC layer. Any downstream
+ // users can however avoid using this method.
public void readFields(DataInput in) throws IOException {
- vertexId.readFields(in);
+ vertexId = TezVertexID.readTezVertexID(in);
super.readFields(in);
}
+
+ public static TezTaskID readTezTaskID(DataInput in) throws IOException {
+ TezVertexID vertexID = TezVertexID.readTezVertexID(in);
+ int taskIdInt = TezID.readID(in);
+ return getInstance(vertexID, taskIdInt);
+ }
@Override
public void write(DataOutput out) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
index e24f35e..66d022c 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
@@ -26,6 +26,11 @@ import java.text.NumberFormat;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
/**
* TezVertexID represents the immutable and unique identifier for
* a Vertex in a Tez DAG. Each TezVertexID encompasses multiple Tez Tasks.
@@ -52,8 +57,19 @@ public class TezVertexID extends TezID {
}
};
+ private static LoadingCache<TezVertexID, TezVertexID> vertexIDCache = CacheBuilder.newBuilder().softValues().
+ build(
+ new CacheLoader<TezVertexID, TezVertexID>() {
+ @Override
+ public TezVertexID load(TezVertexID key) throws Exception {
+ return key;
+ }
+ }
+ );
+
private TezDAGID dagId;
+ // Public for Writable serialization. Verify if this is actually required.
public TezVertexID() {
}
@@ -63,11 +79,13 @@ public class TezVertexID extends TezID {
* @param type the {@link TezTaskType} of the task
* @param id the tip number
*/
- public TezVertexID(TezDAGID dagId, int id) {
+ public static TezVertexID getInstance(TezDAGID dagId, int id) {
+ Preconditions.checkArgument(dagId != null, "DagID cannot be null");
+ return vertexIDCache.getUnchecked(new TezVertexID(dagId, id));
+ }
+
+ private TezVertexID(TezDAGID dagId, int id) {
super(id);
- if(dagId == null) {
- throw new IllegalArgumentException("dagId cannot be null");
- }
this.dagId = dagId;
}
@@ -98,11 +116,18 @@ public class TezVertexID extends TezID {
}
@Override
+ // Can't do much about this instance if used via the RPC layer. Any downstream
+ // users can however avoid using this method.
public void readFields(DataInput in) throws IOException {
- dagId = new TezDAGID();
- dagId.readFields(in);
+ dagId = TezDAGID.readTezDAGID(in);
super.readFields(in);
}
+
+ public static TezVertexID readTezVertexID(DataInput in) throws IOException {
+ TezDAGID dagID = TezDAGID.readTezDAGID(in);
+ int vertexIdInt = TezID.readID(in);
+ return getInstance(dagID, vertexIdInt);
+ }
@Override
public void write(DataOutput out) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-common/src/test/java/org/apache/tez/dag/records/TestTezIds.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/dag/records/TestTezIds.java b/tez-common/src/test/java/org/apache/tez/dag/records/TestTezIds.java
index a3d838f..dcf1309 100644
--- a/tez-common/src/test/java/org/apache/tez/dag/records/TestTezIds.java
+++ b/tez-common/src/test/java/org/apache/tez/dag/records/TestTezIds.java
@@ -82,10 +82,10 @@ public class TestTezIds {
@Test
public void testIdStringify() {
ApplicationId appId = ApplicationId.newInstance(9999, 72);
- TezDAGID dagId = new TezDAGID(appId, 1);
- TezVertexID vId = new TezVertexID(dagId, 35);
- TezTaskID tId = new TezTaskID(vId, 389);
- TezTaskAttemptID taId = new TezTaskAttemptID(tId, 2);
+ TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+ TezVertexID vId = TezVertexID.getInstance(dagId, 35);
+ TezTaskID tId = TezTaskID.getInstance(vId, 389);
+ TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, 2);
String dagIdStr = dagId.toString();
String vIdStr = vId.toString();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java
index a4ce701..7d822b7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java
@@ -148,4 +148,4 @@ public class ContainerContext {
}
return true;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index e5ade3d..b0c770e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -474,7 +474,7 @@ public class DAGAppMaster extends AbstractService {
/** Create and initialize (but don't start) a single dag. */
protected DAG createDAG(DAGPlan dagPB) {
- TezDAGID dagId = new TezDAGID(appAttemptID.getApplicationId(),
+ TezDAGID dagId = TezDAGID.getInstance(appAttemptID.getApplicationId(),
dagCounter.incrementAndGet());
// Prepare the TaskAttemptListener server for authentication of Containers
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 0f0f4c7..0e480d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -161,7 +161,7 @@ public class Edge {
}
TezTaskID srcTaskId = srcTask.getTaskId();
int taskAttemptIndex = event.getVersion();
- TezTaskAttemptID srcTaskAttemptId = new TezTaskAttemptID(srcTaskId,
+ TezTaskAttemptID srcTaskAttemptId = TezTaskAttemptID.getInstance(srcTaskId,
taskAttemptIndex);
eventHandler.handle(new TaskAttemptEventOutputFailed(srcTaskAttemptId,
tezEvent, numConsumers));
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java
index d2d2c13..98872a7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java
@@ -43,4 +43,4 @@ public class RootInputLeafOutputDescriptor<T extends TezEntityDescriptor> {
return this.initializerClassName;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
index 0d1aabb..6ffe581 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -116,7 +116,7 @@ public class RootInputVertexManager implements VertexScheduler {
Preconditions.checkState(managedVertex.getTasks().size() != 0);
TezEvent tezEvent = new TezEvent(event, sourceInfo);
tezEvent.setDestinationInfo(destInfoMap.get(inputName));
- sendEventToTask(new TezTaskID(managedVertex.getVertexId(),
+ sendEventToTask(TezTaskID.getInstance(managedVertex.getVertexId(),
((RootInputDataInformationEvent) event).getIndex()), tezEvent);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 3e21b9d..d6e9d0b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -59,7 +60,6 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
@@ -284,10 +284,8 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskSpec createRemoteTaskSpec() {
Vertex vertex = getVertex();
ProcessorDescriptor procDesc = vertex.getProcessorDescriptor();
- DAG dag = vertex.getDAG();
int taskId = getTaskID().getId();
- return new TaskSpec(getID(), dag.getUserName(),
- vertex.getName(), procDesc, vertex.getInputSpecList(taskId),
+ return new TaskSpec(getID(), vertex.getName(), procDesc, vertex.getInputSpecList(taskId),
vertex.getOutputSpecList(taskId));
}
@@ -307,7 +305,7 @@ public class TaskAttemptImpl implements TaskAttempt,
//result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
//result.setPhase(reportedStatus.phase);
- //result.setStateString(reportedStatus.stateString);
+ //result.setStateString(reportedStatus.statef);
result.setCounters(getCounters());
result.setContainerId(this.getAssignedContainerID());
result.setNodeManagerHost(trackerName);
@@ -920,16 +918,16 @@ public class TaskAttemptImpl implements TaskAttempt,
ta.containerId = event.getContainerId();
ta.containerNodeId = container.getNodeId();
- ta.nodeHttpAddress = container.getNodeHttpAddress();
- ta.nodeRackName = RackResolver.resolve(ta.containerNodeId.getHost())
- .getNetworkLocation();
+ ta.nodeHttpAddress = StringInterner.weakIntern(container.getNodeHttpAddress());
+ ta.nodeRackName = StringInterner.weakIntern(RackResolver.resolve(ta.containerNodeId.getHost())
+ .getNetworkLocation());
ta.launchTime = ta.clock.getTime();
// TODO Resolve to host / IP in case of a local address.
InetSocketAddress nodeHttpInetAddr = NetUtils
.createSocketAddr(ta.nodeHttpAddress); // TODO: Costly?
- ta.trackerName = nodeHttpInetAddr.getHostName();
+ ta.trackerName = StringInterner.weakIntern(nodeHttpInetAddr.getHostName());
ta.httpPort = nodeHttpInetAddr.getPort();
ta.sendEvent(createJobCounterUpdateEventTALaunched(ta));
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 76c36c5..30e6e2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -310,7 +310,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// TODO Avoid reading this from configuration for each task.
maxAttempts = this.conf.getInt(TezConfiguration.TEZ_AM_MAX_TASK_ATTEMPTS,
TezConfiguration.TEZ_AM_MAX_TASK_ATTEMPTS_DEFAULT);
- taskId = new TezTaskID(vertexId, taskIndex);
+ taskId = TezTaskID.getInstance(vertexId, taskIndex);
this.taskAttemptListener = taskAttemptListener;
this.taskHeartbeatHandler = thh;
this.eventHandler = eventHandler;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 179ed1d..f068617 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.MRVertexOutputCommitter;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -479,7 +480,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
AppContext appContext, VertexLocationHint vertexLocationHint) {
this.vertexId = vertexId;
this.vertexPlan = vertexPlan;
- this.vertexName = vertexName;
+ this.vertexName = StringInterner.weakIntern(vertexName);
this.conf = conf;
//this.metrics = metrics;
this.clock = clock;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerContextMatcher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerContextMatcher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerContextMatcher.java
index 4e98dcc..aa689e3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerContextMatcher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerContextMatcher.java
@@ -53,4 +53,4 @@ public class ContainerContextMatcher implements ContainerSignatureMatcher {
return context1.isExactMatch(context2);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 67966b1..ab038fd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -1699,4 +1699,4 @@ public class TaskScheduler extends AbstractService
+ (firstContainerSignature != null? firstContainerSignature.toString():"null");
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/utils/TezBuilderUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezBuilderUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezBuilderUtils.java
index a734bb8..b7e6f72 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezBuilderUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezBuilderUtils.java
@@ -30,11 +30,11 @@ import org.apache.tez.dag.records.TezVertexID;
public class TezBuilderUtils {
public static TezVertexID newVertexID(TezDAGID dagId, int vertexId) {
- return new TezVertexID(dagId, vertexId);
+ return TezVertexID.getInstance(dagId, vertexId);
}
public static TezTaskAttemptID newTaskAttemptId(TezTaskID taskId, int id) {
- return new TezTaskAttemptID(taskId, id);
+ return TezTaskAttemptID.getInstance(taskId, id);
}
public static DAGReport newDAGReport() {
@@ -48,7 +48,7 @@ public class TezBuilderUtils {
}
public static TezTaskID newTaskId(TezDAGID dagId, int vertexId, int taskId) {
- return new TezTaskID(newVertexID(dagId, vertexId), taskId);
+ return TezTaskID.getInstance(newVertexID(dagId, vertexId), taskId);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index b0ad929..49a3307 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -490,7 +490,7 @@ public class TestDAGImpl {
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
appAttemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(100, 1), 1);
- dagId = new TezDAGID(appAttemptId.getApplicationId(), 1);
+ dagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 1);
Assert.assertNotNull(dagId);
dagPlan = createTestDAGPlan();
dispatcher = new DrainDispatcher();
@@ -504,7 +504,7 @@ public class TestDAGImpl {
jobTokenSecretManager, fsTokens, clock, "user", thh, appContext);
doReturn(dag).when(appContext).getCurrentDAG();
mrrAppContext = mock(AppContext.class);
- mrrDagId = new TezDAGID(appAttemptId.getApplicationId(), 2);
+ mrrDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 2);
mrrDagPlan = createTestMRRDAGPlan();
mrrDag = new DAGImpl(mrrDagId, conf, mrrDagPlan,
dispatcher.getEventHandler(), taskAttemptListener,
@@ -562,7 +562,7 @@ public class TestDAGImpl {
dispatcher.await();
for (int i = 0 ; i < 6; ++i ) {
- TezVertexID vId = new TezVertexID(dagId, i);
+ TezVertexID vId = TezVertexID.getInstance(dagId, i);
Vertex v = dag.getVertex(vId);
Assert.assertEquals(VertexState.RUNNING, v.getState());
if (i < 2) {
@@ -577,7 +577,7 @@ public class TestDAGImpl {
}
for (int i = 0 ; i < 6; ++i ) {
- TezVertexID vId = new TezVertexID(dagId, i);
+ TezVertexID vId = TezVertexID.getInstance(dagId, i);
LOG.info("Distance from root: v" + i + ":"
+ dag.getVertex(vId).getDistanceFromRoot());
}
@@ -590,12 +590,12 @@ public class TestDAGImpl {
startDAG(dag);
dispatcher.await();
- TezVertexID vId = new TezVertexID(dagId, 1);
+ TezVertexID vId = TezVertexID.getInstance(dagId, 1);
Vertex v = dag.getVertex(vId);
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
- new TezTaskID(vId, 0), TaskState.SUCCEEDED));
+ TezTaskID.getInstance(vId, 0), TaskState.SUCCEEDED));
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
- new TezTaskID(vId, 1), TaskState.SUCCEEDED));
+ TezTaskID.getInstance(vId, 1), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
@@ -610,12 +610,12 @@ public class TestDAGImpl {
startDAG(dag);
dispatcher.await();
- TezVertexID vId = new TezVertexID(dagId, 1);
+ TezVertexID vId = TezVertexID.getInstance(dagId, 1);
Vertex v = dag.getVertex(vId);
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
- new TezTaskID(vId, 0), TaskState.SUCCEEDED));
+ TezTaskID.getInstance(vId, 0), TaskState.SUCCEEDED));
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
- new TezTaskID(vId, 1), TaskState.SUCCEEDED));
+ TezTaskID.getInstance(vId, 1), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
@@ -624,14 +624,14 @@ public class TestDAGImpl {
verify(dag.dagScheduler, times(1)).vertexCompleted(v);
dispatcher.getEventHandler().handle(
- new VertexEventTaskReschedule(new TezTaskID(vId, 0)));
+ new VertexEventTaskReschedule(TezTaskID.getInstance(vId, 0)));
dispatcher.await();
Assert.assertEquals(VertexState.RUNNING, v.getState());
Assert.assertEquals(0, dag.getSuccessfulVertices());
Assert.assertEquals(0, dag.numCompletedVertices);
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
- new TezTaskID(vId, 0), TaskState.SUCCEEDED));
+ TezTaskID.getInstance(vId, 0), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
@@ -655,7 +655,7 @@ public class TestDAGImpl {
Assert.assertEquals(DAGState.KILLED, dag.getState());
for (int i = 0 ; i < 6; ++i ) {
- TezVertexID vId = new TezVertexID(dagId, i);
+ TezVertexID vId = TezVertexID.getInstance(dagId, i);
Vertex v = dag.getVertex(vId);
Assert.assertEquals(VertexState.KILLED, v.getState());
}
@@ -669,14 +669,14 @@ public class TestDAGImpl {
startDAG(dag);
dispatcher.await();
- TezVertexID vId1 = new TezVertexID(dagId, 1);
+ TezVertexID vId1 = TezVertexID.getInstance(dagId, 1);
Vertex v1 = dag.getVertex(vId1);
((EventHandler<VertexEvent>) v1).handle(new VertexEventTaskCompleted(
- new TezTaskID(vId1, 0), TaskState.SUCCEEDED));
- TezVertexID vId0 = new TezVertexID(dagId, 0);
+ TezTaskID.getInstance(vId1, 0), TaskState.SUCCEEDED));
+ TezVertexID vId0 = TezVertexID.getInstance(dagId, 0);
Vertex v0 = dag.getVertex(vId0);
((EventHandler<VertexEvent>) v0).handle(new VertexEventTaskCompleted(
- new TezTaskID(vId0, 0), TaskState.SUCCEEDED));
+ TezTaskID.getInstance(vId0, 0), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v0.getState());
@@ -689,7 +689,7 @@ public class TestDAGImpl {
Assert.assertEquals(VertexState.SUCCEEDED, v0.getState());
Assert.assertEquals(VertexState.TERMINATING, v1.getState());
for (int i = 2 ; i < 6; ++i ) {
- TezVertexID vId = new TezVertexID(dagId, i);
+ TezVertexID vId = TezVertexID.getInstance(dagId, i);
Vertex v = dag.getVertex(vId);
Assert.assertEquals(VertexState.KILLED, v.getState());
}
@@ -715,22 +715,22 @@ public class TestDAGImpl {
for (int i = 0; i < 6; ++i) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+ TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
}
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dag.getState());
Assert.assertEquals(1, dag.getSuccessfulVertices());
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
+ TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, 2), VertexState.SUCCEEDED));
+ TezVertexID.getInstance(dagId, 2), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, 3), VertexState.SUCCEEDED));
+ TezVertexID.getInstance(dagId, 3), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, 4), VertexState.SUCCEEDED));
+ TezVertexID.getInstance(dagId, 4), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, 5), VertexState.SUCCEEDED));
+ TezVertexID.getInstance(dagId, 5), VertexState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(DAGState.SUCCEEDED, dag.getState());
Assert.assertEquals(6, dag.getSuccessfulVertices());
@@ -744,21 +744,21 @@ public class TestDAGImpl {
dispatcher.await();
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+ TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dag.getState());
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
+ TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, 2), VertexState.FAILED));
+ TezVertexID.getInstance(dagId, 2), VertexState.FAILED));
dispatcher.await();
Assert.assertEquals(DAGState.FAILED, dag.getState());
Assert.assertEquals(2, dag.getSuccessfulVertices());
// Expect running vertices to be killed on first failure
for (int i = 3; i < 6; ++i) {
- TezVertexID vId = new TezVertexID(dagId, i);
+ TezVertexID vId = TezVertexID.getInstance(dagId, i);
Vertex v = dag.getVertex(vId);
Assert.assertEquals(VertexState.KILLED, v.getState());
}
@@ -779,16 +779,16 @@ public class TestDAGImpl {
dispatcher.await();
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+ TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dag.getState());
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
+ TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
for (int i = 2; i < 6; ++i) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, i), VertexState.SUCCEEDED));
+ TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED));
}
dispatcher.await();
Assert.assertEquals(DAGState.KILLED, dag.getState());
@@ -810,27 +810,27 @@ public class TestDAGImpl {
dispatcher.await();
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+ TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dag.getState());
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
+ TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
for (int i = 2; i < 5; ++i) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, i), VertexState.SUCCEEDED));
+ TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED));
}
dispatcher.await();
Assert.assertEquals(DAGState.KILLED, dag.getState());
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
- new TezVertexID(dagId, 5), VertexState.KILLED));
+ TezVertexID.getInstance(dagId, 5), VertexState.KILLED));
dispatcher.await();
Assert.assertEquals(DAGState.KILLED, dag.getState());
Assert.assertEquals(5, dag.getSuccessfulVertices());
- Assert.assertEquals(dag.getVertex(new TezVertexID(dagId, 5)).getTerminationCause(), VertexTerminationCause.DAG_KILL);
+ Assert.assertEquals(dag.getVertex(TezVertexID.getInstance(dagId, 5)).getTerminationCause(), VertexTerminationCause.DAG_KILL);
Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
index 8a67977..391dc0c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
@@ -78,13 +78,13 @@ public class TestDAGScheduler {
@Test(timeout=10000)
public void testDAGSchedulerMRR() {
DAG mockDag = mock(DAG.class);
- TezDAGID dagId = new TezDAGID("1", 1, 1);
+ TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
TaskSchedulerEventHandler mockTaskScheduler =
mock(TaskSchedulerEventHandler.class);
Vertex mockVertex1 = mock(Vertex.class);
- TezVertexID mockVertexId1 = new TezVertexID(dagId, 1);
+ TezVertexID mockVertexId1 = TezVertexID.getInstance(dagId, 1);
when(mockVertex1.getVertexId()).thenReturn(mockVertexId1);
when(mockVertex1.getDistanceFromRoot()).thenReturn(0);
TaskAttempt mockAttempt1 = mock(TaskAttempt.class);
@@ -93,7 +93,7 @@ public class TestDAGScheduler {
when(mockDag.getVertex(mockVertexId1)).thenReturn(mockVertex1);
Vertex mockVertex2 = mock(Vertex.class);
- TezVertexID mockVertexId2 = new TezVertexID(dagId, 2);
+ TezVertexID mockVertexId2 = TezVertexID.getInstance(dagId, 2);
when(mockVertex2.getVertexId()).thenReturn(mockVertexId2);
when(mockVertex2.getDistanceFromRoot()).thenReturn(1);
TaskAttempt mockAttempt2 = mock(TaskAttempt.class);
@@ -105,7 +105,7 @@ public class TestDAGScheduler {
when(mockAttempt2f.getIsRescheduled()).thenReturn(true);
Vertex mockVertex3 = mock(Vertex.class);
- TezVertexID mockVertexId3 = new TezVertexID(dagId, 3);
+ TezVertexID mockVertexId3 = TezVertexID.getInstance(dagId, 3);
when(mockVertex3.getVertexId()).thenReturn(mockVertexId3);
when(mockVertex3.getDistanceFromRoot()).thenReturn(2);
TaskAttempt mockAttempt3 = mock(TaskAttempt.class);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 6b8f544..dcf9aef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -129,8 +129,8 @@ public class TestTaskAttempt {
hosts.add("host3");
TaskLocationHint locationHint = new TaskLocationHint(hosts, null);
- TezTaskID taskID = new TezTaskID(
- new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
+ TezTaskID taskID = TezTaskID.getInstance(
+ TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
mock(TaskHeartbeatHandler.class), mock(AppContext.class),
@@ -172,8 +172,8 @@ public class TestTaskAttempt {
TaskLocationHint locationHint = new TaskLocationHint(
new TreeSet<String>(Arrays.asList(hosts)), null);
- TezTaskID taskID = new TezTaskID(
- new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
+ TezTaskID taskID = TezTaskID.getInstance(
+ TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
mock(TaskAttemptListener.class), new Configuration(),
new SystemClock(), mock(TaskHeartbeatHandler.class),
@@ -300,10 +300,10 @@ public class TestTaskAttempt {
ApplicationId appId = ApplicationId.newInstance(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 0);
- TezDAGID dagID = new TezDAGID(appId, 1);
- TezVertexID vertexID = new TezVertexID(dagID, 1);
- TezTaskID taskID = new TezTaskID(vertexID, 1);
- TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+ TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+ TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
@@ -350,10 +350,10 @@ public class TestTaskAttempt {
ApplicationId appId = ApplicationId.newInstance(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 0);
- TezDAGID dagID = new TezDAGID(appId, 1);
- TezVertexID vertexID = new TezVertexID(dagID, 1);
- TezTaskID taskID = new TezTaskID(vertexID, 1);
- TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+ TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+ TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
MockEventHandler eventHandler = spy(new MockEventHandler());
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
@@ -441,10 +441,10 @@ public class TestTaskAttempt {
ApplicationId appId = ApplicationId.newInstance(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 0);
- TezDAGID dagID = new TezDAGID(appId, 1);
- TezVertexID vertexID = new TezVertexID(dagID, 1);
- TezTaskID taskID = new TezTaskID(vertexID, 1);
- TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+ TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+ TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
@@ -503,10 +503,10 @@ public class TestTaskAttempt {
ApplicationId appId = ApplicationId.newInstance(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 0);
- TezDAGID dagID = new TezDAGID(appId, 1);
- TezVertexID vertexID = new TezVertexID(dagID, 1);
- TezTaskID taskID = new TezTaskID(vertexID, 1);
- TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+ TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+ TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
MockEventHandler eventHandler = spy(new MockEventHandler());
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
@@ -591,10 +591,10 @@ public class TestTaskAttempt {
ApplicationId appId = ApplicationId.newInstance(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 0);
- TezDAGID dagID = new TezDAGID(appId, 1);
- TezVertexID vertexID = new TezVertexID(dagID, 1);
- TezTaskID taskID = new TezTaskID(vertexID, 1);
- TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+ TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+ TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
MockEventHandler mockEh = new MockEventHandler();
MockEventHandler eventHandler = spy(mockEh);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 5bdc7d7..4cdc2fd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -115,8 +115,8 @@ public class TestTaskImpl {
locationHint = new TaskLocationHint(null, null);
appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
- dagId = new TezDAGID(appId, 1);
- vertexId = new TezVertexID(dagId, 1);
+ dagId = TezDAGID.getInstance(appId, 1);
+ vertexId = TezVertexID.getInstance(dagId, 1);
appContext = mock(AppContext.class);
taskResource = Resource.newInstance(1024, 1);
localResources = new HashMap<String, LocalResource>();
@@ -134,7 +134,7 @@ public class TestTaskImpl {
}
private TezTaskID getNewTaskID() {
- TezTaskID taskID = new TezTaskID(vertexId, ++taskCounter);
+ TezTaskID taskID = TezTaskID.getInstance(vertexId, ++taskCounter);
return taskID;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index abb0d53..0b8fde0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -689,7 +689,7 @@ public class TestVertexImpl {
for (int i = 0; i < vCnt; ++i) {
VertexPlan vPlan = dagPlan.getVertex(i);
String vName = vPlan.getName();
- TezVertexID vertexId = new TezVertexID(dagId, i+1);
+ TezVertexID vertexId = TezVertexID.getInstance(dagId, i+1);
VertexImpl v;
if (useCustomInitializer) {
v = new VertexImplWithCustomInitializer(vertexId, vPlan, vPlan.getName(), conf,
@@ -753,7 +753,7 @@ public class TestVertexImpl {
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
appAttemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(100, 1), 1);
- dagId = new TezDAGID(appAttemptId.getApplicationId(), 1);
+ dagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 1);
}
public void setupPostDagCreation() {
@@ -950,8 +950,8 @@ public class TestVertexImpl {
VertexImpl v = vertices.get("vertex2");
startVertex(v);
- TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
- TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+ TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+ TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -975,8 +975,8 @@ public class TestVertexImpl {
VertexImpl v = vertices.get("vertex2");
startVertex(v);
- TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
- TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+ TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+ TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1003,7 +1003,7 @@ public class TestVertexImpl {
VertexImpl v = vertices.get("vertex2");
startVertex(v);
- TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
+ TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(t1, TaskState.FAILED));
@@ -1052,13 +1052,13 @@ public class TestVertexImpl {
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(
- new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
+ TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.KILLED, v.getState());
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(
- new TezTaskID(v.getVertexId(), 1), TaskState.KILLED));
+ TezTaskID.getInstance(v.getVertexId(), 1), TaskState.KILLED));
dispatcher.await();
Assert.assertEquals(VertexState.KILLED, v.getState());
}
@@ -1078,13 +1078,13 @@ public class TestVertexImpl {
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(
- new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
+ TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.KILLED, v.getState());
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(
- new TezTaskID(v.getVertexId(), 1), TaskState.SUCCEEDED));
+ TezTaskID.getInstance(v.getVertexId(), 1), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.KILLED, v.getState());
}
@@ -1097,8 +1097,8 @@ public class TestVertexImpl {
VertexImpl v = vertices.get("vertex2");
startVertex(v);
- TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
- TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+ TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+ TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(t1, TaskState.FAILED));
@@ -1144,8 +1144,8 @@ public class TestVertexImpl {
v.setVertexOutputCommitter(committer);
startVertex(v);
- TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
- TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+ TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+ TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1210,17 +1210,17 @@ public class TestVertexImpl {
LOG.info("Verifying v6 state " + v6.getState());
Assert.assertEquals(VertexState.RUNNING, v6.getState());
- TezTaskID t1_v4 = new TezTaskID(v4.getVertexId(), 0);
- TezTaskID t2_v4 = new TezTaskID(v4.getVertexId(), 1);
- TezTaskID t1_v5 = new TezTaskID(v5.getVertexId(), 0);
- TezTaskID t2_v5 = new TezTaskID(v5.getVertexId(), 1);
+ TezTaskID t1_v4 = TezTaskID.getInstance(v4.getVertexId(), 0);
+ TezTaskID t2_v4 = TezTaskID.getInstance(v4.getVertexId(), 1);
+ TezTaskID t1_v5 = TezTaskID.getInstance(v5.getVertexId(), 0);
+ TezTaskID t2_v5 = TezTaskID.getInstance(v5.getVertexId(), 1);
- TezTaskAttemptID ta1_t1_v4 = new TezTaskAttemptID(t1_v4, 0);
- TezTaskAttemptID ta2_t1_v4 = new TezTaskAttemptID(t1_v4, 0);
- TezTaskAttemptID ta1_t2_v4 = new TezTaskAttemptID(t2_v4, 0);
- TezTaskAttemptID ta1_t1_v5 = new TezTaskAttemptID(t1_v5, 0);
- TezTaskAttemptID ta1_t2_v5 = new TezTaskAttemptID(t2_v5, 0);
- TezTaskAttemptID ta2_t2_v5 = new TezTaskAttemptID(t2_v5, 0);
+ TezTaskAttemptID ta1_t1_v4 = TezTaskAttemptID.getInstance(t1_v4, 0);
+ TezTaskAttemptID ta2_t1_v4 = TezTaskAttemptID.getInstance(t1_v4, 0);
+ TezTaskAttemptID ta1_t2_v4 = TezTaskAttemptID.getInstance(t2_v4, 0);
+ TezTaskAttemptID ta1_t1_v5 = TezTaskAttemptID.getInstance(t1_v5, 0);
+ TezTaskAttemptID ta1_t2_v5 = TezTaskAttemptID.getInstance(t2_v5, 0);
+ TezTaskAttemptID ta2_t2_v5 = TezTaskAttemptID.getInstance(t2_v5, 0);
v4.handle(new VertexEventTaskAttemptCompleted(ta1_t1_v4, TaskAttemptStateInternal.FAILED));
v4.handle(new VertexEventTaskAttemptCompleted(ta2_t1_v4, TaskAttemptStateInternal.SUCCEEDED));
@@ -1251,8 +1251,8 @@ public class TestVertexImpl {
VertexImpl v = vertices.get("vertex2");
startVertex(v);
- TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
- TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+ TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+ TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1279,8 +1279,8 @@ public class TestVertexImpl {
startVertex(v);
- TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
- TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+ TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+ TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1314,8 +1314,8 @@ public class TestVertexImpl {
startVertex(v);
- TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
- TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+ TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+ TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1357,8 +1357,8 @@ public class TestVertexImpl {
startVertex(v);
- TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
- TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+ TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+ TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1393,8 +1393,8 @@ public class TestVertexImpl {
startVertex(v);
- TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
- TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+ TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+ TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1434,8 +1434,8 @@ public class TestVertexImpl {
startVertex(v);
- TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
- TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+ TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+ TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
dispatcher.getEventHandler().handle(
new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1596,9 +1596,9 @@ public class TestVertexImpl {
public void testVertexWithNoTasks() {
TezVertexID vId = null;
try {
- TezDAGID invalidDagId = new TezDAGID(
+ TezDAGID invalidDagId = TezDAGID.getInstance(
dagId.getApplicationId(), 1000);
- vId = new TezVertexID(invalidDagId, 1);
+ vId = TezVertexID.getInstance(invalidDagId, 1);
VertexPlan vPlan = invalidDagPlan.getVertex(0);
VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
dispatcher.getEventHandler(), taskAttemptListener, fsTokens,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
index f506c99..6c0ee10 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
@@ -60,11 +60,11 @@ public class TestVertexScheduler {
conf.setLong(TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L);
ShuffleVertexManager scheduler = null;
EventHandler mockEventHandler = mock(EventHandler.class);
- TezDAGID dagId = new TezDAGID("1", 1, 1);
+ TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
HashMap<Vertex, Edge> mockInputVertices =
new HashMap<Vertex, Edge>();
Vertex mockSrcVertex1 = mock(Vertex.class);
- TezVertexID mockSrcVertexId1 = new TezVertexID(dagId, 1);
+ TezVertexID mockSrcVertexId1 = TezVertexID.getInstance(dagId, 1);
EdgeProperty eProp1 = new EdgeProperty(
EdgeProperty.DataMovementType.SCATTER_GATHER,
EdgeProperty.DataSourceType.PERSISTED,
@@ -73,7 +73,7 @@ public class TestVertexScheduler {
new InputDescriptor("in"));
when(mockSrcVertex1.getVertexId()).thenReturn(mockSrcVertexId1);
Vertex mockSrcVertex2 = mock(Vertex.class);
- TezVertexID mockSrcVertexId2 = new TezVertexID(dagId, 2);
+ TezVertexID mockSrcVertexId2 = TezVertexID.getInstance(dagId, 2);
EdgeProperty eProp2 = new EdgeProperty(
EdgeProperty.DataMovementType.SCATTER_GATHER,
EdgeProperty.DataSourceType.PERSISTED,
@@ -82,7 +82,7 @@ public class TestVertexScheduler {
new InputDescriptor("in"));
when(mockSrcVertex2.getVertexId()).thenReturn(mockSrcVertexId2);
Vertex mockSrcVertex3 = mock(Vertex.class);
- TezVertexID mockSrcVertexId3 = new TezVertexID(dagId, 3);
+ TezVertexID mockSrcVertexId3 = TezVertexID.getInstance(dagId, 3);
EdgeProperty eProp3 = new EdgeProperty(
EdgeProperty.DataMovementType.BROADCAST,
EdgeProperty.DataSourceType.PERSISTED,
@@ -92,7 +92,7 @@ public class TestVertexScheduler {
when(mockSrcVertex3.getVertexId()).thenReturn(mockSrcVertexId3);
Vertex mockManagedVertex = mock(Vertex.class);
- TezVertexID mockManagedVertexId = new TezVertexID(dagId, 4);
+ TezVertexID mockManagedVertexId = TezVertexID.getInstance(dagId, 4);
when(mockManagedVertex.getVertexId()).thenReturn(mockManagedVertexId);
when(mockManagedVertex.getInputVertices()).thenReturn(mockInputVertices);
@@ -107,13 +107,13 @@ public class TestVertexScheduler {
Assert.assertTrue(scheduler.bipartiteSources.containsKey(mockSrcVertexId2));
final HashMap<TezTaskID, Task> managedTasks = new HashMap<TezTaskID, Task>();
- final TezTaskID mockTaskId1 = new TezTaskID(mockManagedVertexId, 0);
+ final TezTaskID mockTaskId1 = TezTaskID.getInstance(mockManagedVertexId, 0);
managedTasks.put(mockTaskId1, null);
- final TezTaskID mockTaskId2 = new TezTaskID(mockManagedVertexId, 1);
+ final TezTaskID mockTaskId2 = TezTaskID.getInstance(mockManagedVertexId, 1);
managedTasks.put(mockTaskId2, null);
- final TezTaskID mockTaskId3 = new TezTaskID(mockManagedVertexId, 2);
+ final TezTaskID mockTaskId3 = TezTaskID.getInstance(mockManagedVertexId, 2);
managedTasks.put(mockTaskId3, null);
- final TezTaskID mockTaskId4 = new TezTaskID(mockManagedVertexId, 3);
+ final TezTaskID mockTaskId4 = TezTaskID.getInstance(mockManagedVertexId, 3);
managedTasks.put(mockTaskId4, null);
when(mockManagedVertex.getTotalTasks()).thenReturn(managedTasks.size());
@@ -151,13 +151,13 @@ public class TestVertexScheduler {
when(mockSrcVertex2.getTotalTasks()).thenReturn(2);
TezTaskAttemptID mockSrcAttemptId11 =
- new TezTaskAttemptID(new TezTaskID(mockSrcVertexId1, 0), 0);
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 0), 0);
TezTaskAttemptID mockSrcAttemptId12 =
- new TezTaskAttemptID(new TezTaskID(mockSrcVertexId1, 1), 0);
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 1), 0);
TezTaskAttemptID mockSrcAttemptId21 =
- new TezTaskAttemptID(new TezTaskID(mockSrcVertexId2, 0), 0);
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId2, 0), 0);
TezTaskAttemptID mockSrcAttemptId31 =
- new TezTaskAttemptID(new TezTaskID(mockSrcVertexId3, 0), 0);
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId3, 0), 0);
byte[] payload =
VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteArray();
@@ -231,11 +231,11 @@ public class TestVertexScheduler {
Configuration conf = new Configuration();
ShuffleVertexManager scheduler = null;
EventHandler mockEventHandler = mock(EventHandler.class);
- TezDAGID dagId = new TezDAGID("1", 1, 1);
+ TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
HashMap<Vertex, Edge> mockInputVertices =
new HashMap<Vertex, Edge>();
Vertex mockSrcVertex1 = mock(Vertex.class);
- TezVertexID mockSrcVertexId1 = new TezVertexID(dagId, 1);
+ TezVertexID mockSrcVertexId1 = TezVertexID.getInstance(dagId, 1);
EdgeProperty eProp1 = new EdgeProperty(
EdgeProperty.DataMovementType.SCATTER_GATHER,
EdgeProperty.DataSourceType.PERSISTED,
@@ -244,7 +244,7 @@ public class TestVertexScheduler {
new InputDescriptor("in"));
when(mockSrcVertex1.getVertexId()).thenReturn(mockSrcVertexId1);
Vertex mockSrcVertex2 = mock(Vertex.class);
- TezVertexID mockSrcVertexId2 = new TezVertexID(dagId, 2);
+ TezVertexID mockSrcVertexId2 = TezVertexID.getInstance(dagId, 2);
EdgeProperty eProp2 = new EdgeProperty(
EdgeProperty.DataMovementType.SCATTER_GATHER,
EdgeProperty.DataSourceType.PERSISTED,
@@ -253,7 +253,7 @@ public class TestVertexScheduler {
new InputDescriptor("in"));
when(mockSrcVertex2.getVertexId()).thenReturn(mockSrcVertexId2);
Vertex mockSrcVertex3 = mock(Vertex.class);
- TezVertexID mockSrcVertexId3 = new TezVertexID(dagId, 3);
+ TezVertexID mockSrcVertexId3 = TezVertexID.getInstance(dagId, 3);
EdgeProperty eProp3 = new EdgeProperty(
EdgeProperty.DataMovementType.BROADCAST,
EdgeProperty.DataSourceType.PERSISTED,
@@ -263,7 +263,7 @@ public class TestVertexScheduler {
when(mockSrcVertex3.getVertexId()).thenReturn(mockSrcVertexId3);
Vertex mockManagedVertex = mock(Vertex.class);
- TezVertexID mockManagedVertexId = new TezVertexID(dagId, 3);
+ TezVertexID mockManagedVertexId = TezVertexID.getInstance(dagId, 3);
when(mockManagedVertex.getVertexId()).thenReturn(mockManagedVertexId);
when(mockManagedVertex.getInputVertices()).thenReturn(mockInputVertices);
@@ -287,11 +287,11 @@ public class TestVertexScheduler {
Assert.assertTrue(scheduler.bipartiteSources.containsKey(mockSrcVertexId2));
HashMap<TezTaskID, Task> managedTasks = new HashMap<TezTaskID, Task>();
- TezTaskID mockTaskId1 = new TezTaskID(mockManagedVertexId, 0);
+ TezTaskID mockTaskId1 = TezTaskID.getInstance(mockManagedVertexId, 0);
managedTasks.put(mockTaskId1, null);
- TezTaskID mockTaskId2 = new TezTaskID(mockManagedVertexId, 1);
+ TezTaskID mockTaskId2 = TezTaskID.getInstance(mockManagedVertexId, 1);
managedTasks.put(mockTaskId2, null);
- TezTaskID mockTaskId3 = new TezTaskID(mockManagedVertexId, 2);
+ TezTaskID mockTaskId3 = TezTaskID.getInstance(mockManagedVertexId, 2);
managedTasks.put(mockTaskId3, null);
when(mockManagedVertex.getTotalTasks()).thenReturn(3);
@@ -344,15 +344,15 @@ public class TestVertexScheduler {
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
TezTaskAttemptID mockSrcAttemptId11 =
- new TezTaskAttemptID(new TezTaskID(mockSrcVertexId1, 0), 0);
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 0), 0);
TezTaskAttemptID mockSrcAttemptId12 =
- new TezTaskAttemptID(new TezTaskID(mockSrcVertexId1, 1), 0);
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 1), 0);
TezTaskAttemptID mockSrcAttemptId21 =
- new TezTaskAttemptID(new TezTaskID(mockSrcVertexId2, 0), 0);
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId2, 0), 0);
TezTaskAttemptID mockSrcAttemptId22 =
- new TezTaskAttemptID(new TezTaskID(mockSrcVertexId2, 1), 0);
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId2, 1), 0);
TezTaskAttemptID mockSrcAttemptId31 =
- new TezTaskAttemptID(new TezTaskID(mockSrcVertexId3, 0), 0);
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId3, 0), 0);
// min, max > 0 and min == max
scheduler = createScheduler(conf, mockManagedVertex, 0.25f, 0.25f);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 4e30763..54fcde2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -101,8 +101,8 @@ public class TestContainerReuse {
TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
CapturingEventHandler eventHandler = new CapturingEventHandler();
- TezDAGID dagID = new TezDAGID("0", 0, 0);
- TezVertexID vertexID = new TezVertexID(dagID, 1);
+ TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
TezAMRMClientAsync<CookieContainerRequest> rmClient =
@@ -152,12 +152,12 @@ public class TestContainerReuse {
String [] defaultRack = {"/default-rack"};
- TezTaskAttemptID taID11 = new TezTaskAttemptID(
- new TezTaskID(vertexID, 1), 1);
- TezTaskAttemptID taID21 = new TezTaskAttemptID(
- new TezTaskID(vertexID, 2), 1);
- TezTaskAttemptID taID31 = new TezTaskAttemptID(
- new TezTaskID(vertexID, 3), 1);
+ TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(vertexID, 1), 1);
+ TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(vertexID, 2), 1);
+ TezTaskAttemptID taID31 = TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(vertexID, 3), 1);
TaskAttempt ta11 = mock(TaskAttempt.class);
TaskAttempt ta21 = mock(TaskAttempt.class);
TaskAttempt ta31 = mock(TaskAttempt.class);
@@ -238,8 +238,8 @@ public class TestContainerReuse {
TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
CapturingEventHandler eventHandler = new CapturingEventHandler();
- TezDAGID dagID = new TezDAGID("0", 0, 0);
- TezVertexID vertexID = new TezVertexID(dagID, 1);
+ TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
TezAMRMClientAsync<CookieContainerRequest> rmClient =
@@ -288,9 +288,9 @@ public class TestContainerReuse {
String [] defaultRack = {"/default-rack"};
- TezTaskAttemptID taID11 = new TezTaskAttemptID(new TezTaskID(vertexID, 1), 1);
- TezTaskAttemptID taID21 = new TezTaskAttemptID(new TezTaskID(vertexID, 2), 1);
- TezTaskAttemptID taID31 = new TezTaskAttemptID(new TezTaskID(vertexID, 3), 1);
+ TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 1), 1);
+ TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 2), 1);
+ TezTaskAttemptID taID31 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 3), 1);
TaskAttempt ta11 = mock(TaskAttempt.class);
TaskAttempt ta21 = mock(TaskAttempt.class);
TaskAttempt ta31 = mock(TaskAttempt.class);
@@ -345,7 +345,7 @@ public class TestContainerReuse {
TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
CapturingEventHandler eventHandler = new CapturingEventHandler();
- TezDAGID dagID = new TezDAGID("0", 0, 0);
+ TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
@@ -383,25 +383,25 @@ public class TestContainerReuse {
String []racks = {"/default-rack"};
Priority priority1 = Priority.newInstance(1);
- TezVertexID vertexID1 = new TezVertexID(dagID, 1);
+ TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1);
//Vertex 1, Task 1, Attempt 1, host1
- TezTaskAttemptID taID11 = new TezTaskAttemptID(new TezTaskID(vertexID1, 1), 1);
+ TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 1), 1);
TaskAttempt ta11 = mock(TaskAttempt.class);
AMSchedulerEventTALaunchRequest lrEvent1 = createLaunchRequestEvent(taID11, ta11, resource1, host1, racks, priority1);
//Vertex 1, Task 2, Attempt 1, host1
- TezTaskAttemptID taID12 = new TezTaskAttemptID(new TezTaskID(vertexID1, 2), 1);
+ TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 2), 1);
TaskAttempt ta12 = mock(TaskAttempt.class);
AMSchedulerEventTALaunchRequest lrEvent2 = createLaunchRequestEvent(taID12, ta12, resource1, host1, racks, priority1);
//Vertex 1, Task 3, Attempt 1, host2
- TezTaskAttemptID taID13 = new TezTaskAttemptID(new TezTaskID(vertexID1, 3), 1);
+ TezTaskAttemptID taID13 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 3), 1);
TaskAttempt ta13 = mock(TaskAttempt.class);
AMSchedulerEventTALaunchRequest lrEvent3 = createLaunchRequestEvent(taID13, ta13, resource1, host2, racks, priority1);
//Vertex 1, Task 4, Attempt 1, host2
- TezTaskAttemptID taID14 = new TezTaskAttemptID(new TezTaskID(vertexID1, 4), 1);
+ TezTaskAttemptID taID14 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 4), 1);
TaskAttempt ta14 = mock(TaskAttempt.class);
AMSchedulerEventTALaunchRequest lrEvent4 = createLaunchRequestEvent(taID14, ta14, resource1, host2, racks, priority1);
@@ -482,7 +482,7 @@ public class TestContainerReuse {
TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
CapturingEventHandler eventHandler = new CapturingEventHandler();
- TezDAGID dagID = new TezDAGID("0", 0, 0);
+ TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
TezAMRMClientAsync<CookieContainerRequest> rmClient =
@@ -529,17 +529,17 @@ public class TestContainerReuse {
Priority priority = Priority.newInstance(3);
- TezVertexID vertexID = new TezVertexID(dagID, 1);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
//Vertex 1, Task 1, Attempt 1, no locality information.
- TezTaskAttemptID taID11 = new TezTaskAttemptID(new TezTaskID(vertexID, 1), 1);
+ TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 1), 1);
TaskAttempt ta11 = mock(TaskAttempt.class);
doReturn(vertexID).when(ta11).getVertexID();
AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(
taID11, ta11, resource1, emptyHosts, racks, priority);
//Vertex1, Task2, Attempt 1, no locality information.
- TezTaskAttemptID taID12 = new TezTaskAttemptID(new TezTaskID(vertexID, 2), 1);
+ TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 2), 1);
TaskAttempt ta12 = mock(TaskAttempt.class);
doReturn(vertexID).when(ta12).getVertexID();
AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(
@@ -604,7 +604,7 @@ public class TestContainerReuse {
TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
CapturingEventHandler eventHandler = new CapturingEventHandler();
- TezDAGID dagID = new TezDAGID("0", 0, 0);
+ TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
TezAMRMClientAsync<CookieContainerRequest> rmClient =
@@ -652,20 +652,20 @@ public class TestContainerReuse {
Priority priority1 = Priority.newInstance(3);
Priority priority2 = Priority.newInstance(4);
- TezVertexID vertexID1 = new TezVertexID(dagID, 1);
- TezVertexID vertexID2 = new TezVertexID(dagID, 2);
+ TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1);
+ TezVertexID vertexID2 = TezVertexID.getInstance(dagID, 2);
//Vertex 1, Task 1, Attempt 1, host1
- TezTaskAttemptID taID11 = new TezTaskAttemptID(
- new TezTaskID(vertexID1, 1), 1);
+ TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(vertexID1, 1), 1);
TaskAttempt ta11 = mock(TaskAttempt.class);
doReturn(vertexID1).when(ta11).getVertexID();
AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(
taID11, ta11, resource1, host1, racks, priority1);
//Vertex2, Task1, Attempt 1, host1
- TezTaskAttemptID taID21 = new TezTaskAttemptID(
- new TezTaskID(vertexID2, 1), 1);
+ TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(vertexID2, 1), 1);
TaskAttempt ta21 = mock(TaskAttempt.class);
doReturn(vertexID2).when(ta21).getVertexID();
AMSchedulerEventTALaunchRequest lrEvent21 = createLaunchRequestEvent(
@@ -728,7 +728,7 @@ public class TestContainerReuse {
String[] hosts, String[] racks, Priority priority,
ContainerContext containerContext) {
AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(
- taID, capability, new TaskSpec(taID, "user", "vertexName",
+ taID, capability, new TaskSpec(taID, "vertexName",
new ProcessorDescriptor("processorClassName"),
Collections.singletonList(new InputSpec("vertexName",
new InputDescriptor("inputClassName"), 1)),
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index fc89e82..9298cca 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -273,7 +273,7 @@ public class TestAMContainer {
wc.assignTaskAttempt(wc.taskAttemptID);
wc.verifyState(AMContainerState.IDLE);
- TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+ TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
wc.assignTaskAttempt(taID2);
wc.verifyState(AMContainerState.STOP_REQUESTED);
@@ -313,7 +313,7 @@ public class TestAMContainer {
wc.pullTaskToRun();
wc.verifyState(AMContainerState.RUNNING);
- TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+ TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
wc.assignTaskAttempt(taID2);
wc.verifyState(AMContainerState.STOP_REQUESTED);
@@ -352,7 +352,7 @@ public class TestAMContainer {
wc.pullTaskToRun();
wc.verifyState(AMContainerState.LAUNCHING);
- TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+ TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
wc.assignTaskAttempt(taID2);
wc.verifyState(AMContainerState.STOP_REQUESTED);
@@ -579,7 +579,7 @@ public class TestAMContainer {
wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
- TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+ TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
wc.assignTaskAttempt(taID2);
@@ -656,7 +656,7 @@ public class TestAMContainer {
wc.taskAttemptSucceeded(wc.taskAttemptID);
wc.verifyState(AMContainerState.IDLE);
- TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+ TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
wc.assignTaskAttempt(taID2);
wc.pullTaskToRun();
wc.taskAttemptSucceeded(taID2);
@@ -702,7 +702,7 @@ public class TestAMContainer {
wc.pullTaskToRun();
wc.taskAttemptSucceeded(wc.taskAttemptID);
- TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+ TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
wc.assignTaskAttempt(taID2);
wc.pullTaskToRun();
wc.verifyState(AMContainerState.RUNNING);
@@ -748,7 +748,7 @@ public class TestAMContainer {
wc.pullTaskToRun();
wc.taskAttemptSucceeded(wc.taskAttemptID);
- TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+ TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
wc.assignTaskAttempt(taID2);
wc.pullTaskToRun();
wc.taskAttemptSucceeded(taID2);
@@ -780,7 +780,7 @@ public class TestAMContainer {
wc.pullTaskToRun();
wc.taskAttemptSucceeded(wc.taskAttemptID);
- TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+ TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
wc.assignTaskAttempt(taID2);
wc.pullTaskToRun();
wc.taskAttemptSucceeded(taID2);
@@ -854,10 +854,10 @@ public class TestAMContainer {
when(appContext.getApplicationAttemptId()).thenReturn(appAttemptID);
when(appContext.getApplicationID()).thenReturn(applicationID);
- dagID = new TezDAGID(applicationID, 1);
- vertexID = new TezVertexID(dagID, 1);
- taskID = new TezTaskID(vertexID, 1);
- taskAttemptID = new TezTaskAttemptID(taskID, 1);
+ dagID = TezDAGID.getInstance(applicationID, 1);
+ vertexID = TezVertexID.getInstance(dagID, 1);
+ taskID = TezTaskID.getInstance(vertexID, 1);
+ taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
taskSpec = mock(TaskSpec.class);
doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();