You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2017/03/14 20:11:47 UTC
tez git commit: TEZ-1526. LoadingCache for TezTaskID slow for large jobs (jeagles)
Repository: tez
Updated Branches:
refs/heads/master 5f953bfd9 -> 57c857d26
TEZ-1526. LoadingCache for TezTaskID slow for large jobs (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/57c857d2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/57c857d2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/57c857d2
Branch: refs/heads/master
Commit: 57c857d267f17dd4e47b53d7691996d73c4476a1
Parents: 5f953bf
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Tue Mar 14 15:11:36 2017 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Tue Mar 14 15:11:36 2017 -0500
----------------------------------------------------------------------
.../org/apache/tez/dag/records/TezDAGID.java | 64 +++++++-------------
.../java/org/apache/tez/dag/records/TezID.java | 21 +++++++
.../tez/dag/records/TezTaskAttemptID.java | 57 ++++++-----------
.../org/apache/tez/dag/records/TezTaskID.java | 51 ++++++----------
.../org/apache/tez/dag/records/TezVertexID.java | 48 ++++++---------
...tesianProductVertexManagerUnpartitioned.java | 6 +-
6 files changed, 103 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
index b7a2c8f..2e3309e 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
@@ -21,15 +21,12 @@ package org.apache.tez.dag.records;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.text.NumberFormat;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
+import org.apache.tez.util.FastNumberFormat;
/**
* TezDAGID represents the immutable and unique identifier for
@@ -43,16 +40,7 @@ import com.google.common.cache.LoadingCache;
*/
public class TezDAGID extends TezID {
- private static LoadingCache<TezDAGID, TezDAGID> dagIdCache = CacheBuilder.newBuilder().softValues().
- build(
- new CacheLoader<TezDAGID, TezDAGID>() {
- @Override
- public TezDAGID load(TezDAGID key) throws Exception {
- return key;
- }
- }
- );
-
+ private static TezIDCache<TezDAGID> tezDAGIDCache = new TezIDCache<>();
private ApplicationId applicationId;
/**
@@ -65,13 +53,12 @@ public class TezDAGID extends TezID {
// will be short-lived.
// Alternately the cache can be keyed by the hash of the incoming paramters.
Preconditions.checkArgument(applicationId != null, "ApplicationID cannot be null");
- return dagIdCache.getUnchecked(new TezDAGID(applicationId, id));
+ return tezDAGIDCache.getInstance(new TezDAGID(applicationId, id));
}
@InterfaceAudience.Private
public static void clearCache() {
- dagIdCache.invalidateAll();
- dagIdCache.cleanUp();
+ tezDAGIDCache.clear();
}
/**
@@ -85,7 +72,7 @@ public class TezDAGID extends TezID {
// will be short-lived.
// Alternately the cache can be keyed by the hash of the incoming paramters.
Preconditions.checkArgument(yarnRMIdentifier != null, "yarnRMIdentifier cannot be null");
- return dagIdCache.getUnchecked(new TezDAGID(yarnRMIdentifier, appId, id));
+ return tezDAGIDCache.getInstance(new TezDAGID(yarnRMIdentifier, appId, id));
}
// Public for Writable serialization. Verify if this is actually required.
@@ -151,25 +138,14 @@ public class TezDAGID extends TezID {
// DO NOT CHANGE THIS. DAGClient replicates this code to create DAG id string
public static final String DAG = "dag";
- static final ThreadLocal<NumberFormat> tezAppIdFormat = new ThreadLocal<NumberFormat>() {
+ static final ThreadLocal<FastNumberFormat> tezAppIdFormat = new ThreadLocal<FastNumberFormat>() {
@Override
- public NumberFormat initialValue() {
- NumberFormat fmt = NumberFormat.getInstance();
- fmt.setGroupingUsed(false);
+ public FastNumberFormat initialValue() {
+ FastNumberFormat fmt = FastNumberFormat.getInstance();
fmt.setMinimumIntegerDigits(4);
return fmt;
}
};
-
- static final ThreadLocal<NumberFormat> tezDagIdFormat = new ThreadLocal<NumberFormat>() {
- @Override
- public NumberFormat initialValue() {
- NumberFormat fmt = NumberFormat.getInstance();
- fmt.setGroupingUsed(false);
- fmt.setMinimumIntegerDigits(1);
- return fmt;
- }
- };
@Override
public String toString() {
@@ -190,10 +166,15 @@ public class TezDAGID extends TezID {
throw new IllegalArgumentException("numDagsPerGroup has to be more than one. Got: " +
numDagsPerGroup);
}
- return DAG_GROUPID_PREFIX + SEPARATOR +
- getApplicationId().getClusterTimestamp() + SEPARATOR +
- tezAppIdFormat.get().format(getApplicationId().getId()) + SEPARATOR +
- tezDagIdFormat.get().format((getId() - 1) / numDagsPerGroup);
+ StringBuilder sb = new StringBuilder();
+ sb.append(DAG_GROUPID_PREFIX);
+ sb.append(SEPARATOR);
+ sb.append(getApplicationId().getClusterTimestamp());
+ sb.append(SEPARATOR);
+ tezAppIdFormat.get().format(getApplicationId().getId(), sb);
+ sb.append(SEPARATOR);
+ sb.append((id - 1) / numDagsPerGroup);
+ return sb.toString();
}
public static TezDAGID fromString(String dagId) {
@@ -225,12 +206,11 @@ public class TezDAGID extends TezID {
* @return the builder that was passed in
*/
protected StringBuilder appendTo(StringBuilder builder) {
- return builder.append(SEPARATOR).
- append(applicationId.getClusterTimestamp()).
- append(SEPARATOR).
- append(tezAppIdFormat.get().format(applicationId.getId())).
- append(SEPARATOR).
- append(tezDagIdFormat.get().format(id));
+ builder.append(SEPARATOR);
+ builder.append(applicationId.getClusterTimestamp());
+ builder.append(SEPARATOR);
+ tezAppIdFormat.get().format(applicationId.getId(), builder);
+ return builder.append(SEPARATOR).append(id);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
index 7efbd9a..cd7b27d 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
@@ -21,6 +21,8 @@ package org.apache.tez.dag.records;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.util.WeakHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -40,6 +42,25 @@ public abstract class TezID implements WritableComparable<TezID> {
public static final char SEPARATOR = '_';
protected int id;
+ public static class TezIDCache<T> { // Interning cache: maps each ID to a single canonical instance that is only weakly held.
+ private final WeakHashMap<T, WeakReference<T>> cache = new WeakHashMap<>(); // Weak keys and weakly referenced values, so the map alone never keeps an ID reachable.
+
+ synchronized T getInstance(final T id) { // Returns the live cached instance equal to id, or registers id itself and returns it.
+ final WeakReference<T> cached = cache.get(id);
+ if (cached != null) {
+ final T value = cached.get(); // null when the referent has already been cleared by the garbage collector.
+ if (value != null)
+ return value;
+ }
+ cache.put(id, new WeakReference<T>(id)); // Miss (or cleared referent): id becomes the canonical instance from now on.
+ return id;
+ }
+
+ synchronized void clear() { // Removes every entry; later getInstance calls start from an empty cache.
+ cache.clear();
+ }
+ }
+
/** constructs an ID object from the given int */
public TezID(int id) {
this.id = id;
http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
index 296d577..7aee80f 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
@@ -21,15 +21,10 @@ package org.apache.tez.dag.records;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.text.NumberFormat;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
/**
* TezTaskAttemptID represents the immutable and unique identifier for
* a task attempt. Each task attempt is one particular instance of a Tez Task
@@ -50,17 +45,9 @@ import com.google.common.cache.LoadingCache;
public class TezTaskAttemptID extends TezID {
public static final String ATTEMPT = "attempt";
private TezTaskID taskId;
-
- private static LoadingCache<TezTaskAttemptID, TezTaskAttemptID> taskAttemptIDCache = CacheBuilder.newBuilder().softValues().
- build(
- new CacheLoader<TezTaskAttemptID, TezTaskAttemptID>() {
- @Override
- public TezTaskAttemptID load(TezTaskAttemptID key) throws Exception {
- return key;
- }
- }
- );
-
+
+ private static TezIDCache<TezTaskAttemptID> tezTaskAttemptIDCache = new TezIDCache<>();
+
// Public for Writable serialization. Verify if this is actually required.
public TezTaskAttemptID() {
}
@@ -71,13 +58,12 @@ public class TezTaskAttemptID extends TezID {
* @param id the task attempt number
*/
public static TezTaskAttemptID getInstance(TezTaskID taskID, int id) {
- return taskAttemptIDCache.getUnchecked(new TezTaskAttemptID(taskID, id));
+ return tezTaskAttemptIDCache.getInstance(new TezTaskAttemptID(taskID, id));
}
@InterfaceAudience.Private
public static void clearCache() {
- taskAttemptIDCache.invalidateAll();
- taskAttemptIDCache.cleanUp();
+ tezTaskAttemptIDCache.clear();
}
private TezTaskAttemptID(TezTaskID taskId, int id) {
@@ -108,7 +94,9 @@ public class TezTaskAttemptID extends TezID {
* @return the builder that was passed in.
*/
protected StringBuilder appendTo(StringBuilder builder) {
- return taskId.appendTo(builder).append(SEPARATOR).append(id);
+ taskId.appendTo(builder);
+ builder.append(SEPARATOR);
+ return builder.append(id);
}
@Override
@@ -151,25 +139,20 @@ public class TezTaskAttemptID extends TezID {
super.write(out);
}
- protected static final ThreadLocal<NumberFormat> tezTaskAttemptIdFormat = new ThreadLocal<NumberFormat>() {
- @Override
- public NumberFormat initialValue() {
- NumberFormat fmt = NumberFormat.getInstance();
- fmt.setGroupingUsed(false);
- fmt.setMinimumIntegerDigits(1);
- return fmt;
- }
- };
-
public static TezTaskAttemptID fromString(String taIdStr) {
try {
- String[] split = taIdStr.split("_");
- String rmId = split[1];
- int appId = TezDAGID.tezAppIdFormat.get().parse(split[2]).intValue();
- int dagId = TezDAGID.tezDagIdFormat.get().parse(split[3]).intValue();
- int vId = TezVertexID.tezVertexIdFormat.get().parse(split[4]).intValue();
- int taskId = TezTaskID.tezTaskIdFormat.get().parse(split[5]).intValue();
- int id = tezTaskAttemptIdFormat.get().parse(split[6]).intValue();
+ int pos1 = taIdStr.indexOf(SEPARATOR);
+ int pos2 = taIdStr.indexOf(SEPARATOR, pos1 + 1);
+ int pos3 = taIdStr.indexOf(SEPARATOR, pos2 + 1);
+ int pos4 = taIdStr.indexOf(SEPARATOR, pos3 + 1);
+ int pos5 = taIdStr.indexOf(SEPARATOR, pos4 + 1);
+ int pos6 = taIdStr.indexOf(SEPARATOR, pos5 + 1);
+ String rmId = taIdStr.substring(pos1 + 1, pos2);
+ int appId = Integer.parseInt(taIdStr.substring(pos2 + 1, pos3));
+ int dagId = Integer.parseInt(taIdStr.substring(pos3 + 1, pos4));
+ int vId = Integer.parseInt(taIdStr.substring(pos4 + 1, pos5));
+ int taskId = Integer.parseInt(taIdStr.substring(pos5 + 1, pos6));
+ int id = Integer.parseInt(taIdStr.substring(pos6 + 1));
return TezTaskAttemptID.getInstance(
TezTaskID.getInstance(
http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
index 3d28348..3295f6a 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
@@ -21,16 +21,12 @@ package org.apache.tez.dag.records;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.text.NumberFormat;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
+import org.apache.tez.util.FastNumberFormat;
/**
* TaskID represents the immutable and unique identifier for
@@ -46,26 +42,16 @@ public class TezTaskID extends TezID {
public static final String TASK = "task";
private final int serializingHash;
- static final ThreadLocal<NumberFormat> tezTaskIdFormat = new ThreadLocal<NumberFormat>() {
+ static final ThreadLocal<FastNumberFormat> tezTaskIdFormat = new ThreadLocal<FastNumberFormat>() {
@Override
- public NumberFormat initialValue() {
- NumberFormat fmt = NumberFormat.getInstance();
- fmt.setGroupingUsed(false);
+ public FastNumberFormat initialValue() {
+ FastNumberFormat fmt = FastNumberFormat.getInstance();
fmt.setMinimumIntegerDigits(6);
return fmt;
}
};
- private static LoadingCache<TezTaskID, TezTaskID> taskIDCache = CacheBuilder.newBuilder().softValues().
- build(
- new CacheLoader<TezTaskID, TezTaskID>() {
- @Override
- public TezTaskID load(TezTaskID key) throws Exception {
- return key;
- }
- }
- );
-
+ private static TezIDCache<TezTaskID> tezTaskIDCache = new TezIDCache<>();
private TezVertexID vertexId;
/**
@@ -75,13 +61,12 @@ public class TezTaskID extends TezID {
*/
public static TezTaskID getInstance(TezVertexID vertexID, int id) {
Preconditions.checkArgument(vertexID != null, "vertexID cannot be null");
- return taskIDCache.getUnchecked(new TezTaskID(vertexID, id));
+ return tezTaskIDCache.getInstance(new TezTaskID(vertexID, id));
}
@InterfaceAudience.Private
public static void clearCache() {
- taskIDCache.invalidateAll();
- taskIDCache.cleanUp();
+ tezTaskIDCache.clear();
}
private TezTaskID(TezVertexID vertexID, int id) {
@@ -130,9 +115,9 @@ public class TezTaskID extends TezID {
* @return the builder that was passed in
*/
protected StringBuilder appendTo(StringBuilder builder) {
- return vertexId.appendTo(builder).
- append(SEPARATOR).
- append(tezTaskIdFormat.get().format(id));
+ vertexId.appendTo(builder);
+ builder.append(SEPARATOR);
+ return tezTaskIdFormat.get().format(id, builder);
}
@Override
@@ -170,12 +155,16 @@ public class TezTaskID extends TezID {
public static TezTaskID fromString(String taskIdStr) {
try {
- String[] split = taskIdStr.split("_");
- String rmId = split[1];
- int appId = TezDAGID.tezAppIdFormat.get().parse(split[2]).intValue();
- int dagId = TezDAGID.tezDagIdFormat.get().parse(split[3]).intValue();
- int vId = TezVertexID.tezVertexIdFormat.get().parse(split[4]).intValue();
- int id = tezTaskIdFormat.get().parse(split[5]).intValue();
+ int pos1 = taskIdStr.indexOf(SEPARATOR);
+ int pos2 = taskIdStr.indexOf(SEPARATOR, pos1 + 1);
+ int pos3 = taskIdStr.indexOf(SEPARATOR, pos2 + 1);
+ int pos4 = taskIdStr.indexOf(SEPARATOR, pos3 + 1);
+ int pos5 = taskIdStr.indexOf(SEPARATOR, pos4 + 1);
+ String rmId = taskIdStr.substring(pos1 + 1, pos2);
+ int appId = Integer.parseInt(taskIdStr.substring(pos2 + 1, pos3));
+ int dagId = Integer.parseInt(taskIdStr.substring(pos3 + 1, pos4));
+ int vId = Integer.parseInt(taskIdStr.substring(pos4 + 1, pos5));
+ int id = Integer.parseInt(taskIdStr.substring(pos5 + 1));
return TezTaskID.getInstance(
TezVertexID.getInstance(
http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
index d30df16..b56c9ad 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
@@ -21,15 +21,12 @@ package org.apache.tez.dag.records;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.text.NumberFormat;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
+import org.apache.tez.util.FastNumberFormat;
/**
* TezVertexID represents the immutable and unique identifier for
@@ -46,27 +43,17 @@ import com.google.common.cache.LoadingCache;
@InterfaceStability.Stable
public class TezVertexID extends TezID {
public static final String VERTEX = "vertex";
- static final ThreadLocal<NumberFormat> tezVertexIdFormat = new ThreadLocal<NumberFormat>() {
+ static final ThreadLocal<FastNumberFormat> tezVertexIdFormat = new ThreadLocal<FastNumberFormat>() {
@Override
- public NumberFormat initialValue() {
- NumberFormat fmt = NumberFormat.getInstance();
- fmt.setGroupingUsed(false);
+ public FastNumberFormat initialValue() {
+ FastNumberFormat fmt = FastNumberFormat.getInstance();
fmt.setMinimumIntegerDigits(2);
return fmt;
}
};
- private static LoadingCache<TezVertexID, TezVertexID> vertexIDCache = CacheBuilder.newBuilder().softValues().
- build(
- new CacheLoader<TezVertexID, TezVertexID>() {
- @Override
- public TezVertexID load(TezVertexID key) throws Exception {
- return key;
- }
- }
- );
-
+ private static TezIDCache<TezVertexID> tezVertexIDCache = new TezIDCache<>();
private TezDAGID dagId;
// Public for Writable serialization. Verify if this is actually required.
@@ -80,13 +67,12 @@ public class TezVertexID extends TezID {
*/
public static TezVertexID getInstance(TezDAGID dagId, int id) {
Preconditions.checkArgument(dagId != null, "DagID cannot be null");
- return vertexIDCache.getUnchecked(new TezVertexID(dagId, id));
+ return tezVertexIDCache.getInstance(new TezVertexID(dagId, id));
}
@InterfaceAudience.Private
public static void clearCache() {
- vertexIDCache.invalidateAll();
- vertexIDCache.cleanUp();
+ tezVertexIDCache.clear();
}
private TezVertexID(TezDAGID dagId, int id) {
@@ -146,9 +132,9 @@ public class TezVertexID extends TezID {
* @return the builder that was passed in
*/
protected StringBuilder appendTo(StringBuilder builder) {
- return dagId.appendTo(builder).
- append(SEPARATOR).
- append(tezVertexIdFormat.get().format(id));
+ dagId.appendTo(builder);
+ builder.append(SEPARATOR);
+ return tezVertexIdFormat.get().format(id, builder);
}
@Override
@@ -158,12 +144,14 @@ public class TezVertexID extends TezID {
public static TezVertexID fromString(String vertexIdStr) {
try {
- String[] split = vertexIdStr.split("_");
- String rmId = split[1];
- int appId = TezDAGID.tezAppIdFormat.get().parse(split[2]).intValue();
- int dagId = TezDAGID.tezDagIdFormat.get().parse(split[3]).intValue();
- int id = tezVertexIdFormat.get().parse(split[4]).intValue();
-
+ int pos1 = vertexIdStr.indexOf(SEPARATOR);
+ int pos2 = vertexIdStr.indexOf(SEPARATOR, pos1 + 1);
+ int pos3 = vertexIdStr.indexOf(SEPARATOR, pos2 + 1);
+ int pos4 = vertexIdStr.indexOf(SEPARATOR, pos3 + 1);
+ String rmId = vertexIdStr.substring(pos1 + 1, pos2);
+ int appId = Integer.parseInt(vertexIdStr.substring(pos2 + 1, pos3));
+ int dagId = Integer.parseInt(vertexIdStr.substring(pos3 + 1, pos4));
+ int id = Integer.parseInt(vertexIdStr.substring(pos4 + 1));
return TezVertexID.getInstance(
TezDAGID.getInstance(rmId, appId, dagId),
id);
http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
index 31a3941..d2ce378 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
@@ -290,11 +290,10 @@ public class TestCartesianProductVertexManagerUnpartitioned {
VertexManagerEvent vmEvent =
VertexManagerEvent.create("cp vertex", proto.toByteString().asReadOnlyByteBuffer());
- Formatter formatter = new Formatter();
for (int i = 0; i < desiredBytesPerGroup/outputBytesPerTaskV0; i++) {
vmEvent.setProducerAttemptIdentifier(
new TaskAttemptIdentifierImpl("dag", "v0", TezTaskAttemptID.fromString(
- formatter.format("attempt_1441301219877_0109_1_00_%06d_0", i).toString())));
+ String.format("attempt_1441301219877_0109_1_00_%06d_0", i))));
vertexManager.onVertexManagerEventReceived(vmEvent);
}
verify(context, never()).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
@@ -313,10 +312,9 @@ public class TestCartesianProductVertexManagerUnpartitioned {
anyMapOf(String.class, EdgeProperty.class));
vmEvent.setProducerAttemptIdentifier(
new TaskAttemptIdentifierImpl("dag", "v1", TezTaskAttemptID.fromString(
- formatter.format("attempt_1441301219877_0109_1_01_%06d_0", i).toString())));
+ String.format("attempt_1441301219877_0109_1_01_%06d_0", i))));
vertexManager.onVertexManagerEventReceived(vmEvent);
}
- formatter.close();
verify(context, times(1)).reconfigureVertex(parallelismCaptor.capture(),
isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();