You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/03/28 19:48:21 UTC

[40/50] [abbrv] tez git commit: TEZ-1526. LoadingCache for TezTaskID slow for large jobs (jeagles)

TEZ-1526. LoadingCache for TezTaskID slow for large jobs (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/57c857d2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/57c857d2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/57c857d2

Branch: refs/heads/TEZ-1190
Commit: 57c857d267f17dd4e47b53d7691996d73c4476a1
Parents: 5f953bf
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Tue Mar 14 15:11:36 2017 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Tue Mar 14 15:11:36 2017 -0500

----------------------------------------------------------------------
 .../org/apache/tez/dag/records/TezDAGID.java    | 64 +++++++-------------
 .../java/org/apache/tez/dag/records/TezID.java  | 21 +++++++
 .../tez/dag/records/TezTaskAttemptID.java       | 57 ++++++-----------
 .../org/apache/tez/dag/records/TezTaskID.java   | 51 ++++++----------
 .../org/apache/tez/dag/records/TezVertexID.java | 48 ++++++---------
 ...tesianProductVertexManagerUnpartitioned.java |  6 +-
 6 files changed, 103 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
index b7a2c8f..2e3309e 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
@@ -21,15 +21,12 @@ package org.apache.tez.dag.records;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.text.NumberFormat;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
+import org.apache.tez.util.FastNumberFormat;
 
 /**
  * TezDAGID represents the immutable and unique identifier for
@@ -43,16 +40,7 @@ import com.google.common.cache.LoadingCache;
  */
 public class TezDAGID extends TezID {
 
-  private static LoadingCache<TezDAGID, TezDAGID> dagIdCache = CacheBuilder.newBuilder().softValues().
-      build(
-          new CacheLoader<TezDAGID, TezDAGID>() {
-            @Override
-            public TezDAGID load(TezDAGID key) throws Exception {
-              return key;
-            }
-          }
-      );
-  
+  private static TezIDCache<TezDAGID> tezDAGIDCache = new TezIDCache<>();
   private ApplicationId applicationId;
 
   /**
@@ -65,13 +53,12 @@ public class TezDAGID extends TezID {
     // will be short-lived.
     // Alternately the cache can be keyed by the hash of the incoming paramters.
     Preconditions.checkArgument(applicationId != null, "ApplicationID cannot be null");
-    return dagIdCache.getUnchecked(new TezDAGID(applicationId, id));
+    return tezDAGIDCache.getInstance(new TezDAGID(applicationId, id));
   }
 
   @InterfaceAudience.Private
   public static void clearCache() {
-    dagIdCache.invalidateAll();
-    dagIdCache.cleanUp();
+    tezDAGIDCache.clear();
   }
   
   /**
@@ -85,7 +72,7 @@ public class TezDAGID extends TezID {
     // will be short-lived.
     // Alternately the cache can be keyed by the hash of the incoming paramters.
     Preconditions.checkArgument(yarnRMIdentifier != null, "yarnRMIdentifier cannot be null");
-    return dagIdCache.getUnchecked(new TezDAGID(yarnRMIdentifier, appId, id));
+    return tezDAGIDCache.getInstance(new TezDAGID(yarnRMIdentifier, appId, id));
   }
   
   // Public for Writable serialization. Verify if this is actually required.
@@ -151,25 +138,14 @@ public class TezDAGID extends TezID {
 
   // DO NOT CHANGE THIS. DAGClient replicates this code to create DAG id string
   public static final String DAG = "dag";
-  static final ThreadLocal<NumberFormat> tezAppIdFormat = new ThreadLocal<NumberFormat>() {
+  static final ThreadLocal<FastNumberFormat> tezAppIdFormat = new ThreadLocal<FastNumberFormat>() {
     @Override
-    public NumberFormat initialValue() {
-      NumberFormat fmt = NumberFormat.getInstance();
-      fmt.setGroupingUsed(false);
+    public FastNumberFormat initialValue() {
+      FastNumberFormat fmt = FastNumberFormat.getInstance();
       fmt.setMinimumIntegerDigits(4);
       return fmt;
     }
   };
-  
-  static final ThreadLocal<NumberFormat> tezDagIdFormat = new ThreadLocal<NumberFormat>() {
-    @Override
-    public NumberFormat initialValue() {
-      NumberFormat fmt = NumberFormat.getInstance();
-      fmt.setGroupingUsed(false);
-      fmt.setMinimumIntegerDigits(1);
-      return fmt;
-    }
-  };
 
   @Override
   public String toString() {
@@ -190,10 +166,15 @@ public class TezDAGID extends TezID {
       throw new IllegalArgumentException("numDagsPerGroup has to be more than one. Got: " +
           numDagsPerGroup);
     }
-    return DAG_GROUPID_PREFIX + SEPARATOR +
-        getApplicationId().getClusterTimestamp() + SEPARATOR +
-        tezAppIdFormat.get().format(getApplicationId().getId()) + SEPARATOR +
-        tezDagIdFormat.get().format((getId() - 1) / numDagsPerGroup);
+    StringBuilder sb = new StringBuilder();
+    sb.append(DAG_GROUPID_PREFIX);
+    sb.append(SEPARATOR);
+    sb.append(getApplicationId().getClusterTimestamp());
+    sb.append(SEPARATOR);
+    tezAppIdFormat.get().format(getApplicationId().getId(), sb);
+    sb.append(SEPARATOR);
+    sb.append((id - 1) / numDagsPerGroup);
+    return sb.toString();
   }
 
   public static TezDAGID fromString(String dagId) {
@@ -225,12 +206,11 @@ public class TezDAGID extends TezID {
    * @return the builder that was passed in
    */
   protected StringBuilder appendTo(StringBuilder builder) {
-    return builder.append(SEPARATOR).
-                 append(applicationId.getClusterTimestamp()).
-                 append(SEPARATOR).
-                 append(tezAppIdFormat.get().format(applicationId.getId())).
-                 append(SEPARATOR).
-                 append(tezDagIdFormat.get().format(id));
+    builder.append(SEPARATOR);
+    builder.append(applicationId.getClusterTimestamp());
+    builder.append(SEPARATOR);
+    tezAppIdFormat.get().format(applicationId.getId(), builder);
+    return builder.append(SEPARATOR).append(id);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
index 7efbd9a..cd7b27d 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
@@ -21,6 +21,8 @@ package org.apache.tez.dag.records;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.util.WeakHashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -40,6 +42,25 @@ public abstract class TezID implements WritableComparable<TezID> {
   public static final char SEPARATOR = '_';
   protected int id;
 
+  public static class TezIDCache<T> {
+    private final WeakHashMap<T, WeakReference<T>> cache = new WeakHashMap<>();
+
+    synchronized T getInstance(final T id) {
+      final WeakReference<T> cached = cache.get(id);
+      if (cached != null) {
+        final T value = cached.get();
+        if (value != null)
+          return value;
+      }
+      cache.put(id, new WeakReference<T>(id));
+      return id;
+    }
+
+    synchronized void clear() {
+      cache.clear();
+    }
+  }
+
   /** constructs an ID object from the given int */
   public TezID(int id) {
     this.id = id;

http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
index 296d577..7aee80f 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
@@ -21,15 +21,10 @@ package org.apache.tez.dag.records;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.text.NumberFormat;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
 /**
  * TezTaskAttemptID represents the immutable and unique identifier for
  * a task attempt. Each task attempt is one particular instance of a Tez Task
@@ -50,17 +45,9 @@ import com.google.common.cache.LoadingCache;
 public class TezTaskAttemptID extends TezID {
   public static final String ATTEMPT = "attempt";
   private TezTaskID taskId;
-  
-  private static LoadingCache<TezTaskAttemptID, TezTaskAttemptID> taskAttemptIDCache = CacheBuilder.newBuilder().softValues().
-      build(
-          new CacheLoader<TezTaskAttemptID, TezTaskAttemptID>() {
-            @Override
-            public TezTaskAttemptID load(TezTaskAttemptID key) throws Exception {
-              return key;
-            }
-          }
-      );
-  
+
+  private static TezIDCache<TezTaskAttemptID> tezTaskAttemptIDCache = new TezIDCache<>();
+
   // Public for Writable serialization. Verify if this is actually required.
   public TezTaskAttemptID() {
   }
@@ -71,13 +58,12 @@ public class TezTaskAttemptID extends TezID {
    * @param id the task attempt number
    */
   public static TezTaskAttemptID getInstance(TezTaskID taskID, int id) {
-    return taskAttemptIDCache.getUnchecked(new TezTaskAttemptID(taskID, id));
+    return tezTaskAttemptIDCache.getInstance(new TezTaskAttemptID(taskID, id));
   }
 
   @InterfaceAudience.Private
   public static void clearCache() {
-    taskAttemptIDCache.invalidateAll();
-    taskAttemptIDCache.cleanUp();
+    tezTaskAttemptIDCache.clear();
   }
 
   private TezTaskAttemptID(TezTaskID taskId, int id) {
@@ -108,7 +94,9 @@ public class TezTaskAttemptID extends TezID {
    * @return the builder that was passed in.
    */
   protected StringBuilder appendTo(StringBuilder builder) {
-    return taskId.appendTo(builder).append(SEPARATOR).append(id);
+    taskId.appendTo(builder);
+    builder.append(SEPARATOR);
+    return builder.append(id);
   }
   
   @Override
@@ -151,25 +139,20 @@ public class TezTaskAttemptID extends TezID {
     super.write(out);
   }
 
-  protected static final ThreadLocal<NumberFormat> tezTaskAttemptIdFormat = new ThreadLocal<NumberFormat>() {
-    @Override
-    public NumberFormat initialValue() {
-      NumberFormat fmt = NumberFormat.getInstance();
-      fmt.setGroupingUsed(false);
-      fmt.setMinimumIntegerDigits(1);
-      return fmt;
-    }
-  };
-
   public static TezTaskAttemptID fromString(String taIdStr) {
     try {
-      String[] split = taIdStr.split("_");
-      String rmId = split[1];
-      int appId = TezDAGID.tezAppIdFormat.get().parse(split[2]).intValue();
-      int dagId = TezDAGID.tezDagIdFormat.get().parse(split[3]).intValue();
-      int vId = TezVertexID.tezVertexIdFormat.get().parse(split[4]).intValue();
-      int taskId = TezTaskID.tezTaskIdFormat.get().parse(split[5]).intValue();
-      int id = tezTaskAttemptIdFormat.get().parse(split[6]).intValue();
+      int pos1 = taIdStr.indexOf(SEPARATOR);
+      int pos2 = taIdStr.indexOf(SEPARATOR, pos1 + 1);
+      int pos3 = taIdStr.indexOf(SEPARATOR, pos2 + 1);
+      int pos4 = taIdStr.indexOf(SEPARATOR, pos3 + 1);
+      int pos5 = taIdStr.indexOf(SEPARATOR, pos4 + 1);
+      int pos6 = taIdStr.indexOf(SEPARATOR, pos5 + 1);
+      String rmId = taIdStr.substring(pos1 + 1, pos2);
+      int appId = Integer.parseInt(taIdStr.substring(pos2 + 1, pos3));
+      int dagId = Integer.parseInt(taIdStr.substring(pos3 + 1, pos4));
+      int vId = Integer.parseInt(taIdStr.substring(pos4 + 1, pos5));
+      int taskId = Integer.parseInt(taIdStr.substring(pos5 + 1, pos6));
+      int id = Integer.parseInt(taIdStr.substring(pos6 + 1));
 
       return TezTaskAttemptID.getInstance(
           TezTaskID.getInstance(

http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
index 3d28348..3295f6a 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
@@ -21,16 +21,12 @@ package org.apache.tez.dag.records;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.text.NumberFormat;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
+import org.apache.tez.util.FastNumberFormat;
 
 /**
  * TaskID represents the immutable and unique identifier for
@@ -46,26 +42,16 @@ public class TezTaskID extends TezID {
   public static final String TASK = "task";
   private final int serializingHash;
   
-  static final ThreadLocal<NumberFormat> tezTaskIdFormat = new ThreadLocal<NumberFormat>() {
+  static final ThreadLocal<FastNumberFormat> tezTaskIdFormat = new ThreadLocal<FastNumberFormat>() {
     @Override
-    public NumberFormat initialValue() {
-      NumberFormat fmt = NumberFormat.getInstance();
-      fmt.setGroupingUsed(false);
+    public FastNumberFormat initialValue() {
+      FastNumberFormat fmt = FastNumberFormat.getInstance();
       fmt.setMinimumIntegerDigits(6);
       return fmt;
     }
   };
 
-  private static LoadingCache<TezTaskID, TezTaskID> taskIDCache = CacheBuilder.newBuilder().softValues().
-      build(
-          new CacheLoader<TezTaskID, TezTaskID>() {
-            @Override
-            public TezTaskID load(TezTaskID key) throws Exception {
-              return key;
-            }
-          }
-      );
-  
+  private static TezIDCache<TezTaskID> tezTaskIDCache = new TezIDCache<>();
   private TezVertexID vertexId;
 
   /**
@@ -75,13 +61,12 @@ public class TezTaskID extends TezID {
    */
   public static TezTaskID getInstance(TezVertexID vertexID, int id) {
     Preconditions.checkArgument(vertexID != null, "vertexID cannot be null");
-    return taskIDCache.getUnchecked(new TezTaskID(vertexID, id));
+    return tezTaskIDCache.getInstance(new TezTaskID(vertexID, id));
   }
 
   @InterfaceAudience.Private
   public static void clearCache() {
-    taskIDCache.invalidateAll();
-    taskIDCache.cleanUp();
+    tezTaskIDCache.clear();
   }
 
   private TezTaskID(TezVertexID vertexID, int id) {
@@ -130,9 +115,9 @@ public class TezTaskID extends TezID {
    * @return the builder that was passed in
    */
   protected StringBuilder appendTo(StringBuilder builder) {
-    return vertexId.appendTo(builder).
-                 append(SEPARATOR).
-                 append(tezTaskIdFormat.get().format(id));
+    vertexId.appendTo(builder);
+    builder.append(SEPARATOR);
+    return tezTaskIdFormat.get().format(id, builder);
   }
 
   @Override
@@ -170,12 +155,16 @@ public class TezTaskID extends TezID {
 
   public static TezTaskID fromString(String taskIdStr) {
     try {
-      String[] split = taskIdStr.split("_");
-      String rmId = split[1];
-      int appId = TezDAGID.tezAppIdFormat.get().parse(split[2]).intValue();
-      int dagId = TezDAGID.tezDagIdFormat.get().parse(split[3]).intValue();
-      int vId = TezVertexID.tezVertexIdFormat.get().parse(split[4]).intValue();
-      int id = tezTaskIdFormat.get().parse(split[5]).intValue();
+      int pos1 = taskIdStr.indexOf(SEPARATOR);
+      int pos2 = taskIdStr.indexOf(SEPARATOR, pos1 + 1);
+      int pos3 = taskIdStr.indexOf(SEPARATOR, pos2 + 1);
+      int pos4 = taskIdStr.indexOf(SEPARATOR, pos3 + 1);
+      int pos5 = taskIdStr.indexOf(SEPARATOR, pos4 + 1);
+      String rmId = taskIdStr.substring(pos1 + 1, pos2);
+      int appId = Integer.parseInt(taskIdStr.substring(pos2 + 1, pos3));
+      int dagId = Integer.parseInt(taskIdStr.substring(pos3 + 1, pos4));
+      int vId = Integer.parseInt(taskIdStr.substring(pos4 + 1, pos5));
+      int id = Integer.parseInt(taskIdStr.substring(pos5 + 1));
 
       return TezTaskID.getInstance(
               TezVertexID.getInstance(

http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
index d30df16..b56c9ad 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
@@ -21,15 +21,12 @@ package org.apache.tez.dag.records;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.text.NumberFormat;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
+import org.apache.tez.util.FastNumberFormat;
 
 /**
  * TezVertexID represents the immutable and unique identifier for
@@ -46,27 +43,17 @@ import com.google.common.cache.LoadingCache;
 @InterfaceStability.Stable
 public class TezVertexID extends TezID {
   public static final String VERTEX = "vertex";
-  static final ThreadLocal<NumberFormat> tezVertexIdFormat = new ThreadLocal<NumberFormat>() {
+  static final ThreadLocal<FastNumberFormat> tezVertexIdFormat = new ThreadLocal<FastNumberFormat>() {
 
     @Override
-    public NumberFormat initialValue() {
-      NumberFormat fmt = NumberFormat.getInstance();
-      fmt.setGroupingUsed(false);
+    public FastNumberFormat initialValue() {
+      FastNumberFormat fmt = FastNumberFormat.getInstance();
       fmt.setMinimumIntegerDigits(2);
       return fmt;
     }
   };
 
-  private static LoadingCache<TezVertexID, TezVertexID> vertexIDCache = CacheBuilder.newBuilder().softValues().
-      build(
-          new CacheLoader<TezVertexID, TezVertexID>() {
-            @Override
-            public TezVertexID load(TezVertexID key) throws Exception {
-              return key;
-            }
-          }
-      );
-  
+  private static TezIDCache<TezVertexID> tezVertexIDCache = new TezIDCache<>();
   private TezDAGID dagId;
 
   // Public for Writable serialization. Verify if this is actually required.
@@ -80,13 +67,12 @@ public class TezVertexID extends TezID {
    */
   public static TezVertexID getInstance(TezDAGID dagId, int id) {
     Preconditions.checkArgument(dagId != null, "DagID cannot be null");
-    return vertexIDCache.getUnchecked(new TezVertexID(dagId, id));
+    return tezVertexIDCache.getInstance(new TezVertexID(dagId, id));
   }
 
   @InterfaceAudience.Private
   public static void clearCache() {
-    vertexIDCache.invalidateAll();
-    vertexIDCache.cleanUp();
+    tezVertexIDCache.clear();
   }
 
   private TezVertexID(TezDAGID dagId, int id) {
@@ -146,9 +132,9 @@ public class TezVertexID extends TezID {
    * @return the builder that was passed in
    */
   protected StringBuilder appendTo(StringBuilder builder) {
-    return dagId.appendTo(builder).
-        append(SEPARATOR).
-        append(tezVertexIdFormat.get().format(id));
+    dagId.appendTo(builder);
+    builder.append(SEPARATOR);
+    return tezVertexIdFormat.get().format(id, builder);
   }
 
   @Override
@@ -158,12 +144,14 @@ public class TezVertexID extends TezID {
 
   public static TezVertexID fromString(String vertexIdStr) {
     try {
-      String[] split = vertexIdStr.split("_");
-      String rmId = split[1];
-      int appId = TezDAGID.tezAppIdFormat.get().parse(split[2]).intValue();
-      int dagId = TezDAGID.tezDagIdFormat.get().parse(split[3]).intValue();
-      int id = tezVertexIdFormat.get().parse(split[4]).intValue();
-
+      int pos1 = vertexIdStr.indexOf(SEPARATOR);
+      int pos2 = vertexIdStr.indexOf(SEPARATOR, pos1 + 1);
+      int pos3 = vertexIdStr.indexOf(SEPARATOR, pos2 + 1);
+      int pos4 = vertexIdStr.indexOf(SEPARATOR, pos3 + 1);
+      String rmId = vertexIdStr.substring(pos1 + 1, pos2);
+      int appId = Integer.parseInt(vertexIdStr.substring(pos2 + 1, pos3));
+      int dagId = Integer.parseInt(vertexIdStr.substring(pos3 + 1, pos4));
+      int id = Integer.parseInt(vertexIdStr.substring(pos4 + 1));
       return TezVertexID.getInstance(
               TezDAGID.getInstance(rmId, appId, dagId),
               id);

http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
index 31a3941..d2ce378 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
@@ -290,11 +290,10 @@ public class TestCartesianProductVertexManagerUnpartitioned {
     VertexManagerEvent vmEvent =
       VertexManagerEvent.create("cp vertex", proto.toByteString().asReadOnlyByteBuffer());
 
-    Formatter formatter = new Formatter();
     for (int i = 0; i < desiredBytesPerGroup/outputBytesPerTaskV0; i++) {
       vmEvent.setProducerAttemptIdentifier(
         new TaskAttemptIdentifierImpl("dag", "v0", TezTaskAttemptID.fromString(
-          formatter.format("attempt_1441301219877_0109_1_00_%06d_0", i).toString())));
+          String.format("attempt_1441301219877_0109_1_00_%06d_0", i))));
       vertexManager.onVertexManagerEventReceived(vmEvent);
     }
     verify(context, never()).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
@@ -313,10 +312,9 @@ public class TestCartesianProductVertexManagerUnpartitioned {
         anyMapOf(String.class, EdgeProperty.class));
       vmEvent.setProducerAttemptIdentifier(
         new TaskAttemptIdentifierImpl("dag", "v1", TezTaskAttemptID.fromString(
-          formatter.format("attempt_1441301219877_0109_1_01_%06d_0", i).toString())));
+          String.format("attempt_1441301219877_0109_1_01_%06d_0", i))));
       vertexManager.onVertexManagerEventReceived(vmEvent);
     }
-    formatter.close();
     verify(context, times(1)).reconfigureVertex(parallelismCaptor.capture(),
       isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
     Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();